
场景
大多数网络服务器单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短。传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就是是“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。
除此之外,线程池能够减少创建的线程个数。通常线程池所允许的并发线程是有上界的,如果同时需要并发的线程数超过上界,那么一部分线程将会等待。而传统方案中,如果同时请求数目为10000,那么最坏情况下,系统可能需要产生10000个线程。尽管这不是一个很大的数目,但是也有部分机器可能达不到这种要求.
线程池的定义
事先创建若干空闲的线程放入一个池中(容器),当一个任务提交到线程池时,线程池就会启动一个空闲的线程去处理任务,当任务结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。

优点
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度:当任务到达时,可以不需要等待线程创建就能立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,监控和调优。
事实上,线程池并不是万能的。它有其特定的使用场合。线程池致力于减少线程本身的开销对应用所产生的影响,这是有前提的,前提就是线程本身开销与线程执行任务相比不可忽略。如果线程本身的开销相对于线程任务执行开销而言是可以忽略不计的,那么此时线程池所带来的好处是不明显的,比如对于FTP服务器以及Telnet服务器,通常传送文件的时间较长,开销较大,那么此时,我们采用线程池未必是理想的方法,我们可以选择“即时创建,即时销毁”的策略
线程池工作的几种情况
按任务队列和线程池大小可分成四种情况:
- 添加大于线程池数量的任务,且任务队列已满,当线程中线程用完,且任务缓冲队列已满,进入等待状态,等待任务缓冲队列通知
线程池的实现
使用C++11中的bind
、function
定义和调用任务处理函数和std::thread
等实现的按照任务优先级先后处理的线程池。
ThreadPool.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
| #ifndef _THREAD_POOL_H #define _THREAD_POOL_H
#include <vector> #include <queue> #include <thread> #include <mutex> #include <functional> #include <iostream> #include <condition_variable> #include <future> #include <assert.h>
class ThreadPool{ public: typedef std::function<void()> Task_type;
enum taskPriorityE {LOW, MIDDLE, HIGH}; typedef std::pair<taskPriorityE,Task_type> TaskPair;
ThreadPool(int threads_size = 4); ~ThreadPool();
void stop(); void addTask(const Task_type&); void addTask(const TaskPair&);
private: ThreadPool(const ThreadPool& other) = delete; const ThreadPool& operator=(const ThreadPool& other) = delete;
bool is_started() const { return m_started;} void start();
void threadLoop(); Task_type take();
typedef std::vector<std::thread*> Threads_type;
struct TaskPriorityCmp { bool operator()(const TaskPair& a, const TaskPair& b) const { return a.first > b.first; } };
int m_threads_size; std::vector<std::thread*> m_threads; std::priority_queue<TaskPair, std::vector<TaskPair>, TaskPriorityCmp> m_tasks;
std::mutex m_mutex; std::condition_variable m_cond; bool m_started; };
ThreadPool::ThreadPool(int threads_size) :m_threads_size(threads_size), m_mutex(), m_cond(), m_started(false) { start(); }
ThreadPool::~ThreadPool() { if(m_started) { stop(); } }
void ThreadPool::start() { assert(m_threads.empty()); assert(!m_started); m_started = true; m_threads.reserve(m_threads_size); for(int i=0; i < m_threads_size; ++i) { m_threads.push_back(new std::thread(std::bind(&ThreadPool::threadLoop, this))); } }
void ThreadPool::stop() { std::unique_lock<std::mutex> lock(m_mutex); m_started = false; m_cond.notify_all();
for(auto it = m_threads.begin(); it != m_threads.end(); ++it) { (*it)->join(); delete *it; } m_threads.clear(); }
void ThreadPool::threadLoop() { while(m_started) { Task_type task = take(); if(task) task(); } }
void ThreadPool::addTask(const Task_type& task) { std::unique_lock<std::mutex> lock(m_mutex); TaskPair taskPair(MIDDLE, task); m_tasks.emplace(taskPair); m_cond.notify_one(); }
void ThreadPool::addTask(const TaskPair& taskPair) {
std::unique_lock<std::mutex> lock(m_mutex); m_tasks.emplace(taskPair); m_cond.notify_one(); }
ThreadPool::Task_type ThreadPool::take() { std::unique_lock<std::mutex> lock(m_mutex); while(m_tasks.empty() && m_started) { m_cond.wait(lock); }
Task_type task;
int size = m_tasks.size(); if(!m_tasks.empty() && m_started) { task = m_tasks.top().second; m_tasks.pop(); assert(size -1 == m_tasks.size()); } return task; }
#endif
|
测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| #include "ThreadPool.h" #include<bits/stdc++.h> using namespace std;
mutex g_mutex;
void MyFunc() { for (int i = 0; i < 10; ++i) { std::lock_guard<std::mutex> lock(g_mutex); std::cout << "MyFunc()" << "at thread [" << std::this_thread::get_id() << "] output" << std::endl; } }
int main() { ThreadPool tp; mutex mtx; for (int i = 0; i < 4; ++i) { tp.addTask(MyFunc); } getchar(); return 0; }
|
参考链接
https://github.com/progschj/ThreadPool
https://github.com/attackoncs/MyThreadPool
https://github.com/mtrebi/thread-pool