364 lines
		
	
	
		
			8.7 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			364 lines
		
	
	
		
			8.7 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*++
 | |
| 
 | |
|    Copyright    (c)    1994    Microsoft Corporation
 | |
| 
 | |
|    Module  Name :
 | |
| 
 | |
|         cretryq.cxx
 | |
| 
 | |
|    Abstract:
 | |
| 		Implements the generic retry queue
 | |
| 
 | |
|    Author:
 | |
| 
 | |
|            Rohan Phillips    ( Rohanp )    18-JAN-1996
 | |
| 
 | |
|    Project:
 | |
| 
 | |
|           SMTP Server DLL
 | |
| 
 | |
|    Functions Exported:
 | |
| 
 | |
| 
 | |
|    Revision History:
 | |
| 
 | |
| 
 | |
| --*/
 | |
| 
 | |
| /************************************************************
 | |
|  *     Include Headers
 | |
|  ************************************************************/
 | |
| 
 | |
| #include "cretryq.hxx"
 | |
| //#include "timemath.h"
 | |
| 
 | |
// Convert a timeout expressed in seconds into the millisecond value expected
// by the Win32 wait functions.
//   - INFINITE is passed through unchanged.
//   - A value too large to be expressed in milliseconds is clamped to the
//     largest finite wait (INFINITE - 1) instead of wrapping around to an
//     arbitrary, much shorter wait.
// NOTE: the argument is evaluated more than once; do not pass an expression
// with side effects.
#define TimeToWait(Timeout)   (((Timeout) == INFINITE) ? INFINITE : \
                               (((Timeout) > ((INFINITE - 1) / 1000)) ? (INFINITE - 1) : \
                                (Timeout) * 1000))
 | |
| 
 | |
| CRetryQ::CRetryQ(void) {
 | |
| 	TraceFunctEnterEx((LPARAM)this, "CRetryQ::CRetryQ");
 | |
| 
 | |
| 	m_hTimeoutEvent = NULL;
 | |
| 	m_hThreadHandle = NULL;
 | |
| 	m_cEntries = 0;
 | |
| 	m_cTimeOut = INFINITE;
 | |
| 	m_fLongSleep = TRUE;
 | |
| 	m_pLastEntry = NULL;
 | |
| 
 | |
| 	//init list and crit sect
 | |
| 	InitializeListHead(&m_leQueueHead);
 | |
|    	InitializeCriticalSection( &m_csQueue);
 | |
| 
 | |
| 	TraceFunctLeaveEx((LPARAM)this);
 | |
| }
 | |
| 
 | |
| CRetryQ::~CRetryQ() {
 | |
| 	TraceFunctEnterEx((LPARAM)this, "CRetryQ::~CRetryQ");
 | |
| 
 | |
| 	DeleteCriticalSection (&m_csQueue);
 | |
| 
 | |
| 	TraceFunctLeaveEx((LPARAM)this);
 | |
| }
 | |
| 
 | |
| void CRetryQ::InsertIntoQueue(CRetryQueueEntry *pEntry) {
 | |
| 	TraceFunctEnterEx((LPARAM)this, "CRetryQ::InsertIntoQueue");
 | |
| 
 | |
|     _ASSERT(pEntry != NULL);
 | |
| 
 | |
| 	if (pEntry == NULL) {
 | |
| 		TraceFunctLeaveEx((LPARAM)this);
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| #if 0
 | |
| 	// BUGBUG
 | |
| 	if(QuerySmtpInstance()->IsShuttingDown())
 | |
| 	{
 | |
| 		delete MailQEntry;
 | |
| 		TraceFunctLeaveEx((LPARAM)this);
 | |
| 		return;
 | |
| 	}
 | |
| #endif
 | |
| 
 | |
| 	LockList();
 | |
| 
 | |
| 	// Insert into the list of entrys to retry/NDR.
 | |
| 	InsertTailList(&m_leQueueHead, pEntry->QueryListEntry());
 | |
| 
 | |
| 	// wake up the timeout thread if its been sleeping too long
 | |
|     if (m_fLongSleep) SetEvent(m_hTimeoutEvent);
 | |
| 
 | |
| 	m_cEntries++;
 | |
| 
 | |
| 	UnLockList();
 | |
| 
 | |
| 	TraceFunctLeaveEx((LPARAM)this);
 | |
| }
 | |
| 
 | |
| void CRetryQ::RemoveFromQueue(CRetryQueueEntry *pEntry) {
 | |
| 	TraceFunctEnterEx((LPARAM)this, "CRetryQ::RemoveFromQueue");
 | |
| 
 | |
|     _ASSERT(pEntry != NULL);
 | |
| 
 | |
| 	if(pEntry == NULL) {
 | |
| 		TraceFunctLeaveEx((LPARAM)this);
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	LockList();
 | |
| 
 | |
|     // Remove from list of connections
 | |
|     RemoveEntryList(pEntry->QueryListEntry());
 | |
| 
 | |
|     // Decrement count of entries
 | |
|     m_cEntries--;
 | |
| 
 | |
| 	UnLockList();
 | |
| 
 | |
| 	TraceFunctLeaveEx((LPARAM)this);
 | |
| }
 | |
| 
 | |
| void CRetryQ::FlushQueue(void) {
 | |
| 	PLIST_ENTRY pEntry;
 | |
| 	CRetryQueueEntry *pQEntry;
 | |
| 
 | |
| 	TraceFunctEnterEx((LPARAM) this, "void CRetryQ::FlushQueue(void)");
 | |
| 
 | |
|     // delete all entries from the list
 | |
|     while(!IsListEmpty(&m_leQueueHead)) {
 | |
| 		pEntry = RemoveHeadList(&m_leQueueHead);
 | |
| 		pQEntry = CONTAINING_RECORD(pEntry, CRetryQueueEntry, m_le);
 | |
| 		delete pQEntry;
 | |
| 		m_cEntries--;
 | |
|     }
 | |
| 
 | |
|     TraceFunctLeaveEx((LPARAM) this);
 | |
| }
 | |
| 
 | |
| void CRetryQ::CleanQueue(void *pvContext) {
 | |
| 	CRetryQueueEntry *pQEntry;
 | |
| 
 | |
| 	TraceFunctEnterEx((LPARAM) this, "void CRetryQ::CleanQueue(void *pvContext)");
 | |
| 
 | |
| 	LockList();
 | |
| 
 | |
|     PLIST_ENTRY pEntry = m_leQueueHead.Flink;
 | |
|     while(pEntry != &m_leQueueHead) {
 | |
| 		PLIST_ENTRY pNext = pEntry->Flink;
 | |
| 
 | |
| 		pQEntry = CONTAINING_RECORD(pEntry, CRetryQueueEntry, m_le);
 | |
| 		if (pQEntry->MatchesContext(pvContext)) {
 | |
| 			RemoveEntryList(pEntry);
 | |
| 			delete pQEntry;
 | |
| 			m_cEntries--;
 | |
| 		}
 | |
| 
 | |
| 		pEntry = pNext;
 | |
|     }
 | |
| 
 | |
| 	UnLockList();
 | |
| 
 | |
|     TraceFunctLeaveEx((LPARAM) this);
 | |
| }
 | |
| 
 | |
| BOOL CRetryQ::InitializeQueue(DWORD cTimeOut) {
 | |
| 	TraceFunctEnterEx((LPARAM)this, "CRetryQ::InitializeQueue");
 | |
| 
 | |
| 	m_cTimeOut = cTimeOut;
 | |
| 	m_fShutdown = FALSE;
 | |
| 
 | |
| 	// create the event the thread waits on
 | |
| 	m_hTimeoutEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
 | |
| 	if (!m_hTimeoutEvent) {
 | |
| 		ErrorTrace((LPARAM) NULL, "CreateEvent failed (err=%d)", GetLastError());
 | |
| 		TraceFunctLeaveEx((LPARAM)this);
 | |
| 		return(FALSE);
 | |
| 	}
 | |
| 
 | |
|   	// create the thread that processes things out of the
 | |
|   	// the queue
 | |
|   	DWORD ThreadId;
 | |
| 	m_hThreadHandle = CreateThread(NULL, 0, CRetryQ::RetryQueueThread, 
 | |
| 		this, 0, &ThreadId);
 | |
|   	if (m_hThreadHandle == NULL) {
 | |
| 		ErrorTrace((LPARAM) NULL, "CreateThread failed (err=%d)", GetLastError());
 | |
| 		CloseHandle(m_hTimeoutEvent);
 | |
| 		m_hTimeoutEvent = NULL;
 | |
| 		TraceFunctLeaveEx((LPARAM)this);
 | |
| 	    return FALSE;  		
 | |
|   	}
 | |
|    	
 | |
| 	TraceFunctLeaveEx((LPARAM)this);
 | |
|    	return(TRUE);
 | |
| }
 | |
| 
 | |
| BOOL CRetryQ::ShutdownQueue(PFN_SHUTDOWN_FN pfnShutdown) {
 | |
| 
 | |
| 	DWORD ec;
 | |
| 	TraceFunctEnterEx((LPARAM)this, "CRetryQ::ShutdownQueue");
 | |
| 
 | |
| 	// signal that we are shutting down
 | |
| 	m_fShutdown = TRUE;
 | |
| 
 | |
| 	// set the event to shutdown the retry thread
 | |
| 	SetEvent(m_hTimeoutEvent);
 | |
| 
 | |
| 	for( int i=1; i<=120; i++)
 | |
| 	{
 | |
| 		// wait to get notice that its shutdown
 | |
| 		ec = WaitForSingleObject(m_hThreadHandle, 1000);
 | |
| 		if( pfnShutdown ) {
 | |
| 			(*pfnShutdown)();
 | |
| 		}
 | |
| 
 | |
| 		if( WAIT_OBJECT_0 == ec ) break;
 | |
| 	}
 | |
| 	
 | |
| 	if (ec != WAIT_OBJECT_0) {
 | |
| 		ErrorTrace(0, "retry thread hasn't shutdown yet, waiting some more");
 | |
| 		_ASSERT(FALSE);
 | |
| 		ec = WaitForSingleObject(m_hThreadHandle, INFINITE);
 | |
| 		_ASSERT(ec == WAIT_OBJECT_0);
 | |
| 	}
 | |
| 
 | |
| 	// remove all items from the queue
 | |
| 	FlushQueue();
 | |
| 
 | |
| 	// close handles
 | |
| 	CloseHandle(m_hTimeoutEvent);
 | |
| 	CloseHandle(m_hThreadHandle);
 | |
| 
 | |
| 	TraceFunctLeaveEx((LPARAM) this);
 | |
| 
 | |
| 	return TRUE;
 | |
| }
 | |
| 
 | |
| void CRetryQ::ProcessQueue(void) {
 | |
| 	TraceFunctEnterEx((LPARAM)this, "CRetryQ::ProcessQueue");
 | |
| 
 | |
| 	PLIST_ENTRY     pEntry = NULL;
 | |
| 	PLIST_ENTRY     pEntryNext = NULL;
 | |
| 	PLIST_ENTRY     pEntryTail = NULL;
 | |
| 	CRetryQueueEntry *pContext = NULL;
 | |
| 	DWORD			NumProcessed = 0;
 | |
| 
 | |
| 	// grab the crit sect
 | |
| 	LockList();
 | |
| 
 | |
| 	// setup the pointers
 | |
| 	pEntry = m_leQueueHead.Flink,
 | |
| 	m_pLastEntry = m_leQueueHead.Blink;
 | |
| 
 | |
|     // unlock the list
 | |
| 	UnLockList();
 | |
| 
 | |
| 	for(; pEntry != &m_leQueueHead; pEntry = pEntryNext) {
 | |
| 		// save the guy in front, in case this guy gets
 | |
| 		// removed from the queue.
 | |
| 		pEntryNext = pEntry->Flink;
 | |
| 		// get the pointer to the full entry class
 | |
| 		pContext = CONTAINING_RECORD(pEntry, CRetryQueueEntry, m_le);
 | |
| 
 | |
| 		// process the entry.  if its successful then remove it from the queue
 | |
| 		if (pContext->ProcessEntry()) {
 | |
| 			RemoveFromQueue(pContext);
 | |
| 			delete pContext;
 | |
| 		}
 | |
| 
 | |
| 		// If we have just processed the guy who was in the tail
 | |
| 		// of the initial list (there could be more guys added
 | |
| 		// at this point). We exit out of the loop to prevent
 | |
| 		// infinite looping
 | |
| 		if (pEntry == m_pLastEntry) {
 | |
| 			break;
 | |
| 		}
 | |
| 
 | |
| 		// during shutdown we need to bail
 | |
| 		if (m_fShutdown) {
 | |
| 			break;
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	TraceFunctLeaveEx((LPARAM)this);
 | |
| }
 | |
| 
 | |
| DWORD WINAPI CRetryQ::RetryQueueThread(void *ThisPtr) {
 | |
|     DWORD        cTimeout = INFINITE; // In seconds
 | |
| 	CRetryQ      *ThisQ = (CRetryQ *) ThisPtr;
 | |
| 
 | |
| 	TraceFunctEnterEx((LPARAM) ThisPtr, "RetryQueueThread(LPDWORD param)");
 | |
| 
 | |
|     for(;;) {
 | |
|         DWORD dwErr = WaitForSingleObject(ThisQ->GetTimeoutEvent(), 
 | |
| 			TimeToWait(cTimeout));
 | |
| 
 | |
|         switch (dwErr) {
 | |
|             //  Somebody wants us to wake up and do something
 | |
| 	        case WAIT_OBJECT_0:
 | |
| 				// kill the thread if we're shutting down
 | |
| 				if (ThisQ->m_fShutdown) return 0;
 | |
| 
 | |
| 				ThisQ->LockList();
 | |
| 				ThisQ->ClearLongSleep();
 | |
| 				ThisQ->UnLockList();
 | |
| 
 | |
| 				cTimeout = ThisQ->GetRetryTimeout();
 | |
| 
 | |
| 				DebugTrace((LPARAM) ThisQ,
 | |
| 					"Retry thread setting timeout value to %d", cTimeout);
 | |
| 
 | |
| 				continue;
 | |
| 
 | |
| 	        //
 | |
| 	        //  When there is work to do, we wakeup every x seconds to look at
 | |
| 	        //  the list.  That's what we need to do now.
 | |
| 	        //
 | |
| 	        case WAIT_TIMEOUT:
 | |
| 				// kill the thread if we're shutting down
 | |
| 				if (ThisQ->m_fShutdown) return 0;
 | |
| 
 | |
| 				DebugTrace((LPARAM)ThisQ,
 | |
| 					"Retry thread timed out....scanning retry list");
 | |
| 	
 | |
| 	            // We are at a Timeout Interval. Examine and timeout requests.
 | |
| 	            ThisQ->LockList();  // Prevents adding/removing items
 | |
| 	
 | |
| 	            //
 | |
| 	            //  If the list is empty, then turn off timeout processing
 | |
| 	            //  We actually wait for one complete RetryMinute before
 | |
| 	            //   entering the sleep mode.
 | |
| 	            //
 | |
| 	            if (ThisQ->IsQEmpty()) {
 | |
| 	                // We ought to enter long sleep mode.
 | |
| 					ThisQ->SetLongSleep();
 | |
| 	                ThisQ->UnLockList();
 | |
| 	
 | |
| 	                cTimeout = INFINITE;
 | |
| 					DebugTrace(
 | |
| 						(LPARAM)ThisQ, "Retry thread setting timeout value to INFINITE because list is empty");
 | |
| 	                continue;
 | |
| 	            }
 | |
| 
 | |
| 				ThisQ->UnLockList();
 | |
| 
 | |
| 				//process each item in the queue
 | |
| 				ThisQ->ProcessQueue();
 | |
| 
 | |
| 				continue;
 | |
| 
 | |
| 	        //
 | |
| 	        //  Somebody must have closed the event, time to leave
 | |
| 	        //
 | |
| 	        default:
 | |
| 				DebugTrace((LPARAM)ThisQ,"Retry thread hit default case error = %d", GetLastError());
 | |
| 				TraceFunctLeaveEx((LPARAM) ThisQ);
 | |
| 	            return 0;
 | |
|         } // switch
 | |
|     } 
 | |
| 
 | |
| 	_ASSERT(FALSE);
 | |
| 	TraceFunctLeaveEx((LPARAM) ThisQ);
 | |
|     return 0;
 | |
| } 
 |