用c++怎么写线程池求解?

 我来答
百度网友a71752b
2013-04-17 · 超过17用户采纳过TA的回答
知道答主
回答量:44
采纳率:0%
帮助的人:36.8万
展开全部
我在原来在网上找的资源,你可以参考一下。
线程池
线程是一种比较昂贵的资源.有些系统为了重用线程.引入了线程池的机制.
线程池的工作原理如下:
首先.系统会启动一定数量的线程.这些线程就构成了一个线程池.当有任务要做的时候.系统就从线程池里面选一个空闲的线程.然后把这个线程标记为“正在运行”.然后把任务传给这个线程执行.线程执行任务完成之后.就把自己标记为空闲.这个过程并不难以理解.难以理解的是.一般来说.线程执行完成之后.运行栈等系统资源就会释放.线程对象就被回收了.一个已经完成的线程.又如何能回到线程池的空闲线程队列中呢 秘诀就在于.线程池里面的线程永远不会执行完成.线程池里面的线程都是一个无穷循环
ThreadStarter.h

#ifndef __THREADSTARTER_H__
#define __THREADSTARTER_H__
#include windows.h
线程接口
class ThreadBase
{
public
ThreadBase() {}
virtual ~ThreadBase() {}
virtual bool run() = 0;线程函数
virtual void OnShutdown() {}
HANDLE THREAD_HANDLE;
};
#endif
Threads.h

#ifndef __CTHREADS_H__
#define __CTHREADS_H__
#include ThreadStarter.h
线程的状态
enum CThreadState
{
THREADSTATE_TERMINATE = 0,终止
THREADSTATE_PAUSED = 1,暂停
THREADSTATE_SLEEPING = 2,睡眠
THREADSTATE_BUSY = 3,忙碌
THREADSTATE_AWAITING = 4,等候
};
线程基类
class CThread public ThreadBase
{
public
CThread();
~CThread();
virtual bool run();
virtual void OnShutdown();
设置线程的状态
__forceinline void SetThreadState(CThreadState thread_state)
{
ThreadState = thread_state;
}
返回线程的状态
__forceinline CThreadState GetThreadState()
{
return ThreadState;
}
返回线程ID
int GetThreadId()
{
return ThreadId;
}
time_t GetStartTime()
{
return start_time;
}
protected
CThreadState ThreadState;线程的状态
time_t start_time;
int ThreadId;线程ID
};
#endif

Threads.cpp

#include stdafx.h
#include CThreads.h
CThreadCThread() ThreadBase()
{
初试化线程的状态为等候
ThreadState = THREADSTATE_AWAITING;
start_time = 0;
}
CThread~CThread()
{

}
bool CThreadrun()
{
return false;
}
void CThreadOnShutdown()
{
SetThreadState(THREADSTATE_TERMINATE);
}
Mutex.h

#ifndef __MUTEX_H__
#define __MUTEX_H__
#include windows.h
多个线程操作相同的数据时,一般是需要按顺序访问的,否则会引导数据错乱
为解决这个问题,就需要引入互斥变量,让每个线程都按顺序地访问变量。
class Mutex
{
public
Mutex();
~Mutex();
__forceinline void Acquire()
{
EnterCriticalSection(&cs);
}
__forceinline void Release()
{
LeaveCriticalSection(&cs);
}

例如:
线程操作函数。
int AddCount(void)
{
EnterCriticalSection(&cs);
int nRet = m_nCount++;
LeaveCriticalSection(&cs);
return nRet;
}
在函数AddCount里调用EnterCriticalSection和LeaveCriticalSection来互斥访问变量m_nCount。
通过上面这种方法,就可以实现多线程按顺序地访问相同的变量

__forceinline bool AttemptAcquire()
{
一个线程也可以调用TryEnterCriticalSection函数来请求某个临界区的所有权,此时即
使请求失败也不会被阻塞
return 0;(TryEnterCriticalSection(&cs) == TRUE true false);
}
protected
CRITICAL_SECTION cs;临界区是一种防止多个线程同时执行一个特定代码节的机制
};
#endif
Mutex.cpp

#include stdafx.h
#include Mutex.h

MutexMutex()
{
创建临界区对象
InitializeCriticalSection(&cs);
}
Mutex~Mutex()
{
删除临界区对象
DeleteCriticalSection(&cs);
}
ThreadPool.h

#ifndef __THREADPOOL_H__
#define __THREADPOOL_H__
#include ThreadStarter.h
#include Mutex.h
#include windows.h
#include assert.h
#include set

typedef unsigned int uint32;
typedef signed __int32 int32;
线程管理
class ThreadController
{
public
HANDLE hThread;
uint32 thread_id;
void Setup(HANDLE h)
{
hThread = h;
}
void Suspend()
{
assert(GetCurrentThreadId() == thread_id);
当线程做完任务或者现在想暂停线程运行,就需要使用SuspendThread来暂停线程的执行
SuspendThread(hThread);

}
恢复线程的执行就是使用ResumeThread函数了
void Resume()
{
assert(GetCurrentThreadId() != thread_id);
if(!ResumeThread(hThread))
{
DWORD le = GetLastError();
printf(error %un, le);
}
}
void Join()
{
WaitForSingleObject函数用来检测hHandle事件的信号状态,当函数的执行时间超过dwMilliseconds就返回
WaitForSingleObject(hThread, INFINITE);
}
uint32 GetId()
{
return thread_id;
}
};
struct Thread
{
ThreadBase ExecutionTarget;
ThreadController ControlInterface;
Mutex SetupMutex;线程的互斥
bool DeleteAfterExit;
};
typedef stdsetThread ThreadSet;
线程池类
class CThreadPool
{
uint32 _threadsRequestedSinceLastCheck;
uint32 _threadsFreedSinceLastCheck;
uint32 _threadsExitedSinceLastCheck;
uint32 _threadsToExit;
int32 _threadsEaten;可用线程数量
Mutex _mutex;
ThreadSet m_activeThreads;正在执行任务线程对列
ThreadSet m_freeThreads;可用线程对列
public
CThreadPool();
void IntegrityCheck();
创建指定数量的线程并加到线程池
void Startup();
销毁线程
void Shutdown();

bool ThreadExit(Thread t);
Thread StartThread(ThreadBase ExecutionTarget);
从线程池取得可用线程并执行任务
void ExecuteTask(ThreadBase ExecutionTarget);
void ShowStats();

void KillFreeThreads(uint32 count);

__forceinline void Gobble(){
_threadsEaten=(int32)m_freeThreads.size();
}
__forceinline uint32 GetActiveThreadCount(){
return (uint32)m_activeThreads.size();
}
__forceinline uint32 GetFreeThreadCount(){
return (uint32)m_freeThreads.size();
}
};
extern CThreadPool ThreadPool;线程池
#endif
ThreadPool.cpp

#include stdafx.h
#include ThreadPool.h
#include process.h
CThreadPool ThreadPool;
CThreadPoolCThreadPool()
{
_threadsExitedSinceLastCheck = 0;
_threadsRequestedSinceLastCheck = 0;
_threadsEaten = 0;
_threadsFreedSinceLastCheck = 0;
}

bool CThreadPoolThreadExit(Thread t)
{
_mutex.Acquire();

m_activeThreads.erase(t);
if(_threadsToExit 0)
{
--_threadsToExit;
++_threadsExitedSinceLastCheck;
if(t-DeleteAfterExit)
m_freeThreads.erase(t);
_mutex.Release();
delete t;
return false;
}

enter the suspended pool
++_threadsExitedSinceLastCheck;
++_threadsEaten;
stdsetThreaditerator itr = m_freeThreads.find(t);
if(itr != m_freeThreads.end())
{

}
m_freeThreads.insert(t);

_mutex.Release();
return true;

}

void CThreadPoolExecuteTask(ThreadBase ExecutionTarget)
{
Thread t;
_mutex.Acquire();
++_threadsRequestedSinceLastCheck;
--_threadsEaten;
从线程池夺取一个线程
if(m_freeThreads.size())有可用线程
{
得到一个可用线程
t = m_freeThreads.begin();
把它从可用线程对列里删掉
m_freeThreads.erase(m_freeThreads.begin());
给这个线程一个任务
t-ExecutionTarget = ExecutionTarget;

恢复线程的执行
t-ControlInterface.Resume();
}
else
{
创建一个新的线程并执行任务
t = StartThread(ExecutionTarget);
}
把线程加到执行任务线程对列
m_activeThreads.insert(t);
_mutex.Release();

}
void CThreadPoolStartup()
{
int i;
int tcount = 5;
for(i=0; i tcount; ++i)
StartThread(NULL);
}
void CThreadPoolShowStats()
{
_mutex.Acquire();
在这里输出线程池的状态

_mutex.Release();
}
void CThreadPoolKillFreeThreads(uint32 count)
{
_mutex.Acquire();
Thread t;
ThreadSetiterator itr;
uint32 i;
for(i = 0, itr = m_freeThreads.begin(); i count && itr != m_freeThreads.end(); ++i, ++itr)
{
t = itr;
t-ExecutionTarget = NULL;
t-DeleteAfterExit = true;
++_threadsToExit;
t-ControlInterface.Resume();
}
_mutex.Release();
}
void CThreadPoolShutdown()
{
_mutex.Acquire();
size_t tcount = m_activeThreads.size() + m_freeThreads.size();

KillFreeThreads((uint32)m_freeThreads.size());
_threadsToExit += (uint32)m_activeThreads.size();
for(ThreadSetiterator itr = m_activeThreads.begin(); itr != m_activeThreads.end(); ++itr)
{
if((itr)-ExecutionTarget)
(itr)-ExecutionTarget-OnShutdown();
}
_mutex.Release();
for(;;)
{
_mutex.Acquire();
if(m_activeThreads.size() m_freeThreads.size())
{
_mutex.Release();
Sleep(1000);
continue;
}
break;
}
}

static unsigned long WINAPI thread_proc(void param)
{
Thread t = (Thread)param;
t-SetupMutex.Acquire();
uint32 tid = t-ControlInterface.GetId();
bool ht = (t-ExecutionTarget != NULL);
t-SetupMutex.Release();

for(;;)
{
if(t-ExecutionTarget != NULL)
{
if(t-ExecutionTarget-run())执行任务,返回true表示任务完成
delete t-ExecutionTarget;
t-ExecutionTarget = NULL;
}
if(!ThreadPool.ThreadExit(t))
{
Log.Debug(ThreadPool, Thread %u exiting., tid);
break;
}
else
{
if(ht)
printf(ThreadPool线程%d正在等待新任务., tid);
t-ControlInterface.Suspend();暂停线程运行
}
}
ExitThread(0);
return 0;
}
Thread CThreadPoolStartThread(ThreadBase ExecutionTarget)
{
HANDLE h;
Thread t = new Thread;

t-DeleteAfterExit = false;
t-ExecutionTarget = ExecutionTarget;
t-SetupMutex.Acquire();

/*CreateThread(
lpThreadAttributes是线程的属性,
dwStackSize是线程的栈大小,
lpStartAddress是线程函数的开始地址,
lpParameter是传送给线程函数的参数,
dwCreationFlags是创建线程标志,比如挂起线程,
lpThreadId是标识这个线程的ID)*/

h = CreateThread(NULL, 0, &thread_proc, (LPVOID)t, 0, (LPDWORD)&t-ControlInterface.thread_id);
t-ControlInterface.Setup(h);
t-SetupMutex.Release();

return t;
}
推荐律师服务: 若未解决您的问题,请您详细描述您的问题,通过百度律临进行免费专业咨询

为你推荐:

下载百度知道APP,抢鲜体验
使用百度知道APP,立即抢鲜体验。你的手机镜头里或许有别人想知道的答案。
扫描二维码下载
×

类别

我们会通过消息、邮箱等方式尽快将举报结果通知您。

说明

0/200

提交
取消

辅 助

模 式