zmqNut

明知会散落, 仍不惧盛开

0%

C++11实现线程池

978956

场景

大多数网络服务器单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短。传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就是是“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。

除此之外,线程池能够减少创建的线程个数。通常线程池所允许的并发线程是有上界的,如果同时需要并发的线程数超过上界,那么一部分线程将会等待。而传统方案中,如果同时请求数目为10000,那么最坏情况下,系统可能需要产生10000个线程。尽管这不是一个很大的数目,但是也有部分机器可能达不到这种要求.

线程池的定义

事先创建若干空闲的线程放入一个池中(容器),当一个任务提交到线程池时,线程池就会启动一个空闲的线程去处理任务,当任务结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。

img

优点

  1. 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度:当任务到达时,可以不需要等待线程创建就能立即执行。
  3. 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,监控和调优。

事实上,线程池并不是万能的。它有其特定的使用场合。线程池致力于减少线程本身的开销对应用所产生的影响,这是有前提的,前提就是线程本身开销与线程执行任务相比不可忽略。如果线程本身的开销相对于线程任务执行开销而言是可以忽略不计的,那么此时线程池所带来的好处是不明显的,比如对于FTP服务器以及Telnet服务器,通常传送文件的时间较长,开销较大,那么此时,我们采用线程池未必是理想的方法,我们可以选择“即时创建,即时销毁”的策略

线程池工作的几种情况

按任务队列和线程池大小可分成四种情况:

  • 没有任务,线程池中任务队列为空,啥也不做

    空队列情况
  • 添加小于等于线程池数量的任务,主线程添加任务后通知唤醒线程池中的线程开始取任务。此时任务缓冲队列还是空

    img
  • 添加大于线程池数量的任务,继续添加发现线程池用完,于是存入缓冲队列,工作线程空闲后主动从任务队列取任务执行,缓冲队列未满

任务数量大于线程池数量
  • 添加大于线程池数量的任务,且任务队列已满,当线程中线程用完,且任务缓冲队列已满,进入等待状态,等待任务缓冲队列通知
img

线程池的实现

使用C++11中的bindfunction定义和调用任务处理函数和std::thread等实现的按照任务优先级先后处理的线程池。

ThreadPool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#ifndef _THREAD_POOL_H
#define _THREAD_POOL_H

#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <functional>
#include <iostream>
#include <condition_variable>
#include <future>
#include <assert.h>

class ThreadPool{
public:
//function<void()> 可以认为是一个函数类型,接受任意原型是 void() 的函数,或是函数对象,或是匿名函数。
//void() 意思是不带参数,没有返回值。
typedef std::function<void()> Task_type;//任务类型

enum taskPriorityE {LOW, MIDDLE, HIGH};//优先级

typedef std::pair<taskPriorityE,Task_type> TaskPair;//任务优先级和任务类型组合的任务

ThreadPool(int threads_size = 4);//默认线程池大小为4
~ThreadPool();


void stop();//终止线程池
void addTask(const Task_type&);//添加任务
void addTask(const TaskPair&);

private:
//禁用拷贝构造和赋值运算符
ThreadPool(const ThreadPool& other) = delete;
const ThreadPool& operator=(const ThreadPool& other) = delete;

bool is_started() const { return m_started;}
void start();//运行线程池

void threadLoop();
Task_type take();

typedef std::vector<std::thread*> Threads_type;

//重载()操作符实现优先队列的比较操作
struct TaskPriorityCmp {
bool operator()(const TaskPair& a, const TaskPair& b) const {
return a.first > b.first;
}
};

int m_threads_size;//线程池大小
std::vector<std::thread*> m_threads;//线程池
std::priority_queue<TaskPair, std::vector<TaskPair>, TaskPriorityCmp> m_tasks;//任务队列

std::mutex m_mutex;//STL队列不是线程安全的,因此需要结合互斥锁
std::condition_variable m_cond;//条件变量
bool m_started;//是否开始
};

//构造函数通过传入的参数,初始化线程池大小、锁、条件变量、是否开始,之后开始运行
ThreadPool::ThreadPool(int threads_size)
:m_threads_size(threads_size), m_mutex(), m_cond(), m_started(false)
{
start();
}

//析构函数
ThreadPool::~ThreadPool()
{
if(m_started)
{
stop();
}
}

//start中创建线程,并将线程和任务处理函数进行绑定
void ThreadPool::start()
{
// assert 的作用是现计算表达式 expression,如果其值为假(即为0),那么它先向 stderr 打印一条出错信息,然后通过调用 abort 来终止程序运行。
assert(m_threads.empty());
assert(!m_started);
m_started = true;
m_threads.reserve(m_threads_size);
for(int i=0; i < m_threads_size; ++i)
{
//函数绑定bind函数用于把某种形式的参数列表与已知的函数进行绑定,形成新的函数
m_threads.push_back(new std::thread(std::bind(&ThreadPool::threadLoop, this)));
}
}

//线程池终止,通知所有线程,并将所有线程分离,最后将线程池清空
void ThreadPool::stop()
{
std::unique_lock<std::mutex> lock(m_mutex);
m_started = false;
m_cond.notify_all();

for(auto it = m_threads.begin(); it != m_threads.end(); ++it)
{
//等待线程执行直到终止,其实就说join方法将挂起调用线程的执行,直到被调用的对象完成它的执行
(*it)->join();
delete *it;
}
m_threads.clear();
}

//threadLoop中循环从队列中拿任务并执行
void ThreadPool::threadLoop()
{
while(m_started)
{
Task_type task = take();
if(task)
task();
}
}

//添加任务
void ThreadPool::addTask(const Task_type& task)
{
std::unique_lock<std::mutex> lock(m_mutex);
//默认中等优先级
TaskPair taskPair(MIDDLE, task);
m_tasks.emplace(taskPair);
m_cond.notify_one();
}

void ThreadPool::addTask(const TaskPair& taskPair)
{

std::unique_lock<std::mutex> lock(m_mutex);
m_tasks.emplace(taskPair);
m_cond.notify_one();
}

//从任务队列拿任务
ThreadPool::Task_type ThreadPool::take()
{
std::unique_lock<std::mutex> lock(m_mutex);
while(m_tasks.empty() && m_started)
{
m_cond.wait(lock);
}

Task_type task;

int size = m_tasks.size();
if(!m_tasks.empty() && m_started)
{
task = m_tasks.top().second;
m_tasks.pop();
assert(size -1 == m_tasks.size());
}
return task;
}

#endif

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include "ThreadPool.h"
#include<bits/stdc++.h>
using namespace std;

mutex g_mutex;

void MyFunc()
{
for (int i = 0; i < 10; ++i)
{
//此锁是保证输出完整
std::lock_guard<std::mutex> lock(g_mutex);
std::cout << "MyFunc()" << "at thread [" << std::this_thread::get_id() << "] output" << std::endl;
}
}

int main() {
ThreadPool tp;
mutex mtx;

//这里只添加四次任务,因此任务队列实际大小为4
for (int i = 0; i < 4; ++i) {
tp.addTask(MyFunc);
}
getchar();
return 0;
}

参考链接

https://github.com/progschj/ThreadPool

https://github.com/attackoncs/MyThreadPool

https://github.com/mtrebi/thread-pool

---------- End~~ 撒花ฅ>ω<*ฅ花撒 ----------