2025-04-27 07:49:33 -04:00

1877 lines
48 KiB
C++
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**********************************************************************/
/** Microsoft Windows NT - Windows 95 **/
/** Copyright(c) Microsoft Corp., 1994-1996 **/
/**********************************************************************/
/*
atqcport.cxx
This module contains replacement code for completion port and TransmitFile APIs
for Windows95 , where this functionality is absent.
Code is tightly coupled with ATQ code, reusing some of the data structures and globals.
Main ATQ processing code is left intact as much as possible , so that when we get real
completion ports, amount of modications should be minimal
One port per process currently supported.
FILE HISTORY:
VladS 05-Jan-1996 Created.
Functions Exported:
HANDLE SIOCreateCompletionPort();
DWORD SIODestroyCompletionPort();
DWORD SIOStartAsyncOperation();
BOOL SIOGetQueuedCompletionStatus();
DWORD SIOPostCompletionStatus();
*/
#include "isatq.hxx"
#include "atqcport.hxx"
#include <inetsvcs.h>
/************************************************************
* Private Globals
************************************************************/
W95CPORT *g_pCPort = NULL;
//
// Queues with ATQ contexts, being processed in completion port
//
LIST_ENTRY g_SIOIncomingRequests ; // Queue of incoming socket i/o
LIST_ENTRY g_SIOCompletedIoRequests ; // Queue with results of i/o
CRITICAL_SECTION g_SIOGlobalCriticalSection; // global sync variable.
HANDLE g_hPendingCompletionSemaphore = NULL;
DWORD SIOPoolThread( LPDWORD param );
BOOL
SIO_Private_StartAsyncOperation(
IN HANDLE hExistingPort,
IN PATQ_CONT pAtqContext
);
PATQ_CONT
SIO_Private_GetQueuedContext(
LIST_ENTRY *pQueue
);
BOOL
SIOCheckContext(
IN PATQ_CONTEXT pAtqC,
IN BOOL fNew /* = TRUE */
);
PATQ_CONT
FindATQContextFromSocket(
SOCKET sc
);
//
// Global synzronization calls
//
#define SIOLockGlobals() EnterCriticalSection( &g_SIOGlobalCriticalSection )
#define SIOUnlockGlobals() LeaveCriticalSection( &g_SIOGlobalCriticalSection )
/************************************************************
* Public functions.
************************************************************/
HANDLE
SIOCreateCompletionPort(
IN HANDLE hAsyncIO,
IN HANDLE hExistingPort,
IN DWORD dwCompletionKey,
IN DWORD dwConcurrentThreads
)
/*++
Routine Description:
Initializes the ATQ completion port
Arguments:
Return Value:
valid handle if successful, NULL on error (call GetLastError)
--*/
{
IF_DEBUG(SIO) {
ATQ_PRINTF((DBG_CONTEXT,"SIOCreateCompletionPort entered\n"));
}
if (INVALID_HANDLE_VALUE != hAsyncIO) {
//
// Set up i/o handle to non-blocking mode
//
DWORD one = 1;
ioctlsocket( (SOCKET)hAsyncIO, FIONBIO, &one );
}
//
// If passed handle is not null - only initialize our parts of ATQ context
// as completion port object has been created before
//
if ( (hExistingPort != NULL) && (g_pCPort != NULL) ) {
PATQ_CONT pAtqContext = (PATQ_CONT)dwCompletionKey;
IF_DEBUG(SIO) {
ATQ_PRINTF((DBG_CONTEXT,
"[SIO_CreatePort(%lu)] Received context=%x socket=%x\n",
GetCurrentThreadId(),pAtqContext,hAsyncIO));
}
//
// Set up SIO specific fields in ATQ context
//
pAtqContext->SIOListEntry.Flink =
pAtqContext->SIOListEntry.Blink = NULL;
pAtqContext->dwSIOFlags = 0;
return (HANDLE)g_pCPort;
}
//
// First time initialization
// BUGBUG Code assumes that there is only one completion port in use, as
// incoming queue is initialized here . If we will support more than one
// completion port object, queue initialization should be moved into ATQInitialize
//
g_pCPort = new W95CPORT(dwConcurrentThreads);
if ( (g_pCPort == NULL) ||
(g_pCPort->QueryWakeupSocket() == INVALID_SOCKET) ) {
ATQ_PRINTF((DBG_CONTEXT,
"[SIO]Could not create completion port"
));
delete g_pCPort;
ATQ_ASSERT(FALSE);
return NULL;
}
InitializeListHead( &g_SIOIncomingRequests );
InitializeListHead( &g_SIOCompletedIoRequests );
//
// Prepare global syncronization mechanism
//
InitializeCriticalSection( &g_SIOGlobalCriticalSection );
g_hPendingCompletionSemaphore =
CreateSemaphore(NULL,// Security Attributes
0, // Initial count
0x7fffffff, // Maximum count
NULL); // Name
return (HANDLE)g_pCPort;
} // SIOCreateCompletionPort
BOOL
SIODestroyCompletionPort(
IN HANDLE hExistingPort
)
/*++
Routine Description:
Destroys ATQ completion port
Arguments:
Return Value:
TRUE, if successful,
FALSE, otherwise
--*/
{
PATQ_CONT pAtqContext = NULL;
W95CPORT *pCPort = (W95CPORT *)hExistingPort;
//
// Queue completion indications to SIO thread and file I/O threads
//
if ( pCPort == NULL ) {
//
// Port already destroyed - return
//
return TRUE;
}
g_pCPort->Shutdown();
delete g_pCPort;
g_pCPort = NULL;
return TRUE;
} // SIODestroyCompletionPort
BOOL
SIOStartAsyncOperation(
IN HANDLE hExistingPort,
IN PATQ_CONTEXT pAtqContext
)
/*++
Routine Description:
Queues ATQ context with requested i/o operation to the completion port.
Values in context should be set by a caller , coompletion will be available
by calling SIOGetQueuedCompletionStatus.
Arguments:
Return Value:
TRUE if successful, FALSE on error (call GetLastError)
--*/
{
PATQ_CONT pAtqFullContext = (PATQ_CONT)pAtqContext;
W95CPORT *pCPort = (W95CPORT *)hExistingPort;
DWORD dwErr;
// Parameter validation
if (!hExistingPort || !pAtqContext) {
ATQ_ASSERT(FALSE);
SetLastError(ERROR_INVALID_PARAMETER);
InterlockedDecrement( &pAtqFullContext->m_nIO);
return FALSE;
}
//
// Add this context to the incoming queue
//
IF_DEBUG(SIO) {
ATQ_PRINTF((DBG_CONTEXT,
"[StartAsyncOperation(%lu)] AtqCtxt=%x \n ",
GetCurrentThreadId(),
pAtqContext
));
}
//
// Is this valid new context for completion port.
//
if(!SIOCheckContext(pAtqContext,TRUE)) {
dwErr = ERROR_OPERATION_ABORTED;
goto error_exit;
}
//
// Check SIO thread status, if there are no thread , create one
//
if (!pCPort->SIOCheckCompletionThreadStatus()) {
dwErr = ERROR_NOT_ENOUGH_MEMORY;
goto error_exit;
}
//
// Validate file handle
//
if (!pAtqFullContext->hAsyncIO) {
dwErr = WSAENOTSOCK;
goto error_exit;
}
SIOLockGlobals();
pAtqFullContext->dwSIOFlags &= ~ATQ_SIO_FLAG_STATE_MASK;
pAtqFullContext->dwSIOFlags |= ATQ_SIO_FLAG_STATE_INCOMING;
InsertTailList( &g_SIOIncomingRequests, &pAtqFullContext->SIOListEntry );
SIOUnlockGlobals();
//
// Signal processing thread about scheduling new i/o operation
//
pCPort->Wakeup();
return TRUE;
error_exit:
ATQ_PRINTF((DBG_CONTEXT,"Error %d in SIOStartAsyncOperation\n", dwErr));
//
// Then no need to select, we can just fail this I/O
//
pAtqFullContext->arInfo.dwLastIOError = dwErr;
//
// Immediately queue context as completed
//
SIOPostCompletionStatus(hExistingPort,
0, //Total dwBytesTransferred from arInfo
(DWORD)pAtqFullContext,
pAtqFullContext->arInfo.lpOverlapped);
return TRUE;
} // SIOStartAsyncOperation
BOOL
SIOGetQueuedCompletionStatus(
IN HANDLE hExistingPort,
OUT LPDWORD lpdwBytesTransferred,
OUT LPDWORD lpdwCompletionKey,
OUT LPOVERLAPPED *lplpOverlapped,
IN DWORD msThreadTimeout
)
/*++
Routine Description:
Get next available completion or blocks
Arguments:
Return Value:
TRUE if successful, FALSE on error (call GetLastError)
--*/
{
//
// Validate parameters
//
if (!lpdwBytesTransferred || !lpdwCompletionKey) {
ATQ_ASSERT(FALSE);
SetLastError(ERROR_INVALID_PARAMETER);
return FALSE;
}
PATQ_CONT pAtqContext = NULL;
DWORD dwErr = NOERROR;
BOOL fRes = FALSE;
//
// Wait on completed queue semaphore
//
dwErr = WaitForSingleObject(
g_hPendingCompletionSemaphore,
msThreadTimeout
);
if (dwErr == WAIT_OBJECT_0 ) {
pAtqContext = SIO_Private_GetQueuedContext(&g_SIOCompletedIoRequests);
//
// Operation completed - reset IO command in ATQ context
//
IF_DEBUG(SIO) {
ATQ_PRINTF((DBG_CONTEXT,
"GetQueuedContext returns context = %x\n", pAtqContext));
}
} else {
//
// timed out
//
ATQ_PRINTF((DBG_CONTEXT,"SIOGetQueuedCompletionStatus timed out\n"));
}
if (pAtqContext != NULL) {
//
// Is this valid context inside completion port ?
//
if(!SIOCheckContext((PATQ_CONTEXT)pAtqContext,FALSE)) {
SetLastError(ERROR_OPERATION_ABORTED);
return FALSE;
}
//
// Get atq context and overlapped buffer pointer from
// completion message
//
*lplpOverlapped = pAtqContext->arInfo.lpOverlapped;
*lpdwCompletionKey = (DWORD)pAtqContext;
*lpdwBytesTransferred = pAtqContext->arInfo.dwTotalBytesTransferred;
IF_DEBUG(SIO) {
ATQ_PRINTF((DBG_CONTEXT,
"GetQueuedCompletion Returning %d bytes\n",
*lpdwBytesTransferred));
}
//
// Clear context fields
//
pAtqContext->arInfo.atqOp = AtqIoNone;
pAtqContext->dwSIOFlags &= ~ATQ_SIO_FLAG_STATE_MASK;
fRes = TRUE;
IF_DEBUG(SIO) {
ATQ_PRINTF((DBG_CONTEXT,
"[SIOGetCompletion(%lu)] ATQContext=%x to socket=%x\n",
GetCurrentThreadId(),pAtqContext,pAtqContext->hAsyncIO));
}
// Johnson said that we should have following assert here....
DBG_ASSERT( pAtqContext->Signature == ATQ_CONTEXT_SIGNATURE);
//
// Real context - check if i/o completed correctly
//
if ( pAtqContext->arInfo.dwLastIOError != NOERROR) {
// Set last error again to prevent overwriting by other APIs
SetLastError(pAtqContext->arInfo.dwLastIOError);
fRes = FALSE;
}
} else {
*lpdwBytesTransferred = 0;
*lplpOverlapped = NULL;
*lpdwCompletionKey = 0;
}
return fRes;
} // SIOGetQueuedCompletionStatus
BOOL
SIOPostCompletionStatus(
IN HANDLE hExistingPort,
IN DWORD dwBytesTransferred,
IN DWORD dwCompletionKey,
IN LPOVERLAPPED lpOverlapped
)
/*++
Routine Description:
Posts passed information as ATQ context to the queue of completed requests
Arguments:
Return Value:
TRUE if successful, FALSE on error (call GetLastError)
--*/
{
PATQ_CONT pAtqContext = (PATQ_CONT)dwCompletionKey;
IF_DEBUG(SIO) {
ATQ_PRINTF((DBG_CONTEXT,
"[SIOPostCompletionStatus(%lu)] AtqCtxt=%x Bytes=%d Overlapped=%x\n ",
GetCurrentThreadId(),
pAtqContext,
dwBytesTransferred,
lpOverlapped
));
}
if ( pAtqContext != NULL ) {
pAtqContext->arInfo.dwTotalBytesTransferred = dwBytesTransferred;
pAtqContext->arInfo.lpOverlapped = lpOverlapped;
if ( pAtqContext->dwSIOFlags & ATQ_SIO_FLAG_STATE_MASK) {
ATQ_PRINTF((DBG_CONTEXT,
"[SIOPostCompletionStatus(%lu)] Context is inside SIO. AtqCtxt=%x SIOFlags = %x \n ",
GetCurrentThreadId(),
pAtqContext,
pAtqContext->dwSIOFlags
));
ATQ_ASSERT(FALSE);
}
SIOLockGlobals();
pAtqContext->dwSIOFlags &= ~ATQ_SIO_FLAG_STATE_MASK;
pAtqContext->dwSIOFlags |= ATQ_SIO_FLAG_STATE_COMPLETED;
//
// This should never happen if context is in right state.
//
if (pAtqContext->SIOListEntry.Flink) {
ATQ_PRINTF((DBG_CONTEXT,
"[SIOPostCompletionStatus(%lu)] Context is in some other queue . AtqCtxt=%x SIOFlags = %x\n",
GetCurrentThreadId(),
pAtqContext,
pAtqContext->dwSIOFlags
));
ATQ_ASSERT(FALSE);
RemoveEntryList(&pAtqContext->SIOListEntry );
}
InsertTailList( &g_SIOCompletedIoRequests, &pAtqContext->SIOListEntry );
} else {
SIOLockGlobals();
}
//
// This context now counts as waiting for ATQ pool thread to pick it up
//
InterlockedIncrement( (PLONG)&g_AtqWaitingContextsCount );
//
// Wake up pool threads if they are waiting on the completion queue
//
ReleaseSemaphore(g_hPendingCompletionSemaphore, 1, NULL);
SIOUnlockGlobals();
//
// Call internal postqueue routine for main outcoming queue
//
return TRUE;
}
DWORD
SIOPoolThread(
LPDWORD param
)
/*++
Routine Description:
Thread routine for completion port thread
Arguments:
Return Value:
--*/
{
ATQ_ASSERT(param != NULL);
if (param == NULL) {
return 0;
}
//
// Cast passed parameter to pointer to completion port object and
// invoke appropriate method
//
W95CPORT *pCPort = (W95CPORT *)param;
return pCPort->PoolThreadCallBack();
} // SIOPoolTread
/*
Implementation of completion port class
*/
W95CPORT::W95CPORT(
IN DWORD dwConcurrentThreads
)
:
m_Signature (ATQ_SIO_CPORT_SIGNATURE),
m_IsDestroying (FALSE),
m_IsThreadRunning (0L),
m_hThread (INVALID_HANDLE_VALUE),
m_fWakeupSignalled (FALSE),
m_scWakeup (INVALID_SOCKET)
{
INT err;
SOCKADDR_IN sockAddr;
DWORD i = 0;
FD_ZERO(&m_ReadfdsStore);
FD_ZERO(&m_WritefdsStore);
do {
m_scWakeup = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if ( m_scWakeup == INVALID_SOCKET) {
if ( (WSAGetLastError() != WSAESOCKTNOSUPPORT)
|| ( i > 10 ) ) {
ATQ_PRINTF((DBG_CONTEXT,"socket creation failed with %d\n",
WSAGetLastError()));
goto exit;
}
ATQ_PRINTF((DBG_CONTEXT,"socket failed with %d retrying...\n",
WSAGetLastError()));
Sleep(1000);
i++;
}
} while ( m_scWakeup == INVALID_SOCKET );
ZeroMemory(&sockAddr, sizeof(sockAddr));
sockAddr.sin_family = AF_INET;
sockAddr.sin_addr.s_addr = htonl(INADDR_ANY);
sockAddr.sin_port = htons(CPORT_WAKEUP_PORT);
err = bind(m_scWakeup, (PSOCKADDR)&sockAddr, sizeof(sockAddr));
if ( err == SOCKET_ERROR ) {
ATQ_PRINTF((DBG_CONTEXT,"Error %d in bind\n", WSAGetLastError()));
closesocket(m_scWakeup);
m_scWakeup = INVALID_SOCKET;
}
exit:
//
// Set select timeout value
//
InitializeCriticalSection( &m_csLock );
} // W95CPORT::W95CPORT
W95CPORT::~W95CPORT(
VOID
)
{
ATQ_PRINTF((DBG_CONTEXT,"Win95 Cport %x being freed\n", this));
ATQ_ASSERT(m_IsDestroying);
DeleteCriticalSection( &m_csLock );
if ( m_scWakeup != INVALID_SOCKET ) {
closesocket(m_scWakeup);
m_scWakeup = INVALID_SOCKET;
}
} // W95CPORT::~W95CPORT()
BOOL
W95CPORT::Shutdown(VOID)
{
DWORD dwErr;
m_IsDestroying = TRUE;
Wakeup();
//
// Wait for thread to shut down BUGBUG
//
dwErr = WaitForSingleObject( m_hThread,100);
CloseHandle(m_hThread);
return TRUE;
} // W95CPORT::Shutdown(VOID)
VOID
W95CPORT::Wakeup(VOID)
{
SOCKADDR_IN sockAddr;
INT err;
DWORD dwBuf = ATQ_WAKEUP_SIGNATURE;
IF_DEBUG(SIO) {
ATQ_PRINTF((DBG_CONTEXT,
"[WakeUp(%lu)] Trying to signal wakeup socket=%x \n",
GetCurrentThreadId(), m_scWakeup));
}
Lock( );
if ( !m_fWakeupSignalled ) {
sockAddr.sin_family = AF_INET;
sockAddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
sockAddr.sin_port = htons(CPORT_WAKEUP_PORT);
err = sendto(
m_scWakeup,
(PCHAR)&dwBuf,
sizeof(dwBuf),
0,
(PSOCKADDR)&sockAddr,
sizeof(sockAddr)
);
if ( err == SOCKET_ERROR ) {
ATQ_PRINTF((DBG_CONTEXT,
"Error %d in sendto\n",WSAGetLastError()));
} else {
m_fWakeupSignalled = TRUE;
}
}
Unlock( );
return;
} // W95CPORT::Wakeup
DWORD
W95CPORT::PoolThreadCallBack(
VOID
)
/*++
Routine Description:
This is routine, handling all events associated with socket i/o .
Arguments:
Return Value:
--*/
{
TIMEVAL tvTimeout;
FD_SET readfds;
FD_SET writefds;
DWORD dwErr = NOERROR;
//
// Initialize array of events for socket i/o
//
FD_ZERO(&m_ReadfdsStore);
FD_ZERO(&m_WritefdsStore);
//
// Set select timeout value
//
tvTimeout.tv_sec = (SIO_THREAD_TIMEOUT*3)/100; // 30 sec
tvTimeout.tv_usec = 0;
//
// Main loop
//
while (!m_IsDestroying) {
//
// Prepare socket arrays for select
//
PrepareDescriptorArrays( &readfds, &writefds);
IF_DEBUG(SIO) {
ATQ_PRINTF((DBG_CONTEXT,
"[SIOThread(%lu)]Before select: Readcnt=%d WriteCnt=%d \n",
GetCurrentThreadId(), readfds.fd_count, writefds.fd_count
));
}
//
// Wait for first completed i/o or timeout
//
dwErr = select(0,&readfds,&writefds,NULL,&tvTimeout);
//
// If we are shutting down - stop processing and exit
//
if (m_IsDestroying) {
break;
}
//
// Figure out what triggered an event and act accordingly
//
if (SOCKET_ERROR == dwErr ) {
DWORD dwLastErr = WSAGetLastError();
//
// if we get WSANOTINITIALISED, then either something bad
// happened or we are being cleaned up
//
if ( dwLastErr == WSANOTINITIALISED ) {
ATQ_PRINTF((DBG_CONTEXT,
"SIO_Thread: select detects shutdown\n"));
break;
}
//
// Select failed - most probably because wakeup socket was closed
// after we checked for it but before calling select. If this is the case
// just continuing will be the right thing.
//
ATQ_PRINTF((DBG_CONTEXT,
"SIO_Thread: Select failed with %d\n", dwLastErr));
//
// Select failed , we need to go through list of sockets currently set up
// and remove those , which do not have associated ATQ contexts.
// That will happen if another thread removed context and closed socket
// when we were busy processing completion. In this case socket handle is
// already invalid when we came to select.
CleanupDescriptorList();
continue;
} else if (0L == dwErr) {
//
// Select timed out - BUGBUG close after number of timeouts
// if there is no i/o activity scheduled
//
//ATQ_PRINTF((DBG_CONTEXT,"select timed out\n"));
CleanupDescriptorList();
} else {
//
// This is socket completion. Walk all out sets and for each signalled socket
// process it's completion
//
ProcessSocketCompletion(dwErr, &readfds, &writefds);
}
}
ATQ_PRINTF((DBG_CONTEXT,"Exiting SIOPoolThread\n"));
return 0;
} // SIOPoolTread
BOOL
W95CPORT::PrepareDescriptorArrays(
IN PFD_SET ReadFds,
IN PFD_SET WriteFds
)
/*++
Routine Description:
Prepares sockets array for select call.
Nb: Object is not locked when arrays are processed, so if any other method will
be modifying arrays directly , locking should become more agressive. Global
critical section is taken when checking incoming queue.
Arguments:
Return Value:
--*/
{
PATQ_CONT pAtqContext = NULL;
FD_ZERO(ReadFds);
FD_ZERO(WriteFds);
//
// We should remove as many as possible from incoming
// queue, because if semaphore signalled but there were not
// free event handles available, we will leave incoming request
// in the queue.
//
BOOL fReadRequest;
PLIST_ENTRY listEntry;
PLIST_ENTRY nextEntry;
SIOLockGlobals();
for ( listEntry = g_SIOIncomingRequests.Flink;
listEntry != &g_SIOIncomingRequests;
listEntry = nextEntry ) {
nextEntry = listEntry->Flink;
pAtqContext = CONTAINING_RECORD(
listEntry,
ATQ_CONTEXT,
SIOListEntry );
//
// Is this valid new context for completion port. if not skip it
//
if(!SIOCheckContext((PATQ_CONTEXT)pAtqContext,FALSE)) {
continue;
}
fReadRequest = (pAtqContext->arInfo.atqOp == AtqIoRead) ;
if ( ( fReadRequest && (m_ReadfdsStore.fd_count >= (FD_SETSIZE-1) )) ||
(!fReadRequest && (m_WritefdsStore.fd_count >= FD_SETSIZE))) {
//
// Skip this context as appropriate array is full
//
ATQ_PRINTF((DBG_CONTEXT,"Full array!\n"));
continue;
}
//
// Add socket to appropriate queue
//
IF_DEBUG(SIO) {
ATQ_PRINTF((DBG_CONTEXT,
"[SIOPrepareArray(%lu)] Received i/o request context=%x to socket=%x I/O OpCode=%d .\n",
GetCurrentThreadId(),
pAtqContext,
pAtqContext->hAsyncIO,
pAtqContext->arInfo.atqOp));
}
if((pAtqContext->dwSIOFlags & ATQ_SIO_FLAG_STATE_MASK) != ATQ_SIO_FLAG_STATE_INCOMING) {
ATQ_PRINTF((DBG_CONTEXT,
"[SIO: PDA (%lu)] Not in incoming state AtqCtxt=%x SIOFlags = %x \n ",
GetCurrentThreadId(),
pAtqContext,
pAtqContext->dwSIOFlags
));
ATQ_ASSERT(FALSE);
}
//
// Remove context from incoming queue
//
RemoveEntryList(listEntry);
listEntry->Flink = listEntry->Blink = NULL;
pAtqContext->dwSIOFlags &= ~ATQ_SIO_FLAG_STATE_MASK;
pAtqContext->dwSIOFlags |= ATQ_SIO_FLAG_STATE_WAITING;
//
// Indicate socket as waiting for the ready state
//
switch (pAtqContext->arInfo.atqOp) {
case AtqIoRead:
FD_SET((SOCKET)pAtqContext->hAsyncIO,&m_ReadfdsStore);
break;
case AtqIoWrite:
case AtqIoXmitFile:
FD_SET((SOCKET)pAtqContext->hAsyncIO,&m_WritefdsStore);
break;
default:
ATQ_PRINTF((DBG_CONTEXT,
"[PrepareDescriptors(%lu)] Context=%x has invalid type of IO op\n",
GetCurrentThreadId(),pAtqContext));
break;
}
}
SIOUnlockGlobals();
//
// Initialize socket set for use with select .
// BUGBUG Is this memory operation platfrom independent ?
CopyMemory( ReadFds, &m_ReadfdsStore, sizeof(fd_set));
CopyMemory( WriteFds, &m_WritefdsStore, sizeof(fd_set));
//
// Add wakeup socket to read list
//
FD_SET(m_scWakeup,ReadFds);
IF_DEBUG(SIO) {
ATQ_PRINTF((DBG_CONTEXT,"[SIOPrepareArray Added Wakeup socket=%x ReadCount=%d WriteCOunt=%d \n",
m_scWakeup,ReadFds->fd_count,WriteFds->fd_count));
}
return TRUE;
} // W95CPORT::PrepareDescriptorArrays
BOOL
W95CPORT::ProcessSocketCompletion(
IN DWORD dwCompletedCount,
IN PFD_SET ReadFds,
IN PFD_SET WriteFds
)
/*++
Routine Description:
Handling i/o completion for sockets in select descriptor array
Arguments:
Return Value:
--*/
{
UINT Count;
UINT i = 0;
INT dwErr;
DWORD cbBytesTransferred;
DWORD nBytes;
PFD_SET pCurrentSet = ReadFds;
PFD_SET pCurrentStoreSet = &m_ReadfdsStore;
ATQ_ASSERT(dwCompletedCount == (ReadFds->fd_count + WriteFds->fd_count));
//
// First process read set,then write set
//
for (i=0;i<2;i++) {
//
// Walk through completed sockets in current descriptor set
//
for (Count=0;Count < pCurrentSet->fd_count;Count++) {
PATQ_CONT pAtqContext = NULL;
//
// If this is wakeup socket - skip iteration
//
if ((pCurrentSet == ReadFds) &&
(m_scWakeup == pCurrentSet->fd_array[Count]) ) {
DWORD dwBuf;
INT err;
Lock( );
ATQ_ASSERT(m_fWakeupSignalled);
err = recvfrom(
m_scWakeup,
(PCHAR)&dwBuf,
sizeof(dwBuf),
0,
NULL,
NULL
);
if ( err == SOCKET_ERROR ) {
ATQ_PRINTF((DBG_CONTEXT,
"Error %d in recvfrom\n", WSAGetLastError()));
} else {
ATQ_ASSERT(dwBuf == ATQ_WAKEUP_SIGNATURE);
}
m_fWakeupSignalled = FALSE;
Unlock( );
continue;
}
//
// Find ATQ context associated with signalled socket
//
pAtqContext = FindATQContextFromSocket(
pCurrentSet->fd_array[Count]
);
ATQ_ASSERT(pAtqContext != NULL);
if (pAtqContext == NULL) {
//
// Did not locate context - go to next one, most probably
// we are shutting down. In any case remove this socket from
// stored list, as there is no use in it
//
FD_CLR(pCurrentSet->fd_array[Count],pCurrentStoreSet);
//
// Socket should be really closed do we need to close
// it again ?
closesocket(pCurrentSet->fd_array[Count]);
continue;
}
IF_DEBUG(SIO) {
ATQ_PRINTF((DBG_CONTEXT,
"[ProcessCompletion(%lu)] Completion signal AtqCtxt=%x Socket=%x I/O OpCode=%d SioFlags=%x \n ",
GetCurrentThreadId(),
pAtqContext,
pAtqContext->hAsyncIO,
pAtqContext->arInfo.atqOp,
pAtqContext->dwSIOFlags
));
}
//
// Is context in right state
//
if( (pAtqContext->dwSIOFlags & ATQ_SIO_FLAG_STATE_MASK) != ATQ_SIO_FLAG_STATE_WAITING ) {
dwErr = SOCKET_ERROR;
WSASetLastError(WSAECONNABORTED);
pAtqContext->Print();
ATQ_PRINTF((DBG_CONTEXT,
"[ProcessCompletion(%x)] SIO state Ctxt=%x Flags=%x hAsync=%x hJra=%x\n ",
GetCurrentThreadId(),
pAtqContext,
pAtqContext->dwSIOFlags,
pAtqContext->hAsyncIO,
pAtqContext->hJraAsyncIO
));
} else {
//
// Perform selected i/o operation on ready socket
//
nBytes = 0;
switch(pAtqContext->arInfo.atqOp ) {
case AtqIoRead: {
DWORD dwFlags = 0;
LPWSABUF pBuf =
pAtqContext->arInfo.uop.opReadWrite.pBufAll;
dwErr = WSARecv(
pCurrentSet->fd_array[Count],
pBuf,
pAtqContext->arInfo.uop.opReadWrite.dwBufferCount,
&nBytes,
&dwFlags,
NULL, // no lpo
NULL
);
if ( pBuf != &pAtqContext->arInfo.uop.opReadWrite.buf1 ) {
LocalFree(pBuf);
}
pAtqContext->arInfo.uop.opReadWrite.pBufAll = NULL;
}
break;
case AtqIoWrite: {
LPWSABUF pBuf =
pAtqContext->arInfo.uop.opReadWrite.pBufAll;
dwErr = WSASend(
pCurrentSet->fd_array[Count],
pBuf,
pAtqContext->arInfo.uop.opReadWrite.dwBufferCount,
&nBytes,
0, // no flags
NULL, // no lpo
NULL
);
if ( pBuf != &pAtqContext->arInfo.uop.opReadWrite.buf1 ) {
LocalFree(pBuf);
}
pAtqContext->arInfo.uop.opReadWrite.pBufAll = NULL;
}
break;
case AtqIoXmitFile: {
DWORD cbWritten = 0;
WSABUF wsaBuf = {
pAtqContext->arInfo.uop.opFakeXmit.cbBuffer,
(PCHAR)pAtqContext->arInfo.uop.opFakeXmit.pvLastSent
};
dwErr = WSASend(
pCurrentSet->fd_array[Count],
&wsaBuf,
1,
&nBytes,
0, // no flags
NULL, // no lpo
NULL
);
}
break;
default:
dwErr = SOCKET_ERROR;
WSASetLastError(WSAECONNABORTED);
ATQ_PRINTF((DBG_CONTEXT,
"[ProcessCompletion(%lu)] Context=%x has invalid type of IO op[%x]\n",
GetCurrentThreadId(),pAtqContext,
pAtqContext->arInfo.atqOp));
DBG_ASSERT(FALSE);
break;
}
}
//
// Set the last error code in context
//
if (SOCKET_ERROR == dwErr) {
ATQ_PRINTF((DBG_CONTEXT,
"Error %d in socket operation[%d]\n",
WSAGetLastError(),
pAtqContext->arInfo.atqOp
));
pAtqContext->arInfo.dwLastIOError = WSAGetLastError();
//
// If this operation would be blocked - retry it again
//
if ( WSAEWOULDBLOCK == pAtqContext->arInfo.dwLastIOError) {
continue;
}
cbBytesTransferred = 0;
} else {
IF_DEBUG(SIO) {
ATQ_PRINTF((DBG_CONTEXT,
"SIO operation[%d] returns %d bytes\n",
pAtqContext->arInfo.atqOp,
nBytes));
}
pAtqContext->arInfo.dwLastIOError = NOERROR;
cbBytesTransferred = nBytes;
}
//
// Set parameters in ATQ context I/O request block
//
pAtqContext->arInfo.dwTotalBytesTransferred = cbBytesTransferred;
//
// Clean up SIO state
//
pAtqContext->dwSIOFlags &= ~ATQ_SIO_FLAG_STATE_MASK;
//
// Queue this context to completed queue
//
SIOPostCompletionStatus((HANDLE)this,
cbBytesTransferred,
(DWORD)pAtqContext,
pAtqContext->arInfo.lpOverlapped
);
//
// Remove socket from the set where it is now
//
FD_CLR(pCurrentSet->fd_array[Count],pCurrentStoreSet);
}
pCurrentSet = WriteFds;
pCurrentStoreSet = &m_WritefdsStore;
}
return TRUE;
} // ProcessSocketCompletion
VOID
W95CPORT::CleanupDescriptorList(
VOID
)
/*++
Routine Description:
Prepares sockets array for select call.
Arguments:
Return Value:
--*/
{
UINT i = 0;
UINT Count;
int sErr;
int iSocketType ;
int iOptLen = sizeof(iSocketType);
PFD_SET pCurrentStoreSet = &m_ReadfdsStore;
FD_SET fdsMarked;
//
// First process read set,then write set
//
for (i=0;i<2;i++) {
FD_ZERO(&fdsMarked);
//
// Walk scheduled sockets in current descriptor set
//
for (Count=0;Count < pCurrentStoreSet->fd_count;Count++) {
PATQ_CONT pAtqContext = NULL;
//
// If this is wakeup socket - skip iteration
//
if ((m_scWakeup == pCurrentStoreSet->fd_array[Count]) &&
(pCurrentStoreSet == &m_ReadfdsStore) ) {
continue;
}
//
// Find ATQ context associated with signalled socket
//
pAtqContext = FindATQContextFromSocket(
pCurrentStoreSet->fd_array[Count]
);
if ( pAtqContext == NULL ) {
//
// Did not locate context - remove socket from the set where it is now
// Socket should be really closed do we need to close it again ?
//
ATQ_PRINTF((DBG_CONTEXT,
"[CleanupDLists(%lu)] Context not found for socket=%x.Cleaning record\n",
GetCurrentThreadId(),pCurrentStoreSet->fd_array[Count]));
//FD_CLR(pCurrentStoreSet->fd_array[Count],pCurrentStoreSet);
FD_SET((SOCKET)pCurrentStoreSet->fd_array[Count],&fdsMarked);
closesocket(pCurrentStoreSet->fd_array[Count]);
} else {
//
// Found ATQ context, validate if socket handle is still valid.
// Reason for this check is that it is possible that socket will be closed by one
// thread , while "select" pump thread is not waiting on select. It would be better
// to communicate this event by using thread message , for now we just use
// more brute force approach. If socked handle is not valid after select failed
// with NOSOCKET error code, we signal i/o completion failure for this context and
// remove socket from waiting list
//
sErr = getsockopt((SOCKET)pCurrentStoreSet->fd_array[Count],
SOL_SOCKET,
SO_TYPE,
(char *)&iSocketType,&iOptLen);
if (sErr != NO_ERROR) {
ATQ_PRINTF((DBG_CONTEXT,"Error %d in getsockopt\n",
WSAGetLastError()));
//
// Found invalid socket in store list, signal it's context with failure
// error code
// BUGBUG This code is identical to code used in completion processing
// routine, should be in separate method
//
// Set parameters in ATQ context I/O request block
//
pAtqContext->arInfo.dwTotalBytesTransferred = 0;
pAtqContext->arInfo.dwLastIOError = ERROR_OPERATION_ABORTED;
//
// Clean up SIO state
//
pAtqContext->dwSIOFlags &= ~ATQ_SIO_FLAG_STATE_MASK;
//
// Queue this context to completed queue
//
SIOPostCompletionStatus((HANDLE)this,
0,
(DWORD)pAtqContext,
pAtqContext->arInfo.lpOverlapped
);
//
// Remove socket from the set where it is now
//
FD_SET((SOCKET)pCurrentStoreSet->fd_array[Count],&fdsMarked);
}
}
}
//
// Now remove all sockets marked from current set
//
for (Count=0;Count < fdsMarked.fd_count;Count++) {
FD_CLR(fdsMarked.fd_array[Count],pCurrentStoreSet);
}
pCurrentStoreSet = &m_WritefdsStore;
}
} // W95CPORT::CleanupDescriptorList
BOOL
W95CPORT::SIOCheckCompletionThreadStatus(
VOID
)
/*++
Routine Description:
This routine makes sure there is at least one thread in
the thread pool.
Arguments:
Return Value:
TRUE if successful, FALSE on error (call GetLastError)
--*/
{
BOOL fRet = TRUE;
//
// If no threads are available, kick a new one off up to the limit
//
if (!InterlockedExchange(&m_IsThreadRunning,1)) {
DWORD dwId;
ATQ_PRINTF((DBG_CONTEXT,"Starting a new SIO Pool Thread\n"));
m_hThread = CreateThread(
NULL,
0,
(LPTHREAD_START_ROUTINE)SIOPoolThread,
this,
0,
&dwId
);
if ( !m_hThread ) {
fRet = FALSE;
}
}
return fRet;
} // SIOCheckCompletionThreadStatus()
BOOL
SIOWSARecv(
IN PATQ_CONT pContext,
IN LPWSABUF pwsaBuffers,
IN DWORD dwBufferCount,
IN OVERLAPPED * lpo OPTIONAL
)
/*++
Routine Description:
This routine does WSARecv+fake completion port
Arguments:
pContext - pointer to ATQ context
lpBuffer - Buffer to put read data in
BytesToRead - number of bytes to read
lpo - Overlapped structure to use
Return Value:
TRUE on success and FALSE if there is a failure.
--*/
{
pContext->arInfo.atqOp = AtqIoRead;
pContext->arInfo.lpOverlapped = lpo;
pContext->arInfo.uop.opReadWrite.dwBufferCount = dwBufferCount;
if ( dwBufferCount == 1) {
pContext->arInfo.uop.opReadWrite.buf1.len = pwsaBuffers->len;
pContext->arInfo.uop.opReadWrite.buf1.buf = pwsaBuffers->buf;
pContext->arInfo.uop.opReadWrite.pBufAll =
&pContext->arInfo.uop.opReadWrite.buf1;
} else {
DBG_ASSERT( dwBufferCount > 1);
WSABUF * pBuf = (WSABUF *)
::LocalAlloc( LPTR, dwBufferCount * sizeof (WSABUF));
if ( pBuf != NULL) {
pContext->arInfo.uop.opReadWrite.pBufAll = pBuf;
CopyMemory( pBuf, pwsaBuffers,
dwBufferCount * sizeof(WSABUF));
} else {
ATQ_PRINTF((DBG_CONTEXT,
"Failed to allocate buffer[%d] for WSARecv\n",
dwBufferCount));
return ( FALSE);
}
}
return(SIOStartAsyncOperation(
g_hIoCompPort,
(PATQ_CONTEXT)pContext
));
} // SIOWSARecv
BOOL
SIOWSASend(
IN PATQ_CONT pContext,
IN LPWSABUF pwsaBuffers,
IN DWORD dwBufferCount,
IN OVERLAPPED * lpo OPTIONAL
)
/*++
Routine Description:
This routine does WSASend+fake completion port
Arguments:
pContext - pointer to ATQ context
lpBuffer - Buffer to put read data in
BytesToRead - number of bytes to read
lpo - Overlapped structure to use
Return Value:
TRUE on success and FALSE if there is a failure.
--*/
{
pContext->arInfo.atqOp = AtqIoWrite;
pContext->arInfo.lpOverlapped = lpo;
pContext->arInfo.uop.opReadWrite.dwBufferCount = dwBufferCount;
if ( dwBufferCount == 1) {
pContext->arInfo.uop.opReadWrite.buf1.len = pwsaBuffers->len;
pContext->arInfo.uop.opReadWrite.buf1.buf = pwsaBuffers->buf;
pContext->arInfo.uop.opReadWrite.pBufAll =
&pContext->arInfo.uop.opReadWrite.buf1;
} else {
DBG_ASSERT( dwBufferCount > 1);
WSABUF * pBuf = (WSABUF *)
::LocalAlloc( LPTR, dwBufferCount * sizeof (WSABUF));
if ( pBuf != NULL) {
pContext->arInfo.uop.opReadWrite.pBufAll = pBuf;
CopyMemory( pBuf, pwsaBuffers,
dwBufferCount * sizeof(WSABUF));
} else {
ATQ_PRINTF((DBG_CONTEXT,
"Failed to allocate buffer[%d] for WSARecv\n",
dwBufferCount));
return ( FALSE);
}
}
return(SIOStartAsyncOperation(
g_hIoCompPort,
(PATQ_CONTEXT)pContext
));
} // SIOWSASend
/************************************************************
* Private functions.
************************************************************/
PATQ_CONT
SIO_Private_GetQueuedContext(
IN PLIST_ENTRY pQueue
)
{
PATQ_CONT pAtqContext = NULL;
PLIST_ENTRY pEntry;
//
// We may have something in the queue
//
SIOLockGlobals();
//
// Remove first event from queue
//
if ( !IsListEmpty(pQueue)) {
pEntry = RemoveHeadList( pQueue );
pAtqContext = CONTAINING_RECORD(
pEntry,
ATQ_CONTEXT,
SIOListEntry
);
pEntry->Flink = pEntry->Blink = NULL;
}
SIOUnlockGlobals();
return pAtqContext;
} // SIO_Private_GetQueuedContext
BOOL
SIOCheckContext(
IN PATQ_CONTEXT pAtqC,
IN BOOL fNew /* = TRUE */
)
/*++
Routine Description:
Tries to validate that context can enter completion port.
If context is already in, signal error condition.
Arguments:
patqContext - pointer to ATQ context
Return Value:
TRUE if successful, FALSE on error (call GetLastError)
--*/
{
PATQ_CONT pAtqContext = (PATQ_CONT)pAtqC;
if (!pAtqC || !pAtqC->hAsyncIO) {
return TRUE;
}
if (fNew) {
//
// Verify this context is not in SIO yet
//
if (( pAtqContext->dwSIOFlags & ATQ_SIO_FLAG_STATE_MASK) ||
(pAtqContext->SIOListEntry.Flink)) {
ATQ_PRINTF((DBG_CONTEXT,
"[SIOCheckContext(%lu)] Context is already in SIO state = %x \n ",
GetCurrentThreadId(),
pAtqContext->dwSIOFlags
));
ATQ_ASSERT(FALSE);
return FALSE;
}
} else {
if ( (pAtqContext->dwSIOFlags & ATQ_SIO_FLAG_STATE_MASK) == 0 ) {
ATQ_PRINTF((DBG_CONTEXT,
"[SIOCheckContext(%lu)] Context is not in SIO state = %x while should be\n ",
GetCurrentThreadId(),
pAtqContext->dwSIOFlags
));
ATQ_ASSERT(FALSE);
return FALSE;
}
}
return TRUE;
} // SIOCheckContext
PATQ_CONT
FindATQContextFromSocket(
SOCKET sc
)
{
LIST_ENTRY * pentry;
PATQ_CONT pAtqContext;
DWORD FakeSocket = (DWORD)sc | 0x80000000;
//
// If socket is obviously invalid - fail
//
if (!sc) {
ATQ_PRINTF((DBG_CONTEXT,"Invalid socket[%d] on find\n",
sc));
return NULL;
}
ATQ_ASSERT(g_dwNumContextLists == ATQ_NUM_CONTEXT_LIST_W95);
AtqActiveContextList[0].Lock( );
for ( pentry = AtqActiveContextList[0].ActiveListHead.Flink;
pentry != &AtqActiveContextList[0].ActiveListHead;
pentry = pentry->Flink )
{
pAtqContext = CONTAINING_RECORD(
pentry,
ATQ_CONTEXT,
m_leTimeout );
if ( pAtqContext->Signature != ATQ_CONTEXT_SIGNATURE ) {
ATQ_PRINTF((DBG_CONTEXT,"Invalid signature[%x] on context %x\n",
pAtqContext->Signature, pAtqContext));
ATQ_ASSERT( pAtqContext->Signature == ATQ_CONTEXT_SIGNATURE );
break;
}
if ( ((HANDLE)sc == pAtqContext->hAsyncIO)
||
((pAtqContext->hJraAsyncIO == FakeSocket) &&
(pAtqContext->arInfo.atqOp != 0)) ) {
//
// We found our context
//
AtqActiveContextList[0].Unlock( );
if ( (HANDLE)sc != pAtqContext->hAsyncIO ) {
ATQ_PRINTF((DBG_CONTEXT,
"Processing leftover context %x[%x]\n",
pAtqContext, pAtqContext->arInfo.atqOp));
}
IF_DEBUG(SIO) {
ATQ_PRINTF((DBG_CONTEXT,
"Found atq context %x[%x]\n", pAtqContext, sc));
}
return pAtqContext;
}
}
AtqActiveContextList[0].Unlock( );
IF_DEBUG(SIO) {
ATQ_PRINTF((DBG_CONTEXT,
"Cannot find atq context for socket %d\n", sc));
}
return NULL;
} // FindATQContextFromSocket