peakzhang 发表于 2008-9-21 15:19:47

Windows平台下,完成端口的程序框架

────────────────────────────────

//
// Function: PostRecv
//
// Description:
//    Post an overlapped receive operation on the socket.
//
int PostRecv(SOCKET_OBJ *sock, BUFFER_OBJ *recvobj)
{
    WSABUFwbuf;
    DWORD   bytes,
            flags;
    int   rc;
    recvobj->operation = OP_READ;
    wbuf.buf = recvobj->buf;
    wbuf.len = recvobj->buflen;
    flags = 0;
    EnterCriticalSection(&sock->SockCritSec);
    rc = WSARecv(
            sock->s,
         &wbuf,
            1,
         &bytes,
         &flags,
         &recvobj->ol,
            NULL
            );
    if (rc == SOCKET_ERROR)
    {
      rc = NO_ERROR;
      if (WSAGetLastError() != WSA_IO_PENDING)
      {
            dbgprint("PostRecv: WSARecv* failed: %d\n", WSAGetLastError());
            rc = SOCKET_ERROR;
      }
    }
    if (rc == NO_ERROR)
    {
      // Increment outstanding overlapped operations
      InterlockedIncrement(&sock->OutstandingRecv);
    }
    LeaveCriticalSection(&sock->SockCritSec);
    return rc;
}
//
// Function: PostSend
//
// Description:
//    Post an overlapped send operation on the socket.
//
int PostSend(SOCKET_OBJ *sock, BUFFER_OBJ *sendobj)
{
    WSABUFwbuf;
    DWORD   bytes;
    int   rc, err;
    sendobj->operation = OP_WRITE;
    wbuf.buf = sendobj->buf;
    wbuf.len = sendobj->buflen;
    EnterCriticalSection(&sock->SockCritSec);
    rc = WSASend(
            sock->s,
         &wbuf,
            1,
         &bytes,
            0,
         &sendobj->ol,
            NULL
            );
    if (rc == SOCKET_ERROR)
    {
      rc = NO_ERROR;
      if ((err = WSAGetLastError()) != WSA_IO_PENDING)
      {
            if (err == WSAENOBUFS)
                DebugBreak();
            dbgprint("PostSend: WSASend* failed: %d \n", WSAGetLastError(), sendobj->ol.Internal);
            rc = SOCKET_ERROR;
      }
    }
    if (rc == NO_ERROR)
    {
      // Increment the outstanding operation count
      InterlockedIncrement(&sock->OutstandingSend);
      InterlockedIncrement(&gOutstandingSends);
    }
    LeaveCriticalSection(&sock->SockCritSec);
    return rc;
}
//
// Function: PostAccept
//
// Description:
//    Post an overlapped accept on a listening socket.
//
int PostAccept(LISTEN_OBJ *listen, BUFFER_OBJ *acceptobj)
{
    DWORD   bytes;
    int   rc;
    acceptobj->operation = OP_ACCEPT;
    // Create the client socket for an incoming connection
    acceptobj->sclient = socket(listen->AddressFamily, SOCK_STREAM, IPPROTO_TCP);
    if (acceptobj->sclient == INVALID_SOCKET)
    {
      fprintf(stderr, "PostAccept: socket failed: %d\n", WSAGetLastError());
      return -1;
    }
    rc = listen->lpfnAcceptEx(
            listen->s,
            acceptobj->sclient,
            acceptobj->buf,
            acceptobj->buflen - ((sizeof(SOCKADDR_STORAGE) + 16) * 2),
            sizeof(SOCKADDR_STORAGE) + 16,
            sizeof(SOCKADDR_STORAGE) + 16,
         &bytes,
         &acceptobj->ol
            );
    if (rc == FALSE)
    {
      if (WSAGetLastError() != WSA_IO_PENDING)
      {
            printf("PostAccept: AcceptEx failed: %d\n",
                  WSAGetLastError());
            return SOCKET_ERROR;
      }
    }
    // Increment the outstanding overlapped count for this socket
    InterlockedIncrement(&listen->PendingAcceptCount);
    return NO_ERROR;
}
//
// Function: HandleIo
//
// Description:
//    This function handles the IO on a socket. In the event of a receive, the
//    completed receive is posted again. For completed accepts, another AcceptEx
//    is posted. For completed sends, the buffer is freed.
//
void HandleIo(ULONG_PTR key, BUFFER_OBJ *buf, HANDLE CompPort, DWORD BytesTransfered, DWORD error)
{
    LISTEN_OBJ *listenobj=NULL;
    SOCKET_OBJ *sockobj=NULL,
               *clientobj=NULL;   // New client object for accepted connections
    BUFFER_OBJ *recvobj=NULL,       // Used to post new receives on accepted connections
               *sendobj=NULL;       // Used to post new sends for data received
    BOOL      bCleanupSocket;
    if (error != 0)
    {
      dbgprint("OP = %d; Error = %d\n", buf->operation, error);
    }
    bCleanupSocket = FALSE;
    if (error != NO_ERROR)
    {
      // An error occured on a TCP socket, free the associated per I/O buffer
      // and see if there are any more outstanding operations. If so we must
      // wait until they are complete as well.
      //
      if (buf->operation != OP_ACCEPT)
      {
            sockobj = (SOCKET_OBJ *)key;
            if (buf->operation == OP_READ)
            {
                if ((InterlockedDecrement(&sockobj->OutstandingRecv) == 0) &&
                  (sockobj->OutstandingSend == 0) )
                {
                  dbgprint("Freeing socket obj in GetOverlappedResult\n");
                  FreeSocketObj(sockobj);
                }
            }
            else if (buf->operation == OP_WRITE)
            {
                if ((InterlockedDecrement(&sockobj->OutstandingSend) == 0) &&
                  (sockobj->OutstandingRecv == 0) )
                {
                  dbgprint("Freeing socket obj in GetOverlappedResult\n");
                  FreeSocketObj(sockobj);
                }
            }
      }
      else
      {
            listenobj = (LISTEN_OBJ *)key;
            printf("Accept failed\n");
            closesocket(buf->sclient);
            buf->sclient = INVALID_SOCKET;
      }
      FreeBufferObj(buf);
      return;
    }
    if (buf->operation == OP_ACCEPT)
    {
      HANDLE            hrc;
      SOCKADDR_STORAGE *LocalSockaddr=NULL,
                         *RemoteSockaddr=NULL;
      int               LocalSockaddrLen,
                        RemoteSockaddrLen;
      listenobj = (LISTEN_OBJ *)key;
      // Update counters
      InterlockedIncrement(&gConnections);
      InterlockedIncrement(&gConnectionsLast);
      InterlockedDecrement(&listenobj->PendingAcceptCount);
      InterlockedExchangeAdd(&gBytesRead, BytesTransfered);
      InterlockedExchangeAdd(&gBytesReadLast, BytesTransfered);
      // Print the client's addresss
      listenobj->lpfnGetAcceptExSockaddrs(
                buf->buf,
                buf->buflen - ((sizeof(SOCKADDR_STORAGE) + 16) * 2),
                sizeof(SOCKADDR_STORAGE) + 16,
                sizeof(SOCKADDR_STORAGE) + 16,
                (SOCKADDR **)&LocalSockaddr,
               &LocalSockaddrLen,
                (SOCKADDR **)&RemoteSockaddr,
               &RemoteSockaddrLen
                );
      RemovePendingAccept(listenobj, buf);
      // Get a new SOCKET_OBJ for the client connection
      clientobj = GetSocketObj(buf->sclient, listenobj->AddressFamily);
      if (clientobj)
      {
            // Associate the new connection to our completion port
            hrc = CreateIoCompletionPort(
                  (HANDLE)clientobj->s,
                  CompPort,
                  (ULONG_PTR)clientobj,
                  0
                  );
            if (hrc == NULL)
            {
                fprintf(stderr, "CompletionThread: CreateIoCompletionPort failed: %d\n",
                        GetLastError());
                return;
            }
            sendobj = buf;
            sendobj->buflen = BytesTransfered;
            // Post the send - this is the first one for this connection so just do it
            sendobj->sock = clientobj;
            //PostSend(clientobj, sendobj);
            EnqueuePendingOperation(&gPendingSendList, &gPendingSendListEnd, sendobj, OP_WRITE);
      }
      else
      {
            // Can't allocate a socket structure so close the connection
            closesocket(buf->sclient);
            buf->sclient = INVALID_SOCKET;
            FreeBufferObj(buf);
      }
      
if (error != NO_ERROR)
{
            // Check for socket closure
            EnterCriticalSection(&clientobj->SockCritSec);
            if ( (clientobj->OutstandingSend == 0) &&
               (clientobj->OutstandingRecv == 0) )
            {
                closesocket(clientobj->s);
                clientobj->s = INVALID_SOCKET;
                FreeSocketObj(clientobj);
            }
            else
            {
                clientobj->bClosing = TRUE;
            }
            LeaveCriticalSection(&clientobj->SockCritSec);
            error = NO_ERROR;
}
      InterlockedIncrement(&listenobj->RepostCount);
      SetEvent(listenobj->RepostAccept);
    }
    else if (buf->operation == OP_READ)
    {
      sockobj = (SOCKET_OBJ *)key;
      InterlockedDecrement(&sockobj->OutstandingRecv);
      //
      // Receive completed successfully
      //
      if (BytesTransfered > 0)
      {
            InterlockedExchangeAdd(&gBytesRead, BytesTransfered);
            InterlockedExchangeAdd(&gBytesReadLast, BytesTransfered);
            // Make the recv a send
            sendobj         = buf;
            sendobj->buflen = BytesTransfered;
            sendobj->sock = sockobj;
            //PostSend(sockobj, sendobj);
            EnqueuePendingOperation(&gPendingSendList, &gPendingSendListEnd, sendobj, OP_WRITE);
      }
      else
      {
            //dbgprint("Got 0 byte receive\n");
            // Graceful close - the receive returned 0 bytes read
            sockobj->bClosing = TRUE;
            // Free the receive buffer
            FreeBufferObj(buf);
            // If this was the last outstanding operation on socket, clean it up
            EnterCriticalSection(&sockobj->SockCritSec);
            if ((sockobj->OutstandingSend == 0) &&
                (sockobj->OutstandingRecv == 0) )
            {
                bCleanupSocket = TRUE;
            }
            LeaveCriticalSection(&sockobj->SockCritSec);
      }
    }
    else if (buf->operation == OP_WRITE)
    {
      sockobj = (SOCKET_OBJ *)key;
      InterlockedDecrement(&sockobj->OutstandingSend);
      InterlockedDecrement(&gOutstandingSends);
      // Update the counters
      InterlockedExchangeAdd(&gBytesSent, BytesTransfered);
      InterlockedExchangeAdd(&gBytesSentLast, BytesTransfered);
      buf->buflen = gBufferSize;
      if (sockobj->bClosing == FALSE)
      {
            buf->sock = sockobj;
            PostRecv(sockobj, buf);
      }
    }
    ProcessPendingOperations();
    if (sockobj)
    {
      if (error != NO_ERROR)
      {
            printf("err = %d\n", error);
            sockobj->bClosing = TRUE;
      }
      //
      // Check to see if socket is closing
      //
      if ( (sockobj->OutstandingSend == 0) &&
             (sockobj->OutstandingRecv == 0) &&
             (sockobj->bClosing) )
      {
            bCleanupSocket = TRUE;
      }
      if (bCleanupSocket)
      {
            closesocket(sockobj->s);
            sockobj->s = INVALID_SOCKET;
            FreeSocketObj(sockobj);
      }
    }
    return;
}
//
// Function: CompletionThread
//
// Description:
//    This is the completion thread which services our completion port. One of
//    these threads is created per processor on the system. The thread sits in
//    an infinite loop calling GetQueuedCompletionStatus and handling socket
//    IO that completed.
//
DWORD WINAPI CompletionThread(LPVOID lpParam)
{
    ULONG_PTR    Key;
    SOCKET       s;
    BUFFER_OBJ*bufobj=NULL;         // Per I/O object for completed I/O
    OVERLAPPED*lpOverlapped=NULL;   // Pointer to overlapped structure for completed I/O
    HANDLE       CompletionPort;      // Completion port handle
    DWORD      BytesTransfered,       // Number of bytes transfered
               Flags;               // Flags for completed I/O
    int          rc,
               error;
    CompletionPort = (HANDLE)lpParam;
    while (1)
    {
      error = NO_ERROR;
      rc = GetQueuedCompletionStatus(
                CompletionPort,
               &BytesTransfered,
                (PULONG_PTR)&Key,
               &lpOverlapped,
                INFINITE
                );
      bufobj = CONTAINING_RECORD(lpOverlapped, BUFFER_OBJ, ol);
      if (rc == FALSE)
      {
            // If the call fails, call WSAGetOverlappedResult to translate the
            //    error code into a Winsock error code.
            if (bufobj->operation == OP_ACCEPT)
            {
                s = ((LISTEN_OBJ *)Key)->s;
            }
            else
            {
                s = ((SOCKET_OBJ *)Key)->s;
            }
         
            dbgprint("CompletionThread: GetQueuedCompletionStatus failed: %d \n",
                  GetLastError(), lpOverlapped->Internal);
            rc = WSAGetOverlappedResult(
                  s,
                   &bufobj->ol,
                   &BytesTransfered,
                  FALSE,
                   &Flags
                  );
            if (rc == FALSE)
            {
                error = WSAGetLastError();
            }
      }
      // Handle the IO operation
      HandleIo(Key, bufobj, CompletionPort, BytesTransfered, error);
    }
    ExitThread(0);
    return 0;
}
//
// Function: main
//
// Description:
//      This is the main program. It parses the command line and creates
//      the main socket. For TCP the socket is used to accept incoming
//      client connections. Each client TCP connection is handed off to
//      a worker thread which will receive any data on that connection
//      until the connection is closed.
//
int __cdecl main(int argc, char **argv)
{
    WSADATA          wsd;
    SYSTEM_INFO      sysinfo;
    LISTEN_OBJ      *ListenSockets=NULL,
                  *listenobj=NULL;
    SOCKET_OBJ      *sockobj=NULL;
    BUFFER_OBJ      *acceptobj=NULL;
    GUID             guidAcceptEx = WSAID_ACCEPTEX,
                     guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
    DWORD            bytes;
    HANDLE         CompletionPort,
                     WaitEvents,
                     hrc;
    int            endpointcount=0,
                     waitcount=0,
                     interval,
                     rc,
                     i;
    struct addrinfo *res=NULL,
                  *ptr=NULL;
    // Validate the command line
    ValidateArgs(argc, argv);
    // Load Winsock
    if (WSAStartup(MAKEWORD(2,2), &wsd) != 0)
    {
      fprintf(stderr, "unable to load Winsock!\n");
      return -1;
    }
    InitializeCriticalSection(&gSocketListCs);
    InitializeCriticalSection(&gBufferListCs);
    InitializeCriticalSection(&gPendingCritSec);
    // Create the completion port used by this server
    CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0);
    if (CompletionPort == NULL)
    {
      fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
      return -1;
    }
    // Find out how many processors are on this system
    GetSystemInfo(&sysinfo);
    if (sysinfo.dwNumberOfProcessors > MAX_COMPLETION_THREAD_COUNT)
    {
      sysinfo.dwNumberOfProcessors = MAX_COMPLETION_THREAD_COUNT;
    }
    // Round the buffer size to the next increment of the page size
    if ((gBufferSize % sysinfo.dwPageSize) != 0)
    {
      gBufferSize = ((gBufferSize / sysinfo.dwPageSize) + 1) * sysinfo.dwPageSize;
    }
    printf("Buffer size = %lu (page size = %lu)\n",
      gBufferSize, sysinfo.dwPageSize);
   
    // Create the worker threads to service the completion notifications
    for(waitcount=0; waitcount < (int)sysinfo.dwNumberOfProcessors ;waitcount++)
    {
      WaitEvents = CreateThread(NULL, 0, CompletionThread, (LPVOID)CompletionPort, 0, NULL);
      if (WaitEvents == NULL)
      {
            fprintf(stderr, "CreatThread failed: %d\n", GetLastError());
            return -1;
      }
    }
    printf("Local address: %s; Port: %s; Family: %d\n",
            gBindAddr, gBindPort, gAddressFamily);
    // Obtain the "wildcard" addresses for all the available address families
    res = ResolveAddress(gBindAddr, gBindPort, gAddressFamily, gSocketType, gProtocol);
    if (res == NULL)
    {
      fprintf(stderr, "ResolveAddress failed to return any addresses!\n");
      return -1;
    }
    // For each local address returned, create a listening/receiving socket
    ptr = res;
    while (ptr)
    {
      printf("Listening address: ");
      PrintAddress(ptr->ai_addr, ptr->ai_addrlen);
      printf("\n");
      listenobj = (LISTEN_OBJ *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(LISTEN_OBJ));
      if (listenobj == NULL)
      {
            fprintf(stderr, "Out of memory!\n");
            return -1;
      }
      
      listenobj->LoWaterMark = gInitialAccepts;
      InitializeCriticalSection(&listenobj->ListenCritSec);
      
      // Save off the address family of this socket
      listenobj->AddressFamily = ptr->ai_family;
      // create the socket
      listenobj->s = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
      if (listenobj->s == INVALID_SOCKET)
      {
            fprintf(stderr, "socket failed: %d\n", WSAGetLastError());
            return -1;
      }
      // Create an event to register for FD_ACCEPT events on
      listenobj->AcceptEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
      if (listenobj->AcceptEvent == NULL)
      {
            fprintf(stderr, "CreateEvent failed: %d\n", GetLastError());
            return -1;
      }
      listenobj->RepostAccept = CreateEvent(NULL, TRUE, FALSE, NULL);
      if (listenobj->RepostAccept == NULL)
      {
            fprintf(stderr, "CreateSemaphore failed: %d\n", GetLastError());
            return -1;
      }
      // Add the event to the liste of waiting events
      WaitEvents = listenobj->AcceptEvent;
      WaitEvents = listenobj->RepostAccept;
      // Associate the socket and its SOCKET_OBJ to the completion port
      hrc = CreateIoCompletionPort((HANDLE)listenobj->s, CompletionPort, (ULONG_PTR)listenobj, 0);
      if (hrc == NULL)
      {
            fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
            return -1;
      }
      // bind the socket to a local address and port
      rc = bind(listenobj->s, ptr->ai_addr, ptr->ai_addrlen);
      if (rc == SOCKET_ERROR)
      {
            fprintf(stderr, "bind failed: %d\n", WSAGetLastError());
            return -1;
      }
      // Need to load the Winsock extension functions from each provider
      //    -- e.g. AF_INET and AF_INET6.
      rc = WSAIoctl(
                listenobj->s,
                SIO_GET_EXTENSION_FUNCTION_POINTER,
               &guidAcceptEx,
                sizeof(guidAcceptEx),
               &listenobj->lpfnAcceptEx,
                sizeof(listenobj->lpfnAcceptEx),
               &bytes,
                NULL,
                NULL
                );
      if (rc == SOCKET_ERROR)
      {
            fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER failed: %d\n",
                  WSAGetLastError());
            return -1;
      }
      // Load the Winsock extensions from each provider
      rc = WSAIoctl(
                listenobj->s,
                SIO_GET_EXTENSION_FUNCTION_POINTER,
               &guidGetAcceptExSockaddrs,
                sizeof(guidGetAcceptExSockaddrs),
               &listenobj->lpfnGetAcceptExSockaddrs,
                sizeof(listenobj->lpfnGetAcceptExSockaddrs),
               &bytes,
                NULL,
                NULL
                );
      if (rc == SOCKET_ERROR)
      {
            fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER faled: %d\n",
                  WSAGetLastError());
            return -1;
      }
      // Put the socket into listening mode
      rc = listen(listenobj->s, 200);
      if (rc == SOCKET_ERROR)
      {
            fprintf(stderr, "listen failed: %d\n", WSAGetLastError());
            return -1;
      }
      // Register for FD_ACCEPT notification on listening socket
      rc = WSAEventSelect(listenobj->s, listenobj->AcceptEvent, FD_ACCEPT);
      if (rc == SOCKET_ERROR)
      {
            fprintf(stderr, "WSAEventSelect failed: %d\n", WSAGetLastError());
            return -1;
      }
      // Initiate the initial accepts for each listen socket
      for(i=0; i < gInitialAccepts ;i++)
      {
            acceptobj = GetBufferObj(gBufferSize);
            if (acceptobj == NULL)
            {
                fprintf(stderr, "Out of memory!\n");
                return -1;
            }
            acceptobj->PostAccept = listenobj->AcceptEvent;
            InsertPendingAccept(listenobj, acceptobj);
            PostAccept(listenobj, acceptobj);
      }
      //
      // Maintain a list of the listening socket structures
      //
      if (ListenSockets == NULL)
      {
            ListenSockets = listenobj;
      }
      else
      {
            listenobj->next = ListenSockets;
            ListenSockets   = listenobj;
      }
      endpointcount++;
      ptr = ptr->ai_next;
    }
    // free the addrinfo structure for the 'bind' address
    freeaddrinfo(res);
    gStartTime = gStartTimeLast = GetTickCount();
    interval = 0;
    while (1)
    {
      rc = WSAWaitForMultipleEvents(
                waitcount,
                WaitEvents,
                FALSE,
                5000,
                FALSE
                );
      if (rc == WAIT_FAILED)
      {
            fprintf(stderr, "WSAWaitForMultipleEvents failed: %d\n", WSAGetLastError());
            break;
      }
      else if (rc == WAIT_TIMEOUT)
      {
            interval++;
            PrintStatistics();
            if (interval == 36)
            {
                int          optval,
                           optlen;
                // For TCP, cycle through all the outstanding AcceptEx operations
                //   to see if any of the client sockets have been connected but
                //   haven't received any data. If so, close them as they could be
                //   a denial of service attack.
                listenobj = ListenSockets;
                while (listenobj)
                {
                  EnterCriticalSection(&listenobj->ListenCritSec);
                  acceptobj = listenobj->PendingAccepts;
                  while (acceptobj)
                  {
                        optlen = sizeof(optval);
                        rc = getsockopt(
                              acceptobj->sclient,
                              SOL_SOCKET,
                              SO_CONNECT_TIME,
                              (char *)&optval,
                               &optlen
                              );
                        if (rc == SOCKET_ERROR)
                        {
                            fprintf(stderr, "getsockopt: SO_CONNECT_TIME failed: %d\n",
                                    WSAGetLastError());
                        }
                        else
                        {
                            // If the socket has been connected for more than 5 minutes,
                            //    close it. If closed, the AcceptEx call will fail in the
                            //    completion thread.
                            if ((optval != 0xFFFFFFFF) && (optval > 300))
                            {
                              printf("closing stale handle\n");
                              closesocket(acceptobj->sclient);
                              acceptobj->sclient = INVALID_SOCKET;
                            }
                        }
                        acceptobj = acceptobj->next;
                  }
                  LeaveCriticalSection(&listenobj->ListenCritSec);
                  listenobj = listenobj->next;
                }
                interval = 0;
            }
      }
      else
      {
            int index;
            index = rc - WAIT_OBJECT_0;
            for( ; index < waitcount ; index++)
            {
                rc = WaitForSingleObject(WaitEvents, 0);
                if (rc == WAIT_FAILED || rc == WAIT_TIMEOUT)
                {
                  continue;
                }
                if (index < (int)sysinfo.dwNumberOfProcessors)
                {
                  // One of the completion threads exited
                  //   This is bad so just bail - a real server would want
                  //   to gracefully exit but this is just a sample ...
                  ExitProcess(-1);
                }
                else
                {
                  // An FD_ACCEPT event occured
                  listenobj = ListenSockets;
                  while (listenobj)
                  {
                        if ((listenobj->AcceptEvent == WaitEvents) ||
                              (listenobj->RepostAccept== WaitEvents))
                            break;
                        listenobj = listenobj->next;
                  }
                  if (listenobj)
                  {
                        WSANETWORKEVENTS ne;
                        int            limit=0;
                        if (listenobj->AcceptEvent == WaitEvents)
                        {
                            // EnumNetworkEvents to see if FD_ACCEPT was set
                            rc = WSAEnumNetworkEvents(
                                    listenobj->s,
                                    listenobj->AcceptEvent,
                                    &ne
                                                   );
                            if (rc == SOCKET_ERROR)
                            {
                              fprintf(stderr, "WSAEnumNetworkEvents failed: %d\n",
                                        WSAGetLastError());
                            }
                            if ((ne.lNetworkEvents & FD_ACCEPT) == FD_ACCEPT)
                            {
                              // We got an FD_ACCEPT so post multiple accepts to
                              // cover the burst
                              limit = BURST_ACCEPT_COUNT;
                            }
                        }
                        else if (listenobj->RepostAccept == WaitEvents)
                        {
                            // Semaphore is signaled
                            limit = InterlockedExchange(&listenobj->RepostCount, 0);
                            ResetEvent(listenobj->RepostAccept);
                        }
                        i = 0;
                        while ( (i++ < limit) &&
                              (listenobj->PendingAcceptCount < gMaxAccepts) )
                        {
                            acceptobj = GetBufferObj(gBufferSize);
                            if (acceptobj)
                            {
                              acceptobj->PostAccept = listenobj->AcceptEvent;
                              InsertPendingAccept(listenobj, acceptobj);
                              PostAccept(listenobj, acceptobj);
                            }
                        }
                  }
                }
            }
      }
    }
    WSACleanup();
    return 0;
}

────────────────────────────
以上程序来自Windows网络编程2nd edition的源码,从中可以清晰的看到完成端口运转的整体框架。为了简单起见,省略了部分内存管理的代码,因为和理解IOCP的运行没什么关系。

peakzhang 发表于 2008-9-21 15:20:00

ACE的proactor模式在Windows的实现就是IOCP.

"ACE_WIN32_Proactor uses an I/O completion port for completion event detection."

in《ACE Programmer's Guide, The: Practical Design Patterns for Network and Systems Programming》8.4节
页: [1]
查看完整版本: Windows平台下,完成端口的程序框架