Windows平台下完成端口（IOCP）的程序框架
────────────────────────────────//
// Function: PostRecv
//
// Description:
//    Post an overlapped receive operation on the socket, offering the whole
//    buffer described by recvobj->buf / recvobj->buflen to WSARecv and using
//    recvobj->ol as the OVERLAPPED structure.
//
// Parameters:
//    sock    - socket object the receive is posted on
//    recvobj - per-I/O buffer object; its operation field is set to OP_READ
//
// Returns:
//    NO_ERROR if WSARecv completed immediately or is pending; the receive is
//    then counted in sock->OutstandingRecv (HandleIo decrements it when the
//    completion is processed). SOCKET_ERROR if WSARecv failed; the counter
//    is untouched and recvobj is not freed here.
//
int PostRecv(SOCKET_OBJ *sock, BUFFER_OBJ *recvobj)
{
    WSABUF  wbuf;
    DWORD   bytes,
            flags;
    int     rc,
            err;

    recvobj->operation = OP_READ;

    wbuf.buf = recvobj->buf;
    wbuf.len = recvobj->buflen;

    flags = 0;

    EnterCriticalSection(&sock->SockCritSec);

    rc = WSARecv(
            sock->s,
            &wbuf,
            1,
            &bytes,
            &flags,
            &recvobj->ol,
            NULL
            );
    if (rc == SOCKET_ERROR)
    {
        rc = NO_ERROR;
        // Capture the error once so the log reports exactly the code the
        // decision was based on.
        if ((err = WSAGetLastError()) != WSA_IO_PENDING)
        {
            dbgprint("PostRecv: WSARecv* failed: %d\n", err);
            rc = SOCKET_ERROR;
        }
    }
    if (rc == NO_ERROR)
    {
        // Increment outstanding overlapped operations
        InterlockedIncrement(&sock->OutstandingRecv);
    }

    LeaveCriticalSection(&sock->SockCritSec);

    return rc;
}
//
// Function: PostSend
//
// Description:
//    Post an overlapped send of sendobj->buf (sendobj->buflen bytes) on the
//    socket, using sendobj->ol as the OVERLAPPED structure.
//
// Parameters:
//    sock    - socket object the send is posted on
//    sendobj - per-I/O buffer object; its operation field is set to OP_WRITE
//
// Returns:
//    NO_ERROR if WSASend completed immediately or is pending; the send is
//    then counted in sock->OutstandingSend and gOutstandingSends (HandleIo
//    decrements both when the completion is processed). SOCKET_ERROR if
//    WSASend failed; the counters are untouched and sendobj is not freed here.
//
int PostSend(SOCKET_OBJ *sock, BUFFER_OBJ *sendobj)
{
    WSABUF  wbuf;
    DWORD   bytes;
    int     rc,
            err;

    sendobj->operation = OP_WRITE;

    wbuf.buf = sendobj->buf;
    wbuf.len = sendobj->buflen;

    EnterCriticalSection(&sock->SockCritSec);

    rc = WSASend(
            sock->s,
            &wbuf,
            1,
            &bytes,
            0,
            &sendobj->ol,
            NULL
            );
    if (rc == SOCKET_ERROR)
    {
        rc = NO_ERROR;
        if ((err = WSAGetLastError()) != WSA_IO_PENDING)
        {
            // Breaking on WSAENOBUFS is a debugging aid. Without a debugger
            // attached the breakpoint exception is unhandled and terminates
            // the whole server, so only break when someone can catch it;
            // otherwise report it as an ordinary send failure.
            if ((err == WSAENOBUFS) && IsDebuggerPresent())
            {
                DebugBreak();
            }
            dbgprint("PostSend: WSASend* failed: %d \n", err);
            rc = SOCKET_ERROR;
        }
    }
    if (rc == NO_ERROR)
    {
        // Increment the outstanding operation count
        InterlockedIncrement(&sock->OutstandingSend);
        InterlockedIncrement(&gOutstandingSends);
    }

    LeaveCriticalSection(&sock->SockCritSec);

    return rc;
}
//
// Function: PostAccept
//
// Description:
//    Post an overlapped accept on a listening socket. A new client socket is
//    created in acceptobj->sclient and handed to AcceptEx together with
//    acceptobj->buf. The tail of the buffer holds the local and remote
//    address slots; the rest receives the client's first data.
//
//    Because a non-zero receive length is passed to AcceptEx, the accept is
//    only reported once the client has sent data. main() therefore scans
//    pending accepts with SO_CONNECT_TIME and closes idle connections.
//
// Parameters:
//    listen    - listening socket object (provides the AcceptEx pointer)
//    acceptobj - per-I/O buffer object; its operation field is set to
//                OP_ACCEPT
//
// Returns:
//    NO_ERROR if AcceptEx completed or is pending; listen->PendingAcceptCount
//    is then incremented. SOCKET_ERROR if the client socket could not be
//    created or AcceptEx failed. In that case acceptobj->sclient is left as
//    INVALID_SOCKET, so no socket leaks.
//
int PostAccept(LISTEN_OBJ *listen, BUFFER_OBJ *acceptobj)
{
    // AcceptEx requires each address slot to be at least 16 bytes larger
    // than the largest address of the transport.
    const DWORD addrlen = sizeof(SOCKADDR_STORAGE) + 16;
    DWORD       bytes;
    int         rc,
                err;

    acceptobj->operation = OP_ACCEPT;

    // Create the client socket for an incoming connection
    acceptobj->sclient = socket(listen->AddressFamily, SOCK_STREAM, IPPROTO_TCP);
    if (acceptobj->sclient == INVALID_SOCKET)
    {
        fprintf(stderr, "PostAccept: socket failed: %d\n", WSAGetLastError());
        return SOCKET_ERROR;
    }

    rc = listen->lpfnAcceptEx(
            listen->s,
            acceptobj->sclient,
            acceptobj->buf,
            acceptobj->buflen - (addrlen * 2),
            addrlen,
            addrlen,
            &bytes,
            &acceptobj->ol
            );
    if ((rc == FALSE) && ((err = WSAGetLastError()) != WSA_IO_PENDING))
    {
        fprintf(stderr, "PostAccept: AcceptEx failed: %d\n", err);

        // No completion will be queued for this accept, so nobody else
        // will ever close the client socket created above.
        closesocket(acceptobj->sclient);
        acceptobj->sclient = INVALID_SOCKET;
        return SOCKET_ERROR;
    }

    // Increment the outstanding overlapped count for this socket
    InterlockedIncrement(&listen->PendingAcceptCount);

    return NO_ERROR;
}
//
// Function: HandleIoFailure
//
// Description:
//    Release the resources tied to an I/O operation that completed with an
//    error. buf is always freed.
//    - Failed accept: the client socket is closed, the buffer is removed from
//      the listener's pending-accept bookkeeping and a replacement accept is
//      requested.
//    - Failed receive/send: the socket object is closed and freed if no other
//      operation is outstanding on it.
//
static void HandleIoFailure(ULONG_PTR key, BUFFER_OBJ *buf)
{
    if (buf->operation == OP_ACCEPT)
    {
        LISTEN_OBJ *listenobj = (LISTEN_OBJ *)key;

        printf("Accept failed\n");

        if (buf->sclient != INVALID_SOCKET)
        {
            closesocket(buf->sclient);
            buf->sclient = INVALID_SOCKET;
        }

        // main() inserted this buffer into the pending-accept list and
        // PostAccept counted it in PendingAcceptCount. Undo both before the
        // buffer is freed. Otherwise main()'s stale-connection scan walks a
        // freed object, and the count creeps up to gMaxAccepts and stops
        // main() from posting new accepts. Then request a replacement
        // accept, exactly as a successful accept does.
        RemovePendingAccept(listenobj, buf);
        InterlockedDecrement(&listenobj->PendingAcceptCount);
        InterlockedIncrement(&listenobj->RepostCount);
        SetEvent(listenobj->RepostAccept);
    }
    else
    {
        SOCKET_OBJ *sockobj = (SOCKET_OBJ *)key;
        BOOL        bLastOperation = FALSE;

        if (buf->operation == OP_READ)
        {
            bLastOperation = (InterlockedDecrement(&sockobj->OutstandingRecv) == 0) &&
                             (sockobj->OutstandingSend == 0);
        }
        else if (buf->operation == OP_WRITE)
        {
            // PostSend counted this send globally as well as per socket
            InterlockedDecrement(&gOutstandingSends);
            bLastOperation = (InterlockedDecrement(&sockobj->OutstandingSend) == 0) &&
                             (sockobj->OutstandingRecv == 0);
        }

        if (bLastOperation)
        {
            dbgprint("Freeing socket obj in GetOverlappedResult\n");
            // Close before freeing, like every other teardown path in HandleIo
            closesocket(sockobj->s);
            sockobj->s = INVALID_SOCKET;
            FreeSocketObj(sockobj);
        }
    }

    FreeBufferObj(buf);
}

//
// Function: HandleAcceptCompletion
//
// Description:
//    Finish a successful AcceptEx in four steps:
//    - update the counters;
//    - attach the new connection to the completion port;
//    - queue an echo (send) of the data that arrived with the accept;
//    - ask main() to post a replacement accept.
//    If the connection cannot be set up, the client socket, the socket object
//    and the buffer are released.
//
static void HandleAcceptCompletion(LISTEN_OBJ *listenobj, BUFFER_OBJ *buf,
                                   HANDLE CompPort, DWORD BytesTransfered)
{
    const DWORD       addrlen = sizeof(SOCKADDR_STORAGE) + 16;
    SOCKADDR_STORAGE *LocalSockaddr = NULL,
                     *RemoteSockaddr = NULL;
    int               LocalSockaddrLen,
                      RemoteSockaddrLen;
    SOCKET_OBJ       *clientobj;
    HANDLE            hrc;

    // Update counters
    InterlockedIncrement(&gConnections);
    InterlockedIncrement(&gConnectionsLast);
    InterlockedDecrement(&listenobj->PendingAcceptCount);
    InterlockedExchangeAdd(&gBytesRead, BytesTransfered);
    InterlockedExchangeAdd(&gBytesReadLast, BytesTransfered);

    // Parse the local/remote addresses out of the accept buffer. The sample
    // does not use them further; they are available here for logging.
    listenobj->lpfnGetAcceptExSockaddrs(
            buf->buf,
            buf->buflen - (addrlen * 2),
            addrlen,
            addrlen,
            (SOCKADDR **)&LocalSockaddr,
            &LocalSockaddrLen,
            (SOCKADDR **)&RemoteSockaddr,
            &RemoteSockaddrLen
            );

    RemovePendingAccept(listenobj, buf);

    // Get a new SOCKET_OBJ for the client connection
    clientobj = GetSocketObj(buf->sclient, listenobj->AddressFamily);
    if (clientobj == NULL)
    {
        // Can't allocate a socket structure so close the connection
        closesocket(buf->sclient);
        buf->sclient = INVALID_SOCKET;
        FreeBufferObj(buf);
    }
    else
    {
        // Associate the new connection to our completion port
        hrc = CreateIoCompletionPort(
                (HANDLE)clientobj->s,
                CompPort,
                (ULONG_PTR)clientobj,
                0
                );
        if (hrc == NULL)
        {
            fprintf(stderr, "CompletionThread: CreateIoCompletionPort failed: %d\n",
                    GetLastError());
            // Without the association no completion would ever be seen for
            // this connection; release everything instead of leaking it.
            closesocket(clientobj->s);
            clientobj->s = INVALID_SOCKET;
            FreeSocketObj(clientobj);
            FreeBufferObj(buf);
        }
        else
        {
            // Turn the accept buffer into the first send (echo) for this
            // connection; ProcessPendingOperations posts it.
            buf->buflen = BytesTransfered;
            buf->sock = clientobj;
            EnqueuePendingOperation(&gPendingSendList, &gPendingSendListEnd, buf, OP_WRITE);
        }
    }

    // The accept has been consumed whatever happened to the connection, so
    // always ask main() for a replacement.
    InterlockedIncrement(&listenobj->RepostCount);
    SetEvent(listenobj->RepostAccept);
}

//
// Function: HandleRecvCompletion
//
// Description:
//    Handle a successful receive. When data arrived, the buffer is reused
//    as a send of the same bytes (echo). A 0-byte receive means the peer
//    closed the connection: the socket is marked as closing and the buffer is
//    freed.
//
// Returns:
//    TRUE when the socket was closed by the peer and no operation remains
//    outstanding on it, i.e. the caller should tear it down.
//
static BOOL HandleRecvCompletion(SOCKET_OBJ *sockobj, BUFFER_OBJ *buf, DWORD BytesTransfered)
{
    BOOL bLastOperation;

    InterlockedDecrement(&sockobj->OutstandingRecv);

    if (BytesTransfered > 0)
    {
        InterlockedExchangeAdd(&gBytesRead, BytesTransfered);
        InterlockedExchangeAdd(&gBytesReadLast, BytesTransfered);

        // Make the recv a send
        buf->buflen = BytesTransfered;
        buf->sock = sockobj;
        EnqueuePendingOperation(&gPendingSendList, &gPendingSendListEnd, buf, OP_WRITE);
        return FALSE;
    }

    // Graceful close - the receive returned 0 bytes read
    sockobj->bClosing = TRUE;
    FreeBufferObj(buf);

    // If this was the last outstanding operation on socket, clean it up
    EnterCriticalSection(&sockobj->SockCritSec);
    bLastOperation = (sockobj->OutstandingSend == 0) &&
                     (sockobj->OutstandingRecv == 0);
    LeaveCriticalSection(&sockobj->SockCritSec);

    return bLastOperation;
}

//
// Function: HandleSendCompletion
//
// Description:
//    Handle a successful send. The counters are updated. The buffer is then
//    reused to post the next receive, unless the socket is closing or the
//    receive cannot be posted; in either case the buffer is freed. A failed
//    receive post also marks the socket as closing so that HandleIo tears
//    it down.
//
static void HandleSendCompletion(SOCKET_OBJ *sockobj, BUFFER_OBJ *buf, DWORD BytesTransfered)
{
    InterlockedDecrement(&sockobj->OutstandingSend);
    InterlockedDecrement(&gOutstandingSends);

    // Update the counters
    InterlockedExchangeAdd(&gBytesSent, BytesTransfered);
    InterlockedExchangeAdd(&gBytesSentLast, BytesTransfered);

    buf->buflen = gBufferSize;

    if (sockobj->bClosing)
    {
        // No further I/O is posted on a closing socket; release the buffer
        // instead of dropping it.
        FreeBufferObj(buf);
        return;
    }

    buf->sock = sockobj;
    if (PostRecv(sockobj, buf) != NO_ERROR)
    {
        // No completion will arrive for this buffer, and without an
        // outstanding receive nothing would ever clean the socket up.
        FreeBufferObj(buf);
        sockobj->bClosing = TRUE;
    }
}

//
// Function: HandleIo
//
// Description:
//    This function handles the IO on a socket:
//    - Completed accept: the new connection is associated with the completion
//      port, the data received with the accept is queued to be echoed back,
//      and main() is asked to post a replacement accept.
//    - Completed receive: the data is queued to be echoed back. A 0-byte
//      receive starts a graceful close.
//    - Completed send: the buffer is reused to post the next receive.
//    - Failed operation: its resources are released; see HandleIoFailure.
//    Queued sends are then posted (ProcessPendingOperations). A socket that
//    is closing and has no outstanding I/O is closed and freed.
//
// Parameters:
//    key             - completion key: LISTEN_OBJ* for accepts, SOCKET_OBJ*
//                      otherwise
//    buf             - per-I/O buffer object of the completed operation
//    CompPort        - completion port new connections are associated with
//    BytesTransfered - number of bytes transferred by the operation
//    error           - Winsock error of the operation, NO_ERROR on success
//
void HandleIo(ULONG_PTR key, BUFFER_OBJ *buf, HANDLE CompPort, DWORD BytesTransfered, DWORD error)
{
    SOCKET_OBJ *sockobj = NULL;
    BOOL        bCleanupSocket = FALSE;

    if (error != NO_ERROR)
    {
        dbgprint("OP = %d; Error = %d\n", buf->operation, error);
        HandleIoFailure(key, buf);
        return;
    }

    if (buf->operation == OP_ACCEPT)
    {
        HandleAcceptCompletion((LISTEN_OBJ *)key, buf, CompPort, BytesTransfered);
    }
    else if (buf->operation == OP_READ)
    {
        sockobj = (SOCKET_OBJ *)key;
        bCleanupSocket = HandleRecvCompletion(sockobj, buf, BytesTransfered);
    }
    else if (buf->operation == OP_WRITE)
    {
        sockobj = (SOCKET_OBJ *)key;
        HandleSendCompletion(sockobj, buf, BytesTransfered);
    }

    ProcessPendingOperations();

    if (sockobj)
    {
        // Check to see if socket is closing
        if ((sockobj->OutstandingSend == 0) &&
            (sockobj->OutstandingRecv == 0) &&
            (sockobj->bClosing))
        {
            bCleanupSocket = TRUE;
        }
        if (bCleanupSocket)
        {
            closesocket(sockobj->s);
            sockobj->s = INVALID_SOCKET;
            FreeSocketObj(sockobj);
        }
    }
}
//
// Function: CompletionThread
//
// Description:
//    This is the completion thread that services our completion port. main()
//    creates one of these threads per processor, up to a cap. The thread loops
//    on GetQueuedCompletionStatus and passes every completed socket I/O to
//    HandleIo.
//
//    lpParam is the completion port handle. The thread returns when a dequeue
//    yields no OVERLAPPED. That happens either because
//    GetQueuedCompletionStatus itself failed (for example, the port handle
//    was closed) or because a packet with a NULL OVERLAPPED was posted to the
//    port, which is treated as a request to stop.
//
DWORD WINAPI CompletionThread(LPVOID lpParam)
{
    HANDLE      CompletionPort = (HANDLE)lpParam;   // Completion port handle
    ULONG_PTR   Key;
    SOCKET      s;
    BUFFER_OBJ *bufobj;                 // Per I/O object for completed I/O
    OVERLAPPED *lpOverlapped;           // Pointer to overlapped structure for completed I/O
    DWORD       BytesTransfered,        // Number of bytes transfered
                Flags;                  // Flags for completed I/O
    BOOL        ok;
    int         error;

    for (;;)
    {
        error = NO_ERROR;
        lpOverlapped = NULL;

        ok = GetQueuedCompletionStatus(
                CompletionPort,
                &BytesTransfered,
                &Key,
                &lpOverlapped,
                INFINITE
                );

        if (lpOverlapped == NULL)
        {
            // No I/O packet was dequeued, so there is no BUFFER_OBJ to
            // recover. Applying CONTAINING_RECORD to NULL would produce a
            // wild pointer, so stop servicing the port instead.
            if (!ok)
            {
                fprintf(stderr, "CompletionThread: GetQueuedCompletionStatus failed: %d\n",
                        GetLastError());
            }
            break;
        }

        bufobj = CONTAINING_RECORD(lpOverlapped, BUFFER_OBJ, ol);

        if (!ok)
        {
            // The I/O completed with an error. Ask Winsock for the result so
            // the error code handed to HandleIo is a Winsock error code.
            if (bufobj->operation == OP_ACCEPT)
            {
                s = ((LISTEN_OBJ *)Key)->s;
            }
            else
            {
                s = ((SOCKET_OBJ *)Key)->s;
            }

            dbgprint("CompletionThread: GetQueuedCompletionStatus failed: %d \n",
                     GetLastError());

            if (!WSAGetOverlappedResult(s, &bufobj->ol, &BytesTransfered, FALSE, &Flags))
            {
                error = WSAGetLastError();
            }
        }

        // Handle the IO operation
        HandleIo(Key, bufobj, CompletionPort, BytesTransfered, error);
    }

    return 0;
}
//
// Function: DiscardUnpostedAccept
//
// Description:
//    Undo InsertPendingAccept for a buffer whose PostAccept call failed.
//    PostAccept did not count it in PendingAcceptCount and no completion will
//    arrive for it. The buffer is therefore removed from the listener's
//    pending list (which main() scans), its client socket is closed if one
//    is still open, and the buffer is freed.
//
static void DiscardUnpostedAccept(LISTEN_OBJ *listenobj, BUFFER_OBJ *acceptobj)
{
    RemovePendingAccept(listenobj, acceptobj);
    if (acceptobj->sclient != INVALID_SOCKET)
    {
        closesocket(acceptobj->sclient);
        acceptobj->sclient = INVALID_SOCKET;
    }
    FreeBufferObj(acceptobj);
}

//
// Function: main
//
// Description:
//    This is the main program. Startup:
//    - parse the command line;
//    - start the completion threads, one per processor up to a cap;
//    - create a listening socket for every local address returned by
//      ResolveAddress;
//    - post gInitialAccepts overlapped AcceptEx calls on each listening
//      socket.
//    All socket I/O completions are serviced by the completion threads (see
//    HandleIo).
//
//    The main thread then waits on WaitEvents. The array holds the completion
//    thread handles first, followed by two events per listening socket:
//    - AcceptEvent:  FD_ACCEPT was signaled, so BURST_ACCEPT_COUNT more
//                    accepts are posted to cover a burst of connections.
//    - RepostAccept: completion threads consumed accepts and added to
//                    RepostCount, so that many replacement accepts are posted.
//    At most gMaxAccepts accepts are kept pending per listening socket. If a
//    completion thread exits, the process exits. On every 5 second timeout
//    statistics are printed. Every 36 timeouts, pending accepts whose client
//    has been connected for more than 300 seconds without sending data are
//    closed.
//
int __cdecl main(int argc, char **argv)
{
    WSADATA          wsd;
    SYSTEM_INFO      sysinfo;
    LISTEN_OBJ      *ListenSockets = NULL,
                    *listenobj = NULL;
    BUFFER_OBJ      *acceptobj = NULL;
    GUID             guidAcceptEx = WSAID_ACCEPTEX,
                     guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
    DWORD            bytes;
    // WSAWaitForMultipleEvents takes an array of at most
    // WSA_MAXIMUM_WAIT_EVENTS handles.
    HANDLE           CompletionPort,
                     WaitEvents[WSA_MAXIMUM_WAIT_EVENTS],
                     hrc;
    int              endpointcount = 0,
                     waitcount = 0,
                     interval,
                     rc,
                     i;
    struct addrinfo *res = NULL,
                    *ptr = NULL;

    // Validate the command line
    ValidateArgs(argc, argv);

    // Load Winsock
    if (WSAStartup(MAKEWORD(2,2), &wsd) != 0)
    {
        fprintf(stderr, "unable to load Winsock!\n");
        return -1;
    }

    InitializeCriticalSection(&gSocketListCs);
    InitializeCriticalSection(&gBufferListCs);
    InitializeCriticalSection(&gPendingCritSec);

    // Create the completion port used by this server
    CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0);
    if (CompletionPort == NULL)
    {
        fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
        return -1;
    }

    // One completion thread per processor. The count is capped by
    // MAX_COMPLETION_THREAD_COUNT and by the wait array, which must keep
    // room for at least one listening socket's two events.
    GetSystemInfo(&sysinfo);
    if (sysinfo.dwNumberOfProcessors > MAX_COMPLETION_THREAD_COUNT)
    {
        sysinfo.dwNumberOfProcessors = MAX_COMPLETION_THREAD_COUNT;
    }
    if (sysinfo.dwNumberOfProcessors > WSA_MAXIMUM_WAIT_EVENTS - 2)
    {
        sysinfo.dwNumberOfProcessors = WSA_MAXIMUM_WAIT_EVENTS - 2;
    }

    // Round the buffer size up to the next multiple of the page size
    if ((gBufferSize % sysinfo.dwPageSize) != 0)
    {
        gBufferSize = ((gBufferSize / sysinfo.dwPageSize) + 1) * sysinfo.dwPageSize;
    }
    printf("Buffer size = %lu (page size = %lu)\n",
            gBufferSize, sysinfo.dwPageSize);

    // Create the worker threads to service the completion notifications.
    // Their handles occupy the first slots of WaitEvents, so a thread that
    // exits wakes the main loop.
    for (waitcount = 0; waitcount < (int)sysinfo.dwNumberOfProcessors; waitcount++)
    {
        WaitEvents[waitcount] = CreateThread(NULL, 0, CompletionThread, (LPVOID)CompletionPort, 0, NULL);
        if (WaitEvents[waitcount] == NULL)
        {
            fprintf(stderr, "CreateThread failed: %d\n", GetLastError());
            return -1;
        }
    }

    printf("Local address: %s; Port: %s; Family: %d\n",
            gBindAddr, gBindPort, gAddressFamily);

    // Obtain the "wildcard" addresses for all the available address families
    res = ResolveAddress(gBindAddr, gBindPort, gAddressFamily, gSocketType, gProtocol);
    if (res == NULL)
    {
        fprintf(stderr, "ResolveAddress failed to return any addresses!\n");
        return -1;
    }

    // For each local address returned, create a listening/receiving socket
    for (ptr = res; ptr != NULL; ptr = ptr->ai_next)
    {
        // Each listening socket adds two handles to the wait array
        if (waitcount + 2 > WSA_MAXIMUM_WAIT_EVENTS)
        {
            fprintf(stderr, "Too many local addresses for the wait array\n");
            return -1;
        }

        printf("Listening address: ");
        PrintAddress(ptr->ai_addr, ptr->ai_addrlen);
        printf("\n");

        listenobj = (LISTEN_OBJ *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(LISTEN_OBJ));
        if (listenobj == NULL)
        {
            fprintf(stderr, "Out of memory!\n");
            return -1;
        }
        listenobj->LoWaterMark = gInitialAccepts;
        InitializeCriticalSection(&listenobj->ListenCritSec);

        // Save off the address family of this socket
        listenobj->AddressFamily = ptr->ai_family;

        // create the socket
        listenobj->s = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
        if (listenobj->s == INVALID_SOCKET)
        {
            fprintf(stderr, "socket failed: %d\n", WSAGetLastError());
            return -1;
        }

        // Create an event to register for FD_ACCEPT events on
        listenobj->AcceptEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
        if (listenobj->AcceptEvent == NULL)
        {
            fprintf(stderr, "CreateEvent failed: %d\n", GetLastError());
            return -1;
        }

        // Manual-reset event signaled by the completion threads whenever
        // they bump RepostCount
        listenobj->RepostAccept = CreateEvent(NULL, TRUE, FALSE, NULL);
        if (listenobj->RepostAccept == NULL)
        {
            fprintf(stderr, "CreateEvent failed: %d\n", GetLastError());
            return -1;
        }

        // Add the events to the list of waiting events
        WaitEvents[waitcount++] = listenobj->AcceptEvent;
        WaitEvents[waitcount++] = listenobj->RepostAccept;

        // Associate the socket and its LISTEN_OBJ to the completion port
        hrc = CreateIoCompletionPort((HANDLE)listenobj->s, CompletionPort, (ULONG_PTR)listenobj, 0);
        if (hrc == NULL)
        {
            fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError());
            return -1;
        }

        // bind the socket to a local address and port
        rc = bind(listenobj->s, ptr->ai_addr, ptr->ai_addrlen);
        if (rc == SOCKET_ERROR)
        {
            fprintf(stderr, "bind failed: %d\n", WSAGetLastError());
            return -1;
        }

        // The Winsock extension functions must be loaded from each provider
        // -- e.g. AF_INET and AF_INET6.
        rc = WSAIoctl(
                listenobj->s,
                SIO_GET_EXTENSION_FUNCTION_POINTER,
                &guidAcceptEx,
                sizeof(guidAcceptEx),
                &listenobj->lpfnAcceptEx,
                sizeof(listenobj->lpfnAcceptEx),
                &bytes,
                NULL,
                NULL
                );
        if (rc == SOCKET_ERROR)
        {
            fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER failed: %d\n",
                    WSAGetLastError());
            return -1;
        }

        rc = WSAIoctl(
                listenobj->s,
                SIO_GET_EXTENSION_FUNCTION_POINTER,
                &guidGetAcceptExSockaddrs,
                sizeof(guidGetAcceptExSockaddrs),
                &listenobj->lpfnGetAcceptExSockaddrs,
                sizeof(listenobj->lpfnGetAcceptExSockaddrs),
                &bytes,
                NULL,
                NULL
                );
        if (rc == SOCKET_ERROR)
        {
            fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER failed: %d\n",
                    WSAGetLastError());
            return -1;
        }

        // Put the socket into listening mode
        rc = listen(listenobj->s, 200);
        if (rc == SOCKET_ERROR)
        {
            fprintf(stderr, "listen failed: %d\n", WSAGetLastError());
            return -1;
        }

        // Register for FD_ACCEPT notification on listening socket
        rc = WSAEventSelect(listenobj->s, listenobj->AcceptEvent, FD_ACCEPT);
        if (rc == SOCKET_ERROR)
        {
            fprintf(stderr, "WSAEventSelect failed: %d\n", WSAGetLastError());
            return -1;
        }

        // Initiate the initial accepts for each listen socket
        for (i = 0; i < gInitialAccepts; i++)
        {
            acceptobj = GetBufferObj(gBufferSize);
            if (acceptobj == NULL)
            {
                fprintf(stderr, "Out of memory!\n");
                return -1;
            }
            acceptobj->PostAccept = listenobj->AcceptEvent;

            InsertPendingAccept(listenobj, acceptobj);
            if (PostAccept(listenobj, acceptobj) != NO_ERROR)
            {
                DiscardUnpostedAccept(listenobj, acceptobj);
            }
        }

        // Push onto the list of listening socket structures. HEAP_ZERO_MEMORY
        // left listenobj->next NULL, so this also covers the first entry.
        listenobj->next = ListenSockets;
        ListenSockets = listenobj;

        endpointcount++;
    }

    // free the addrinfo structure for the 'bind' address
    freeaddrinfo(res);

    gStartTime = gStartTimeLast = GetTickCount();

    interval = 0;
    while (1)
    {
        rc = WSAWaitForMultipleEvents(
                waitcount,
                WaitEvents,
                FALSE,
                5000,
                FALSE
                );
        if (rc == WAIT_FAILED)
        {
            fprintf(stderr, "WSAWaitForMultipleEvents failed: %d\n", WSAGetLastError());
            break;
        }
        else if (rc == WAIT_TIMEOUT)
        {
            interval++;

            PrintStatistics();

            if (interval == 36)
            {
                DWORD optval;
                int   optlen;

                // For TCP, cycle through all the outstanding AcceptEx operations
                // to see if any of the client sockets have been connected but
                // haven't received any data. If so, close them as they could be
                // a denial of service attack.
                for (listenobj = ListenSockets; listenobj != NULL; listenobj = listenobj->next)
                {
                    EnterCriticalSection(&listenobj->ListenCritSec);

                    for (acceptobj = listenobj->PendingAccepts; acceptobj != NULL; acceptobj = acceptobj->next)
                    {
                        // Client sockets that were already closed have
                        // nothing left to query
                        if (acceptobj->sclient == INVALID_SOCKET)
                        {
                            continue;
                        }

                        optlen = sizeof(optval);
                        rc = getsockopt(
                                acceptobj->sclient,
                                SOL_SOCKET,
                                SO_CONNECT_TIME,
                                (char *)&optval,
                                &optlen
                                );
                        if (rc == SOCKET_ERROR)
                        {
                            fprintf(stderr, "getsockopt: SO_CONNECT_TIME failed: %d\n",
                                    WSAGetLastError());
                        }
                        else if ((optval != 0xFFFFFFFF) && (optval > 300))
                        {
                            // 0xFFFFFFFF means "not connected yet". A socket
                            // connected for more than 300 seconds is closed,
                            // which makes its AcceptEx fail in the completion
                            // thread.
                            printf("closing stale handle\n");
                            closesocket(acceptobj->sclient);
                            acceptobj->sclient = INVALID_SOCKET;
                        }
                    }

                    LeaveCriticalSection(&listenobj->ListenCritSec);
                }
                interval = 0;
            }
        }
        else
        {
            int index;

            // Several handles may be signaled at once but the wait only reports
            // the lowest index, so poll every handle from there on.
            for (index = rc - WAIT_OBJECT_0; index < waitcount; index++)
            {
                WSANETWORKEVENTS ne;
                int              limit = 0;

                rc = WaitForSingleObject(WaitEvents[index], 0);
                if (rc == WAIT_FAILED || rc == WAIT_TIMEOUT)
                {
                    continue;
                }

                if (index < (int)sysinfo.dwNumberOfProcessors)
                {
                    // One of the completion threads exited
                    // This is bad so just bail - a real server would want
                    // to gracefully exit but this is just a sample ...
                    ExitProcess(-1);
                }

                // An accept-related event occurred; find its listening socket
                for (listenobj = ListenSockets; listenobj != NULL; listenobj = listenobj->next)
                {
                    if ((listenobj->AcceptEvent == WaitEvents[index]) ||
                        (listenobj->RepostAccept == WaitEvents[index]))
                    {
                        break;
                    }
                }
                if (listenobj == NULL)
                {
                    continue;
                }

                if (listenobj->AcceptEvent == WaitEvents[index])
                {
                    // EnumNetworkEvents to see if FD_ACCEPT was set
                    rc = WSAEnumNetworkEvents(
                            listenobj->s,
                            listenobj->AcceptEvent,
                            &ne
                            );
                    if (rc == SOCKET_ERROR)
                    {
                        // ne is not filled in on failure, so it must not be read
                        fprintf(stderr, "WSAEnumNetworkEvents failed: %d\n",
                                WSAGetLastError());
                    }
                    else if ((ne.lNetworkEvents & FD_ACCEPT) == FD_ACCEPT)
                    {
                        // We got an FD_ACCEPT so post multiple accepts to
                        // cover the burst
                        limit = BURST_ACCEPT_COUNT;
                    }
                }
                else
                {
                    // Repost event is signaled. Reset it BEFORE draining the
                    // counter: a completion thread that bumps RepostCount in
                    // between then re-signals the event instead of having its
                    // request wiped out by a late reset.
                    ResetEvent(listenobj->RepostAccept);
                    limit = InterlockedExchange(&listenobj->RepostCount, 0);
                }

                for (i = 0; (i < limit) && (listenobj->PendingAcceptCount < gMaxAccepts); i++)
                {
                    acceptobj = GetBufferObj(gBufferSize);
                    if (acceptobj)
                    {
                        acceptobj->PostAccept = listenobj->AcceptEvent;

                        InsertPendingAccept(listenobj, acceptobj);
                        if (PostAccept(listenobj, acceptobj) != NO_ERROR)
                        {
                            DiscardUnpostedAccept(listenobj, acceptobj);
                        }
                    }
                }
            }
        }
    }

    WSACleanup();
    return 0;
}
────────────────────────────
以上程序来自《Windows网络编程》（第2版，Network Programming for Microsoft Windows, 2nd Edition）的源码，从中可以清晰地看到完成端口运转的整体框架。为简单起见，这里省略了部分内存管理的代码，因为它们与理解IOCP的运行关系不大。ACE的Proactor模式在Windows上的实现就是基于IOCP的：
"ACE_WIN32_Proactor uses an I/O completion port for completion event detection."
——摘自《The ACE Programmer's Guide: Practical Design Patterns for Network and Systems Programming》第8.4节
页:
[1]