ThreadPoolExecutor.cpp 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. #include "ThreadPoolExecutor.h"
  2. CThreadPoolExecutor* CThreadPoolExecutor::m_pThreadPoolExecutor = nullptr;
  3. CThreadPoolExecutor::CWorker::CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask) :
  4. m_pThreadPool(pThreadPool),
  5. m_pFirstTask(pFirstTask),
  6. m_bRun(true)
  7. {
  8. }
  9. CThreadPoolExecutor::CWorker::~CWorker()
  10. {
  11. }
  12. CThreadPoolExecutor* CThreadPoolExecutor::GetInstance()
  13. {
  14. if (m_pThreadPoolExecutor == nullptr)
  15. {
  16. m_pThreadPoolExecutor = new CThreadPoolExecutor;
  17. }
  18. return m_pThreadPoolExecutor;
  19. }
  20. void CThreadPoolExecutor::FreeInstance()
  21. {
  22. if (m_pThreadPoolExecutor)
  23. {
  24. delete m_pThreadPoolExecutor;
  25. m_pThreadPoolExecutor = nullptr;
  26. }
  27. }
  28. /**
  29. 执行任务的工作线程。
  30. 当前没有任务时,
  31. 如果当前线程数量大于最小线程数量,减少线程,
  32. 否则,执行清理程序,将线程类给释放掉
  33. **/
  34. void CThreadPoolExecutor::CWorker::Run()
  35. {
  36. Runnable * pTask = NULL;
  37. while(m_bRun)
  38. {
  39. if(NULL == m_pFirstTask)
  40. {
  41. pTask = m_pThreadPool->GetTask();
  42. }
  43. else
  44. {
  45. pTask = m_pFirstTask;
  46. m_pFirstTask = NULL;
  47. }
  48. if(NULL == pTask)
  49. {
  50. EnterCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));
  51. if(m_pThreadPool->GetThreadPoolSize() > m_pThreadPool->m_minThreads)
  52. {
  53. ThreadPoolItr itr = m_pThreadPool->m_ThreadPool.find(this);
  54. if(itr != m_pThreadPool->m_ThreadPool.end())
  55. {
  56. m_pThreadPool->m_ThreadPool.erase(itr);
  57. m_pThreadPool->m_TrashThread.insert(this);
  58. }
  59. m_bRun = false;
  60. }
  61. else
  62. {
  63. ThreadPoolItr itr = m_pThreadPool->m_TrashThread.begin();
  64. while(itr != m_pThreadPool->m_TrashThread.end())
  65. {
  66. (*itr)->Join();
  67. delete (*itr);
  68. m_pThreadPool->m_TrashThread.erase(itr);
  69. itr = m_pThreadPool->m_TrashThread.begin();
  70. }
  71. }
  72. LeaveCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));
  73. continue;
  74. }
  75. else
  76. {
  77. pTask->Run();
  78. pTask = NULL;
  79. }
  80. }
  81. }
  82. /////////////////////////////////////////////////////////////////////////////////////////////
  83. CThreadPoolExecutor::CThreadPoolExecutor(void) :
  84. m_bRun(false),
  85. m_bEnableInsertTask(false)
  86. {
  87. InitializeCriticalSection(&m_csTasksLock);
  88. InitializeCriticalSection(&m_csThreadPoolLock);
  89. }
  90. CThreadPoolExecutor::~CThreadPoolExecutor(void)
  91. {
  92. Terminate();
  93. DeleteCriticalSection(&m_csTasksLock);
  94. DeleteCriticalSection(&m_csThreadPoolLock);
  95. }
  96. bool CThreadPoolExecutor::Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTasks)
  97. {
  98. if(minThreads == 0)
  99. {
  100. return false;
  101. }
  102. if(maxThreads < minThreads)
  103. {
  104. return false;
  105. }
  106. m_minThreads = minThreads;
  107. m_maxThreads = maxThreads;
  108. m_maxPendingTasks = maxPendingTasks;
  109. unsigned int i = m_ThreadPool.size();
  110. for(; i<minThreads; i++)
  111. {
  112. //创建线程
  113. CWorker * pWorker = new CWorker(this);
  114. if(NULL == pWorker)
  115. {
  116. return false;
  117. }
  118. EnterCriticalSection(&m_csThreadPoolLock);
  119. m_ThreadPool.insert(pWorker);
  120. LeaveCriticalSection(&m_csThreadPoolLock);
  121. pWorker->Start();
  122. }
  123. m_bRun = true;
  124. m_bEnableInsertTask = true;
  125. return true;
  126. }
  127. bool CThreadPoolExecutor::Execute(Runnable * pRunnable)
  128. {
  129. if(!m_bEnableInsertTask)
  130. {
  131. return false;
  132. }
  133. if(NULL == pRunnable)
  134. {
  135. return false;
  136. }
  137. if(m_Tasks.size() >= m_maxPendingTasks)
  138. {
  139. if(m_ThreadPool.size() < m_maxThreads)
  140. {
  141. CWorker * pWorker = new CWorker(this, pRunnable);
  142. if(NULL == pWorker)
  143. {
  144. return false;
  145. }
  146. EnterCriticalSection(&m_csThreadPoolLock);
  147. m_ThreadPool.insert(pWorker);
  148. LeaveCriticalSection(&m_csThreadPoolLock);
  149. pWorker->Start();
  150. }
  151. else
  152. {
  153. return false;
  154. }
  155. }
  156. else
  157. {
  158. EnterCriticalSection(&m_csTasksLock);
  159. m_Tasks.push_back(pRunnable);
  160. LeaveCriticalSection(&m_csTasksLock);
  161. }
  162. return true;
  163. }
  164. Runnable * CThreadPoolExecutor::GetTask()
  165. {
  166. Runnable * Task = NULL;
  167. EnterCriticalSection(&m_csTasksLock);
  168. if(!m_Tasks.empty())
  169. {
  170. Task = m_Tasks.front();
  171. m_Tasks.pop_front();
  172. }
  173. LeaveCriticalSection(&m_csTasksLock);
  174. return Task;
  175. }
  176. unsigned int CThreadPoolExecutor::GetThreadPoolSize()
  177. {
  178. return m_ThreadPool.size();
  179. }
  180. void CThreadPoolExecutor::Terminate()
  181. {
  182. m_bEnableInsertTask = false;
  183. while(m_Tasks.size() > 0)
  184. {
  185. Sleep(1);
  186. }
  187. m_bRun = false;
  188. m_minThreads = 0;
  189. m_maxThreads = 0;
  190. m_maxPendingTasks = 0;
  191. while(m_ThreadPool.size() > 0)
  192. {
  193. Sleep(1);
  194. }
  195. EnterCriticalSection(&m_csThreadPoolLock);
  196. ThreadPoolItr itr = m_TrashThread.begin();
  197. while(itr != m_TrashThread.end())
  198. {
  199. (*itr)->Join();
  200. delete (*itr);
  201. m_TrashThread.erase(itr);
  202. itr = m_TrashThread.begin();
  203. }
  204. LeaveCriticalSection(&m_csThreadPoolLock);
  205. }