2025-04-27 07:49:33 -04:00

448 lines
14 KiB
C++

/*****************************************************************************
* Copyright (c) 1998-2001 Microsoft Corporation, All Rights Reserved
*
* All Rights Reserved
*
* This software is furnished under a license and may be used and copied
* only in accordance with the terms of such license and with the inclusion
* of the above copyright notice. This software or any other copies thereof
* may not be provided or otherwise made available to any other person. No
* title to and ownership of the software is hereby transferred.
*****************************************************************************/
//============================================================================
//
// CThreadPool.cpp -- Thread pool class
//
// Copyright (c) 1998-2001 Microsoft Corporation, All Rights Reserved
//
// Revisions: 7/14/98 a-kevhu Created
//
//============================================================================
#include "precomp.h"
#include "CThreadPool.h"
#include "CAutoLock.h"
//#include <fstream.h>
CThreadPool::CThreadPool(LONG lPoolSize)
: m_hSemPool(NULL),
m_lPoolSize(lPoolSize),
m_ppfThreadAvailIndex(NULL),
m_ppctPool(NULL),
m_ceJobsPending(FALSE,FALSE),
m_ceOutOfJobs(TRUE,TRUE), // manually reset by PoolAttendant
m_lJobsPendingCount(0L),
m_pctPoolAttendant(NULL),
m_pctPoolCleaner(NULL)
{
// Allocate array used to indicate thread availability:
m_ppfThreadAvailIndex = (BOOL**) new BOOL*[m_lPoolSize];
// Create the pool of threads:
m_ppctPool = (CThread**) new CThread*[m_lPoolSize];
// Create array of thread handles:
m_phThreadHandleArray = (HANDLE*) new HANDLE[m_lPoolSize];
// Initialize the pool threads and threadavail and handle arrays
for(LONG m = 0L; m < m_lPoolSize; m++)
{
TCHAR tstrTmp[32];
wsprintf(tstrTmp,_T("POOL_%d"),m);
m_ppctPool[m] = (CThread*) new CThread(tstrTmp);
m_ppfThreadAvailIndex[m] = (BOOL*) new BOOL;
*m_ppfThreadAvailIndex[m] = TRUE; // Initially all threads are available
m_phThreadHandleArray[m] = m_ppctPool[m]->GetHandle();
Sleep(0); // allow thread to initialize
}
// Create the semaphore that will control access to the pool
m_hSemPool = CreateSemaphore(NULL,m_lPoolSize,m_lPoolSize,IDS_POOL_SEM_NAME);
// Create Attendant and cleaner threads
m_pctPoolAttendant = (CThread*) new CThread(_T("Attendant"),PoolAttendant,this);
m_pctPoolCleaner = (CThread*) new CThread(_T("Cleaner"),PoolCleaner,this);
// Now that everything is set up, run the thread that manages the pool:
m_pctPoolAttendant->RunThreadProc();
// and run the thread that cleans the pool:
m_pctPoolCleaner->RunThreadProc();
Sleep(50); // allow all threads to reach quiescent state
}
CThreadPool::~CThreadPool()
{
#ifdef TEST
{
TCHAR szMsg[256];
wsprintf(szMsg,_T("~CThreadPool() -> BEGINNING OF THE END..."));
LogMsg(szMsg);
}
#endif
// Signal the threadprocwrapper threads to stop once the proc they are
// currently running finishes...
for(LONG p = 0L; p < m_lPoolSize; p++)
{
m_ppctPool[p]->SignalToStop();
}
// Wait for the pool threads (threadprocwrapper functions) to finish...
DWORD dwWait = WaitTillSwimmersDone(DESTROY_POOL_MAX_WAIT);
// Kill the poolAttendant thread:
m_pctPoolAttendant->SignalToStop();
Sleep(0);
// Kill the pool cleaner thread:
m_pctPoolCleaner->SignalToStop();
Sleep(0);
// Wait for those two threads to terminate...
HANDLE hHandles[2];
hHandles[0] = m_pctPoolAttendant->GetThreadDieEventHandle();
hHandles[1] = m_pctPoolCleaner->GetThreadDieEventHandle();
if(WaitForMultipleObjects(2,hHandles, TRUE, DESTROY_POOL_MAX_WAIT) != WAIT_OBJECT_0)
{
// get serious... they aren't dying fast enough...
DWORD dwExitCode = 666;
m_pctPoolAttendant->Terminate(dwExitCode);
m_pctPoolCleaner->Terminate(dwExitCode);
}
// In the meantime, back at the ranch, we may have timed out waiting
// for the pool threads to finish, so need to force them to stop now...
if(dwWait == WAIT_TIMEOUT)
{
KillPool();
}
// Delete all waiting jobs:
RemoveJobsFromDeque();
// Now we should be able to deallocate everything with impunity...
for(LONG m = 0L; m < m_lPoolSize; m++)
{
delete m_ppfThreadAvailIndex[m];
delete m_ppctPool[m];
}
delete [] m_ppfThreadAvailIndex;
delete [] m_ppctPool;
delete m_phThreadHandleArray;
delete m_pctPoolAttendant;
delete m_pctPoolCleaner;
CloseHandle(m_hSemPool);
}
// DispatchQueue adds a job to the stack of jobs performed by the PoolAttendant...
VOID CThreadPool::DispatchQueue(LPBTEX_START_ROUTINE pfn,
LPVOID lpvData,
bool fSetJobPending)
{
CJob* pcjob = (CJob*) new CJob(pfn,lpvData);
CAutoLock cal(m_csJobs);
m_Jobs.push_back(pcjob);
if(fSetJobPending)
{
SetJobsPendingEvent();
}
}
// Gives external access to the jobspending event
VOID CThreadPool::SetJobsPendingEvent()
{
m_ceJobsPending.Set();
Sleep(0);
}
CJob* CThreadPool::GetJob()
{
CJob* pcj = NULL;
CAutoLock cal(m_csJobs);
if(!m_Jobs.empty())
{
pcj = m_Jobs.front();
}
return pcj;
}
VOID CThreadPool::RemoveJob()
{
CAutoLock cal(m_csJobs);
if(!m_Jobs.empty())
{
delete m_Jobs.front();
m_Jobs.pop_front();
}
}
VOID CThreadPool::RemoveJobsFromDeque()
{
LONG lJobsRemaining = 0L;
CAutoLock cal(m_csJobs);
for(LONG p = 0L; p < lJobsRemaining; p++)
{
delete m_Jobs.front();
m_Jobs.pop_front();
}
}
HRESULT CThreadPool::WaitTillSwimmersDone(DWORD dwTimeout)
{
return ::WaitForMultipleObjects(m_lPoolSize, m_phThreadHandleArray, TRUE, dwTimeout);
}
// The client ap, which presumably knows when it is done adding jobs
// to the queue, can use this function to wait until all jobs are done...
DWORD CThreadPool::WaitTillAllJobsDone(DWORD dwTimeout)
{
DWORD dwRet = ::WaitForSingleObject(m_ceOutOfJobs,dwTimeout);
if(dwRet == WAIT_FAILED)
{
dwRet = ::GetLastError();
}
return dwRet;
}
// Forceably kills threads from the pool
VOID CThreadPool::KillPool()
{
DWORD dwExitCode = 666;
// no more Mr. Nice Guy...
for(LONG p = 0L; p < m_lPoolSize; p++)
{
m_ppctPool[p]->Terminate(dwExitCode);
}
}
// This function oversees the pools activities by assigning jobs out of the
// stack of jobs into threads. It goes dormant if there are no jobs pending.
unsigned _stdcall PoolAttendant(LPVOID lParam)
{
CThreadPool* pctp = (CThreadPool*) lParam;
DWORD dwRetVal = 6;
if(pctp == NULL)
{
return -6;
}
HANDLE hEvents[2] = { pctp->m_ceJobsPending, pctp->m_pctPoolAttendant->GetThreadDieEventHandle() };
CJob* pcjob = NULL;
#ifdef TEST
{
TCHAR szMsg[256];
wsprintf(szMsg,_T("Starting PoolAttendant() (thread %s)"), pctp->m_pctPoolAttendant->GetThreadName());
LogMsg(szMsg);
}
#endif
LONG m = 0L;
// Wait while there are jobs pending and we are not signaled to close the pool:
while(WaitForMultipleObjects(2,hEvents,FALSE,INFINITE) == WAIT_OBJECT_0)
{
// The wait ended because I had a job.
// Process jobs as long as there are jobs queued up...
#ifdef TEST
{
TCHAR szMsg[256];
wsprintf(szMsg,_T("PoolAttendant(): I've been signaled that there are jobs..."));
LogMsg(szMsg);
}
#endif
while((pcjob = pctp->GetJob()) != NULL)
{
// Indicate that we are NOT out of work (we have a job):
pctp->m_ceOutOfJobs.Reset();
// Wait for the semaphore
if(WaitForSingleObject(pctp->m_hSemPool, INFINITE) == WAIT_OBJECT_0)
{
pctp->m_csPool.Enter();
{
// Increment index to indicate the first available thread:
for(m = 0L; m < pctp->m_lPoolSize && *(pctp->m_ppfThreadAvailIndex[m]) == FALSE; m++);
*(pctp->m_ppfThreadAvailIndex[m]) = FALSE;
#ifdef TEST
{
TCHAR szMsg[256];
wsprintf(szMsg,_T("PoolAttendant(): assigned a job to pool slot %d"), m);
LogMsg(szMsg);
}
#endif
}
pctp->m_csPool.Leave();
// Now we have our reserved seat in the pool, so get ready to start swimming:
pctp->m_ppctPool[m]->SetThreadProc(pcjob->m_pfn);
pctp->m_ppctPool[m]->SetThreadProcData(pcjob->m_pdata);
pctp->m_ppctPool[m]->RunThreadProc();
// The Attendant doesn't need to wait for the job to finish. He just looks for another job,
// and then waits for space in the pool. The PoolCleaner keeps track of when jobs are finished
// and frees up space in the pool when they are.
}
// remove the job from the stack of jobs:
pctp->RemoveJob();
} // while there were jobs
// For those who care, signal that we are (at least temporarily) out of work:
pctp->m_ceOutOfJobs.Set();
Sleep(0); // allow that event to percolate
#ifdef TEST
{
TCHAR szMsg[256];
wsprintf(szMsg,_T("PoolAttendant(): I just announced that I am out of jobs..."));
LogMsg(szMsg);
}
#endif
} // while not signaled to terminate
#ifdef TEST
{
TCHAR szMsg[256];
wsprintf(szMsg,_T("PoolAttendant(): Exiting"));
LogMsg(szMsg);
}
#endif
return dwRetVal;
}
// This function cleans the pool if any pool thread's ThreadProcDone event is set.
// Since more than one could have been set at exactly the same time, this function
// checks each.
// It goes dormant if no thread's ThreadProcDone events are set and it isn't signaled
// to die.
unsigned _stdcall PoolCleaner(LPVOID lParam)
{
CThreadPool* pctp = (CThreadPool*) lParam;
if(pctp == NULL)
{
return -7;
}
HANDLE* phSomeoneIsDoneOrImDead = NULL;
DWORD dwRetval = 7;
DWORD dwDoneOrDeadVal = -1;
bool fKeepGoing = true;
#ifdef TEST
{
TCHAR szMsg[256];
wsprintf(szMsg,_T("Starting PoolCleaner() (thread %s)"), pctp->m_pctPoolCleaner->GetThreadName());
LogMsg(szMsg);
}
#endif
phSomeoneIsDoneOrImDead = (HANDLE*) new HANDLE[pctp->m_lPoolSize + 1];
if(phSomeoneIsDoneOrImDead != NULL)
{
// The first p handles relate to the pool's threads
for(LONG p = 0; p < pctp->m_lPoolSize; p++)
{
phSomeoneIsDoneOrImDead[p] = pctp->m_ppctPool[p]->GetThreadProcDoneEventHandle();
}
// The last handle is the ThreadDie event for the thread running this function (PoolCleaner)
phSomeoneIsDoneOrImDead[p] = pctp->m_pctPoolCleaner->GetThreadDieEventHandle();
while(fKeepGoing)
{
// Enter into an efficient wait state (till either signaled to die or some thread finished its work)...
dwDoneOrDeadVal = WaitForMultipleObjects(pctp->m_lPoolSize + 1, phSomeoneIsDoneOrImDead, FALSE, INFINITE);
// If the last handle was one of the ones set, this PoolCleaner routine should end.
// If not, then we are good to continue. Use wfso to check:
#ifdef TEST
{
TCHAR szMsg[256];
wsprintf(szMsg,_T("PoolCleaner(): Indicated that someone is done or I'm dead - wait value was %d"), dwDoneOrDeadVal);
LogMsg(szMsg);
}
#endif
if(WaitForSingleObject(phSomeoneIsDoneOrImDead[p],0) != WAIT_OBJECT_0) // WAIT_OBJECT_0 would be set now if we had been signaled to die...
{
// One or more threads are done, so go through each and indicate as free those that are done
for(LONG s = 0; s < pctp->m_lPoolSize; s++)
{
// Use wfso to check the status of the event handle to find out which thread finished its work:
if(WaitForSingleObject(phSomeoneIsDoneOrImDead[s],0) == WAIT_OBJECT_0) // the event was signaled
{
// The status of the thread in the pool corresponding to that event can be
// switched to indicate it is available:
pctp->m_csPool.Enter();
{
*(pctp->m_ppfThreadAvailIndex[s]) = TRUE;
// Reset that thread's ThreadProcDone event:
ResetEvent(phSomeoneIsDoneOrImDead[s]);
Sleep(0); // allow that event to percolate
}
pctp->m_csPool.Leave();
ReleaseSemaphore(pctp->m_hSemPool, 1, NULL);
#ifdef TEST
{
TCHAR szMsg[256];
wsprintf(szMsg,_T("PoolCleaner(): released pool slot %d"), s);
LogMsg(szMsg);
}
#endif
}
}
} // if not signaled to die
else //we were signaled to die
{
fKeepGoing = false;
}
} // while we should KeepGoing
delete [] phSomeoneIsDoneOrImDead;
} // phSomeoneIsDoneOrImDead allocation successful
#ifdef TEST
{
TCHAR szMsg[256];
wsprintf(szMsg,_T("PoolCleaner(): Exiting"));
LogMsg(szMsg);
}
#endif
return dwRetval;
}