找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 4972|回复: 1

Windows平台下,完成端口的程序框架

[复制链接]
发表于 2008-9-21 15:19:47 | 显示全部楼层 |阅读模式
────────────────────────────────
  1. //
  2. // Function: PostRecv
  3. //
  4. // Description:
  5. //    Post an overlapped receive operation on the socket.
  6. //
  7. int PostRecv(SOCKET_OBJ *sock, BUFFER_OBJ *recvobj)
  8. {
  9.     WSABUF  wbuf;
  10.     DWORD   bytes,
  11.             flags;
  12.     int     rc;
  13.     recvobj->operation = OP_READ;
  14.     wbuf.buf = recvobj->buf;
  15.     wbuf.len = recvobj->buflen;
  16.     flags = 0;
  17.     EnterCriticalSection(&sock->SockCritSec);
  18.     rc = WSARecv(
  19.             sock->s,
  20.            &wbuf,
  21.             1,
  22.            &bytes,
  23.            &flags,
  24.            &recvobj->ol,
  25.             NULL
  26.             );
  27.     if (rc == SOCKET_ERROR)
  28.     {
  29.         rc = NO_ERROR;
  30.         if (WSAGetLastError() != WSA_IO_PENDING)
  31.         {
  32.             dbgprint("PostRecv: WSARecv* failed: %d\n", WSAGetLastError());
  33.             rc = SOCKET_ERROR;
  34.         }
  35.     }
  36.     if (rc == NO_ERROR)
  37.     {
  38.         // Increment outstanding overlapped operations
  39.         InterlockedIncrement(&sock->OutstandingRecv);
  40.     }
  41.     LeaveCriticalSection(&sock->SockCritSec);
  42.     return rc;
  43. }
  44. //
  45. // Function: PostSend
  46. //
  47. // Description:
  48. //    Post an overlapped send operation on the socket.
  49. //
  50. int PostSend(SOCKET_OBJ *sock, BUFFER_OBJ *sendobj)
  51. {
  52.     WSABUF  wbuf;
  53.     DWORD   bytes;
  54.     int     rc, err;
  55.     sendobj->operation = OP_WRITE;
  56.     wbuf.buf = sendobj->buf;
  57.     wbuf.len = sendobj->buflen;
  58.     EnterCriticalSection(&sock->SockCritSec);
  59.     rc = WSASend(
  60.             sock->s,
  61.            &wbuf,
  62.             1,
  63.            &bytes,
  64.             0,
  65.            &sendobj->ol,
  66.             NULL
  67.             );
  68.     if (rc == SOCKET_ERROR)
  69.     {
  70.         rc = NO_ERROR;
  71.         if ((err = WSAGetLastError()) != WSA_IO_PENDING)
  72.         {
  73.             if (err == WSAENOBUFS)
  74.                 DebugBreak();
  75.             dbgprint("PostSend: WSASend* failed: %d [internal = %d]\n", WSAGetLastError(), sendobj->ol.Internal);
  76.             rc = SOCKET_ERROR;
  77.         }
  78.     }
  79.     if (rc == NO_ERROR)
  80.     {
  81.         // Increment the outstanding operation count
  82.         InterlockedIncrement(&sock->OutstandingSend);
  83.         InterlockedIncrement(&gOutstandingSends);
  84.     }
  85.     LeaveCriticalSection(&sock->SockCritSec);
  86.     return rc;
  87. }
  88. //
  89. // Function: PostAccept
  90. //
  91. // Description:
  92. //    Post an overlapped accept on a listening socket.
  93. //
  94. int PostAccept(LISTEN_OBJ *listen, BUFFER_OBJ *acceptobj)
  95. {
  96.     DWORD   bytes;
  97.     int     rc;
  98.     acceptobj->operation = OP_ACCEPT;
  99.     // Create the client socket for an incoming connection
  100.     acceptobj->sclient = socket(listen->AddressFamily, SOCK_STREAM, IPPROTO_TCP);
  101.     if (acceptobj->sclient == INVALID_SOCKET)
  102.     {
  103.         fprintf(stderr, "PostAccept: socket failed: %d\n", WSAGetLastError());
  104.         return -1;
  105.     }
  106.     rc = listen->lpfnAcceptEx(
  107.             listen->s,
  108.             acceptobj->sclient,
  109.             acceptobj->buf,
  110.             acceptobj->buflen - ((sizeof(SOCKADDR_STORAGE) + 16) * 2),
  111.             sizeof(SOCKADDR_STORAGE) + 16,
  112.             sizeof(SOCKADDR_STORAGE) + 16,
  113.            &bytes,
  114.            &acceptobj->ol
  115.             );
  116.     if (rc == FALSE)
  117.     {
  118.         if (WSAGetLastError() != WSA_IO_PENDING)
  119.         {
  120.             printf("PostAccept: AcceptEx failed: %d\n",
  121.                     WSAGetLastError());
  122.             return SOCKET_ERROR;
  123.         }
  124.     }
  125.     // Increment the outstanding overlapped count for this socket
  126.     InterlockedIncrement(&listen->PendingAcceptCount);
  127.     return NO_ERROR;
  128. }
  129. //
  130. // Function: HandleIo
  131. //
  132. // Description:
  133. //    This function handles the IO on a socket. In the event of a receive, the
  134. //    completed receive is posted again. For completed accepts, another AcceptEx
  135. //    is posted. For completed sends, the buffer is freed.
  136. //
  137. void HandleIo(ULONG_PTR key, BUFFER_OBJ *buf, HANDLE CompPort, DWORD BytesTransfered, DWORD error)
  138. {
  139.     LISTEN_OBJ *listenobj=NULL;
  140.     SOCKET_OBJ *sockobj=NULL,
  141.                *clientobj=NULL;     // New client object for accepted connections
  142.     BUFFER_OBJ *recvobj=NULL,       // Used to post new receives on accepted connections
  143.                *sendobj=NULL;       // Used to post new sends for data received
  144.     BOOL        bCleanupSocket;
  145.     if (error != 0)
  146.     {
  147.         dbgprint("OP = %d; Error = %d\n", buf->operation, error);
  148.     }
  149.     bCleanupSocket = FALSE;
  150.     if (error != NO_ERROR)
  151.     {
  152.         // An error occured on a TCP socket, free the associated per I/O buffer
  153.         // and see if there are any more outstanding operations. If so we must
  154.         // wait until they are complete as well.
  155.         //
  156.         if (buf->operation != OP_ACCEPT)
  157.         {
  158.             sockobj = (SOCKET_OBJ *)key;
  159.             if (buf->operation == OP_READ)
  160.             {
  161.                 if ((InterlockedDecrement(&sockobj->OutstandingRecv) == 0) &&
  162.                     (sockobj->OutstandingSend == 0) )
  163.                 {
  164.                     dbgprint("Freeing socket obj in GetOverlappedResult\n");
  165.                     FreeSocketObj(sockobj);
  166.                 }
  167.             }
  168.             else if (buf->operation == OP_WRITE)
  169.             {
  170.                 if ((InterlockedDecrement(&sockobj->OutstandingSend) == 0) &&
  171.                     (sockobj->OutstandingRecv == 0) )
  172.                 {
  173.                     dbgprint("Freeing socket obj in GetOverlappedResult\n");
  174.                     FreeSocketObj(sockobj);
  175.                 }
  176.             }
  177.         }
  178.         else
  179.         {
  180.             listenobj = (LISTEN_OBJ *)key;
  181.             printf("Accept failed\n");
  182.             closesocket(buf->sclient);
  183.             buf->sclient = INVALID_SOCKET;
  184.         }
  185.         FreeBufferObj(buf);
  186.         return;
  187.     }
  188.     if (buf->operation == OP_ACCEPT)
  189.     {
  190.         HANDLE            hrc;
  191.         SOCKADDR_STORAGE *LocalSockaddr=NULL,
  192.                          *RemoteSockaddr=NULL;
  193.         int               LocalSockaddrLen,
  194.                           RemoteSockaddrLen;
  195.         listenobj = (LISTEN_OBJ *)key;
  196.         // Update counters
  197.         InterlockedIncrement(&gConnections);
  198.         InterlockedIncrement(&gConnectionsLast);
  199.         InterlockedDecrement(&listenobj->PendingAcceptCount);
  200.         InterlockedExchangeAdd(&gBytesRead, BytesTransfered);
  201.         InterlockedExchangeAdd(&gBytesReadLast, BytesTransfered);
  202.         // Print the client's addresss
  203.         listenobj->lpfnGetAcceptExSockaddrs(
  204.                 buf->buf,
  205.                 buf->buflen - ((sizeof(SOCKADDR_STORAGE) + 16) * 2),
  206.                 sizeof(SOCKADDR_STORAGE) + 16,
  207.                 sizeof(SOCKADDR_STORAGE) + 16,
  208.                 (SOCKADDR **)&LocalSockaddr,
  209.                &LocalSockaddrLen,
  210.                 (SOCKADDR **)&RemoteSockaddr,
  211.                &RemoteSockaddrLen
  212.                 );
  213.         RemovePendingAccept(listenobj, buf);
  214.         // Get a new SOCKET_OBJ for the client connection
  215.         clientobj = GetSocketObj(buf->sclient, listenobj->AddressFamily);
  216.         if (clientobj)
  217.         {
  218.             // Associate the new connection to our completion port
  219.             hrc = CreateIoCompletionPort(
  220.                     (HANDLE)clientobj->s,
  221.                     CompPort,
  222.                     (ULONG_PTR)clientobj,
  223.                     0
  224.                     );
  225.             if (hrc == NULL)
  226.             {
  227.                 fprintf(stderr, "CompletionThread: CreateIoCompletionPort failed: %d\n",
  228.                         GetLastError());
  229.                 return;
  230.             }
  231.             sendobj = buf;
  232.             sendobj->buflen = BytesTransfered;
  233.             // Post the send - this is the first one for this connection so just do it
  234.             sendobj->sock = clientobj;
  235.             //PostSend(clientobj, sendobj);
  236.             EnqueuePendingOperation(&gPendingSendList, &gPendingSendListEnd, sendobj, OP_WRITE);
  237.         }
  238.         else
  239.         {
  240.             // Can't allocate a socket structure so close the connection
  241.             closesocket(buf->sclient);
  242.             buf->sclient = INVALID_SOCKET;
  243.             FreeBufferObj(buf);
  244.         }
  245.         
  246.   if (error != NO_ERROR)
  247.   {
  248.             // Check for socket closure
  249.             EnterCriticalSection(&clientobj->SockCritSec);
  250.             if ( (clientobj->OutstandingSend == 0) &&
  251.                  (clientobj->OutstandingRecv == 0) )
  252.             {
  253.                 closesocket(clientobj->s);
  254.                 clientobj->s = INVALID_SOCKET;
  255.                 FreeSocketObj(clientobj);
  256.             }
  257.             else
  258.             {
  259.                 clientobj->bClosing = TRUE;
  260.             }
  261.             LeaveCriticalSection(&clientobj->SockCritSec);
  262.             error = NO_ERROR;
  263.   }
  264.         InterlockedIncrement(&listenobj->RepostCount);
  265.         SetEvent(listenobj->RepostAccept);
  266.     }
  267.     else if (buf->operation == OP_READ)
  268.     {
  269.         sockobj = (SOCKET_OBJ *)key;
  270.         InterlockedDecrement(&sockobj->OutstandingRecv);
  271.         //
  272.         // Receive completed successfully
  273.         //
  274.         if (BytesTransfered > 0)
  275.         {
  276.             InterlockedExchangeAdd(&gBytesRead, BytesTransfered);
  277.             InterlockedExchangeAdd(&gBytesReadLast, BytesTransfered);
  278.             // Make the recv a send
  279.             sendobj         = buf;
  280.             sendobj->buflen = BytesTransfered;
  281.             sendobj->sock = sockobj;
  282.             //PostSend(sockobj, sendobj);
  283.             EnqueuePendingOperation(&gPendingSendList, &gPendingSendListEnd, sendobj, OP_WRITE);
  284.         }
  285.         else
  286.         {
  287.             //dbgprint("Got 0 byte receive\n");
  288.             // Graceful close - the receive returned 0 bytes read
  289.             sockobj->bClosing = TRUE;
  290.             // Free the receive buffer
  291.             FreeBufferObj(buf);
  292.             // If this was the last outstanding operation on socket, clean it up
  293.             EnterCriticalSection(&sockobj->SockCritSec);
  294.             if ((sockobj->OutstandingSend == 0) &&
  295.                 (sockobj->OutstandingRecv == 0) )
  296.             {
  297.                 bCleanupSocket = TRUE;
  298.             }
  299.             LeaveCriticalSection(&sockobj->SockCritSec);
  300.         }
  301.     }
  302.     else if (buf->operation == OP_WRITE)
  303.     {
  304.         sockobj = (SOCKET_OBJ *)key;
  305.         InterlockedDecrement(&sockobj->OutstandingSend);
  306.         InterlockedDecrement(&gOutstandingSends);
  307.         // Update the counters
  308.         InterlockedExchangeAdd(&gBytesSent, BytesTransfered);
  309.         InterlockedExchangeAdd(&gBytesSentLast, BytesTransfered);
  310.         buf->buflen = gBufferSize;
  311.         if (sockobj->bClosing == FALSE)
  312.         {
  313.             buf->sock = sockobj;
  314.             PostRecv(sockobj, buf);
  315.         }
  316.     }
  317.     ProcessPendingOperations();
  318.     if (sockobj)
  319.     {
  320.         if (error != NO_ERROR)
  321.         {
  322.             printf("err = %d\n", error);
  323.             sockobj->bClosing = TRUE;
  324.         }
  325.         //
  326.         // Check to see if socket is closing
  327.         //
  328.         if ( (sockobj->OutstandingSend == 0) &&
  329.              (sockobj->OutstandingRecv == 0) &&
  330.              (sockobj->bClosing) )
  331.         {
  332.             bCleanupSocket = TRUE;
  333.         }
  334.         if (bCleanupSocket)
  335.         {
  336.             closesocket(sockobj->s);
  337.             sockobj->s = INVALID_SOCKET;
  338.             FreeSocketObj(sockobj);
  339.         }
  340.     }
  341.     return;
  342. }
  343. //
  344. // Function: CompletionThread
  345. //
  346. // Description:
  347. //    This is the completion thread which services our completion port. One of
  348. //    these threads is created per processor on the system. The thread sits in
  349. //    an infinite loop calling GetQueuedCompletionStatus and handling socket
  350. //    IO that completed.
  351. //
  352. DWORD WINAPI CompletionThread(LPVOID lpParam)
  353. {
  354.     ULONG_PTR    Key;
  355.     SOCKET       s;
  356.     BUFFER_OBJ  *bufobj=NULL;           // Per I/O object for completed I/O
  357.     OVERLAPPED  *lpOverlapped=NULL;     // Pointer to overlapped structure for completed I/O
  358.     HANDLE       CompletionPort;        // Completion port handle
  359.     DWORD        BytesTransfered,       // Number of bytes transfered
  360.                  Flags;                 // Flags for completed I/O
  361.     int          rc,
  362.                  error;
  363.     CompletionPort = (HANDLE)lpParam;
  364.     while (1)
  365.     {
  366.         error = NO_ERROR;
  367.         rc = GetQueuedCompletionStatus(
  368.                 CompletionPort,
  369.                &BytesTransfered,
  370.                 (PULONG_PTR)&Key,
  371.                &lpOverlapped,
  372.                 INFINITE
  373.                 );
  374.         bufobj = CONTAINING_RECORD(lpOverlapped, BUFFER_OBJ, ol);
  375.         if (rc == FALSE)
  376.         {
  377.             // If the call fails, call WSAGetOverlappedResult to translate the
  378.             //    error code into a Winsock error code.
  379.             if (bufobj->operation == OP_ACCEPT)
  380.             {
  381.                 s = ((LISTEN_OBJ *)Key)->s;
  382.             }
  383.             else
  384.             {
  385.                 s = ((SOCKET_OBJ *)Key)->s;
  386.             }
  387.          
  388.             dbgprint("CompletionThread: GetQueuedCompletionStatus failed: %d [0x%x]\n",
  389.                     GetLastError(), lpOverlapped->Internal);
  390.             rc = WSAGetOverlappedResult(
  391.                     s,
  392.                    &bufobj->ol,
  393.                    &BytesTransfered,
  394.                     FALSE,
  395.                    &Flags
  396.                     );
  397.             if (rc == FALSE)
  398.             {
  399.                 error = WSAGetLastError();
  400.             }
  401.         }
  402.         // Handle the IO operation
  403.         HandleIo(Key, bufobj, CompletionPort, BytesTransfered, error);
  404.     }
  405.     ExitThread(0);
  406.     return 0;
  407. }
  408. //
  409. // Function: main
  410. //
  411. // Description:
  412. //      This is the main program. It parses the command line and creates
  413. //      the main socket. For TCP the socket is used to accept incoming
  414. //      client connections. Each client TCP connection is handed off to
  415. //      a worker thread which will receive any data on that connection
  416. //      until the connection is closed.
  417. //
  418. int __cdecl main(int argc, char **argv)
  419. {
  420.     WSADATA          wsd;
  421.     SYSTEM_INFO      sysinfo;
  422.     LISTEN_OBJ      *ListenSockets=NULL,
  423.                     *listenobj=NULL;
  424.     SOCKET_OBJ      *sockobj=NULL;
  425.     BUFFER_OBJ      *acceptobj=NULL;
  426.     GUID             guidAcceptEx = WSAID_ACCEPTEX,
  427.                      guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
  428.     DWORD            bytes;
  429.     HANDLE           CompletionPort,
  430.                      WaitEvents[MAX_COMPLETION_THREAD_COUNT],
  431.                      hrc;
  432.     int              endpointcount=0,
  433.                      waitcount=0,
  434.                      interval,
  435.                      rc,
  436.                      i;
  437.     struct addrinfo *res=NULL,
  438.                     *ptr=NULL;
  439.     // Validate the command line
  440.     ValidateArgs(argc, argv);
  441.     // Load Winsock
  442.     if (WSAStartup(MAKEWORD(2,2), &wsd) != 0)
  443.     {
  444.         fprintf(stderr, "unable to load Winsock!\n");
  445.         return -1;
  446.     }
  447.     InitializeCriticalSection(&gSocketListCs);
  448.     InitializeCriticalSection(&gBufferListCs);
  449.     InitializeCriticalSection(&gPendingCritSec);
  450.     // Create the completion port used by this server
  451.     CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0);
  452.     if (CompletionPort == NULL)
  453.     {
  454.         fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
  455.         return -1;
  456.     }
  457.     // Find out how many processors are on this system
  458.     GetSystemInfo(&sysinfo);
  459.     if (sysinfo.dwNumberOfProcessors > MAX_COMPLETION_THREAD_COUNT)
  460.     {
  461.         sysinfo.dwNumberOfProcessors = MAX_COMPLETION_THREAD_COUNT;
  462.     }
  463.     // Round the buffer size to the next increment of the page size
  464.     if ((gBufferSize % sysinfo.dwPageSize) != 0)
  465.     {
  466.         gBufferSize = ((gBufferSize / sysinfo.dwPageSize) + 1) * sysinfo.dwPageSize;
  467.     }
  468.     printf("Buffer size = %lu (page size = %lu)\n",
  469.         gBufferSize, sysinfo.dwPageSize);
  470.    
  471.     // Create the worker threads to service the completion notifications
  472.     for(waitcount=0; waitcount < (int)sysinfo.dwNumberOfProcessors ;waitcount++)
  473.     {
  474.         WaitEvents[waitcount] = CreateThread(NULL, 0, CompletionThread, (LPVOID)CompletionPort, 0, NULL);
  475.         if (WaitEvents[waitcount] == NULL)
  476.         {
  477.             fprintf(stderr, "CreatThread failed: %d\n", GetLastError());
  478.             return -1;
  479.         }
  480.     }
  481.     printf("Local address: %s; Port: %s; Family: %d\n",
  482.             gBindAddr, gBindPort, gAddressFamily);
  483.     // Obtain the "wildcard" addresses for all the available address families
  484.     res = ResolveAddress(gBindAddr, gBindPort, gAddressFamily, gSocketType, gProtocol);
  485.     if (res == NULL)
  486.     {
  487.         fprintf(stderr, "ResolveAddress failed to return any addresses!\n");
  488.         return -1;
  489.     }
  490.     // For each local address returned, create a listening/receiving socket
  491.     ptr = res;
  492.     while (ptr)
  493.     {
  494.         printf("Listening address: ");
  495.         PrintAddress(ptr->ai_addr, ptr->ai_addrlen);
  496.         printf("\n");
  497.         listenobj = (LISTEN_OBJ *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(LISTEN_OBJ));
  498.         if (listenobj == NULL)
  499.         {
  500.             fprintf(stderr, "Out of memory!\n");
  501.             return -1;
  502.         }
  503.         
  504.         listenobj->LoWaterMark = gInitialAccepts;
  505.         InitializeCriticalSection(&listenobj->ListenCritSec);
  506.         
  507.         // Save off the address family of this socket
  508.         listenobj->AddressFamily = ptr->ai_family;
  509.         // create the socket
  510.         listenobj->s = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
  511.         if (listenobj->s == INVALID_SOCKET)
  512.         {
  513.             fprintf(stderr, "socket failed: %d\n", WSAGetLastError());
  514.             return -1;
  515.         }
  516.         // Create an event to register for FD_ACCEPT events on
  517.         listenobj->AcceptEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
  518.         if (listenobj->AcceptEvent == NULL)
  519.         {
  520.             fprintf(stderr, "CreateEvent failed: %d\n", GetLastError());
  521.             return -1;
  522.         }
  523.         listenobj->RepostAccept = CreateEvent(NULL, TRUE, FALSE, NULL);
  524.         if (listenobj->RepostAccept == NULL)
  525.         {
  526.             fprintf(stderr, "CreateSemaphore failed: %d\n", GetLastError());
  527.             return -1;
  528.         }
  529.         // Add the event to the liste of waiting events
  530.         WaitEvents[waitcount++] = listenobj->AcceptEvent;
  531.         WaitEvents[waitcount++] = listenobj->RepostAccept;
  532.         // Associate the socket and its SOCKET_OBJ to the completion port
  533.         hrc = CreateIoCompletionPort((HANDLE)listenobj->s, CompletionPort, (ULONG_PTR)listenobj, 0);
  534.         if (hrc == NULL)
  535.         {
  536.             fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
  537.             return -1;
  538.         }
  539.         // bind the socket to a local address and port
  540.         rc = bind(listenobj->s, ptr->ai_addr, ptr->ai_addrlen);
  541.         if (rc == SOCKET_ERROR)
  542.         {
  543.             fprintf(stderr, "bind failed: %d\n", WSAGetLastError());
  544.             return -1;
  545.         }
  546.         // Need to load the Winsock extension functions from each provider
  547.         //    -- e.g. AF_INET and AF_INET6.
  548.         rc = WSAIoctl(
  549.                 listenobj->s,
  550.                 SIO_GET_EXTENSION_FUNCTION_POINTER,
  551.                &guidAcceptEx,
  552.                 sizeof(guidAcceptEx),
  553.                &listenobj->lpfnAcceptEx,
  554.                 sizeof(listenobj->lpfnAcceptEx),
  555.                &bytes,
  556.                 NULL,
  557.                 NULL
  558.                 );
  559.         if (rc == SOCKET_ERROR)
  560.         {
  561.             fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER failed: %d\n",
  562.                     WSAGetLastError());
  563.             return -1;
  564.         }
  565.         // Load the Winsock extensions from each provider
  566.         rc = WSAIoctl(
  567.                 listenobj->s,
  568.                 SIO_GET_EXTENSION_FUNCTION_POINTER,
  569.                &guidGetAcceptExSockaddrs,
  570.                 sizeof(guidGetAcceptExSockaddrs),
  571.                &listenobj->lpfnGetAcceptExSockaddrs,
  572.                 sizeof(listenobj->lpfnGetAcceptExSockaddrs),
  573.                &bytes,
  574.                 NULL,
  575.                 NULL
  576.                 );
  577.         if (rc == SOCKET_ERROR)
  578.         {
  579.             fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER faled: %d\n",
  580.                     WSAGetLastError());
  581.             return -1;
  582.         }
  583.         // Put the socket into listening mode
  584.         rc = listen(listenobj->s, 200);
  585.         if (rc == SOCKET_ERROR)
  586.         {
  587.             fprintf(stderr, "listen failed: %d\n", WSAGetLastError());
  588.             return -1;
  589.         }
  590.         // Register for FD_ACCEPT notification on listening socket
  591.         rc = WSAEventSelect(listenobj->s, listenobj->AcceptEvent, FD_ACCEPT);
  592.         if (rc == SOCKET_ERROR)
  593.         {
  594.             fprintf(stderr, "WSAEventSelect failed: %d\n", WSAGetLastError());
  595.             return -1;
  596.         }
  597.         // Initiate the initial accepts for each listen socket
  598.         for(i=0; i < gInitialAccepts ;i++)
  599.         {
  600.             acceptobj = GetBufferObj(gBufferSize);
  601.             if (acceptobj == NULL)
  602.             {
  603.                 fprintf(stderr, "Out of memory!\n");
  604.                 return -1;
  605.             }
  606.             acceptobj->PostAccept = listenobj->AcceptEvent;
  607.             InsertPendingAccept(listenobj, acceptobj);
  608.             PostAccept(listenobj, acceptobj);
  609.         }
  610.         //
  611.         // Maintain a list of the listening socket structures
  612.         //
  613.         if (ListenSockets == NULL)
  614.         {
  615.             ListenSockets = listenobj;
  616.         }
  617.         else
  618.         {
  619.             listenobj->next = ListenSockets;
  620.             ListenSockets   = listenobj;
  621.         }
  622.         endpointcount++;
  623.         ptr = ptr->ai_next;
  624.     }
  625.     // free the addrinfo structure for the 'bind' address
  626.     freeaddrinfo(res);
  627.     gStartTime = gStartTimeLast = GetTickCount();
  628.     interval = 0;
  629.     while (1)
  630.     {
  631.         rc = WSAWaitForMultipleEvents(
  632.                 waitcount,
  633.                 WaitEvents,
  634.                 FALSE,
  635.                 5000,
  636.                 FALSE
  637.                 );
  638.         if (rc == WAIT_FAILED)
  639.         {
  640.             fprintf(stderr, "WSAWaitForMultipleEvents failed: %d\n", WSAGetLastError());
  641.             break;
  642.         }
  643.         else if (rc == WAIT_TIMEOUT)
  644.         {
  645.             interval++;
  646.             PrintStatistics();
  647.             if (interval == 36)
  648.             {
  649.                 int          optval,
  650.                              optlen;
  651.                 // For TCP, cycle through all the outstanding AcceptEx operations
  652.                 //   to see if any of the client sockets have been connected but
  653.                 //   haven't received any data. If so, close them as they could be
  654.                 //   a denial of service attack.
  655.                 listenobj = ListenSockets;
  656.                 while (listenobj)
  657.                 {
  658.                     EnterCriticalSection(&listenobj->ListenCritSec);
  659.                     acceptobj = listenobj->PendingAccepts;
  660.                     while (acceptobj)
  661.                     {
  662.                         optlen = sizeof(optval);
  663.                         rc = getsockopt(
  664.                                 acceptobj->sclient,
  665.                                 SOL_SOCKET,
  666.                                 SO_CONNECT_TIME,
  667.                                 (char *)&optval,
  668.                                &optlen
  669.                                 );
  670.                         if (rc == SOCKET_ERROR)
  671.                         {
  672.                             fprintf(stderr, "getsockopt: SO_CONNECT_TIME failed: %d\n",
  673.                                     WSAGetLastError());
  674.                         }
  675.                         else
  676.                         {
  677.                             // If the socket has been connected for more than 5 minutes,
  678.                             //    close it. If closed, the AcceptEx call will fail in the
  679.                             //    completion thread.
  680.                             if ((optval != 0xFFFFFFFF) && (optval > 300))
  681.                             {
  682.                                 printf("closing stale handle\n");
  683.                                 closesocket(acceptobj->sclient);
  684.                                 acceptobj->sclient = INVALID_SOCKET;
  685.                             }
  686.                         }
  687.                         acceptobj = acceptobj->next;
  688.                     }
  689.                     LeaveCriticalSection(&listenobj->ListenCritSec);
  690.                     listenobj = listenobj->next;
  691.                 }
  692.                 interval = 0;
  693.             }
  694.         }
  695.         else
  696.         {
  697.             int index;
  698.             index = rc - WAIT_OBJECT_0;
  699.             for( ; index < waitcount ; index++)
  700.             {
  701.                 rc = WaitForSingleObject(WaitEvents[index], 0);
  702.                 if (rc == WAIT_FAILED || rc == WAIT_TIMEOUT)
  703.                 {
  704.                     continue;
  705.                 }
  706.                 if (index < (int)sysinfo.dwNumberOfProcessors)
  707.                 {
  708.                     // One of the completion threads exited
  709.                     //   This is bad so just bail - a real server would want
  710.                     //   to gracefully exit but this is just a sample ...
  711.                     ExitProcess(-1);
  712.                 }
  713.                 else
  714.                 {
  715.                     // An FD_ACCEPT event occured
  716.                     listenobj = ListenSockets;
  717.                     while (listenobj)
  718.                     {
  719.                         if ((listenobj->AcceptEvent == WaitEvents[index]) ||
  720.                                 (listenobj->RepostAccept  == WaitEvents[index]))
  721.                             break;
  722.                         listenobj = listenobj->next;
  723.                     }
  724.                     if (listenobj)
  725.                     {
  726.                         WSANETWORKEVENTS ne;
  727.                         int              limit=0;
  728.                         if (listenobj->AcceptEvent == WaitEvents[index])
  729.                         {
  730.                             // EnumNetworkEvents to see if FD_ACCEPT was set
  731.                             rc = WSAEnumNetworkEvents(
  732.                                     listenobj->s,
  733.                                     listenobj->AcceptEvent,
  734.                                     &ne
  735.                                                      );
  736.                             if (rc == SOCKET_ERROR)
  737.                             {
  738.                                 fprintf(stderr, "WSAEnumNetworkEvents failed: %d\n",
  739.                                         WSAGetLastError());
  740.                             }
  741.                             if ((ne.lNetworkEvents & FD_ACCEPT) == FD_ACCEPT)
  742.                             {
  743.                                 // We got an FD_ACCEPT so post multiple accepts to
  744.                                 // cover the burst
  745.                                 limit = BURST_ACCEPT_COUNT;
  746.                             }
  747.                         }
  748.                         else if (listenobj->RepostAccept == WaitEvents[index])
  749.                         {
  750.                             // Semaphore is signaled
  751.                             limit = InterlockedExchange(&listenobj->RepostCount, 0);
  752.                             ResetEvent(listenobj->RepostAccept);
  753.                         }
  754.                         i = 0;
  755.                         while ( (i++ < limit) &&
  756.                                 (listenobj->PendingAcceptCount < gMaxAccepts) )
  757.                         {
  758.                             acceptobj = GetBufferObj(gBufferSize);
  759.                             if (acceptobj)
  760.                             {
  761.                                 acceptobj->PostAccept = listenobj->AcceptEvent;
  762.                                 InsertPendingAccept(listenobj, acceptobj);
  763.                                 PostAccept(listenobj, acceptobj);
  764.                             }
  765.                         }
  766.                     }
  767.                 }
  768.             }
  769.         }
  770.     }
  771.     WSACleanup();
  772.     return 0;
  773. }
复制代码
────────────────────────────
以上程序来自Windows网络编程2nd edition的源码,从中可以清晰的看到完成端口运转的整体框架。为了简单起见,省略了部分内存管理的代码,因为和理解IOCP的运行没什么关系。
 楼主| 发表于 2008-9-21 15:20:00 | 显示全部楼层
ACE的proactor模式在Windows的实现就是IOCP.

"ACE_WIN32_Proactor uses an I/O completion port for completion event detection."

in《ACE Programmer's Guide, The: Practical Design Patterns for Network and Systems Programming》8.4节
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-5-19 14:38 , Processed in 0.016696 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表