首页 > 基于C++实现线程池加速

基于C++实现线程池加速

经过长期探索,发现一个不需要手动设置线程休眠时间(e.g. std::this_thread::sleep_for(std::chrono::microseconds(1)))的代码:

Github: https://github.com/log4cplus/ThreadPool
#ifndef THREAD_POOL_H_7ea1ee6b_4f17_4c09_b76b_3d44e102400c
#define THREAD_POOL_H_7ea1ee6b_4f17_4c09_b76b_3d44e102400c#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include namespace progschj { class ThreadPool { public:explicit ThreadPool(std::size_t threads= (std::max)(2u, std::thread::hardware_concurrency()));template<class F, class... Args>auto enqueue(F&& f, Args&&... args)->std::future<typename std::result_of<F(Args...)>::type>;void wait_until_empty();void wait_until_nothing_in_flight();void set_queue_size_limit(std::size_t limit);void set_pool_size(std::size_t limit);~ThreadPool();private:void start_worker(std::size_t worker_number,std::unique_lock<std::mutex> const &lock);// need to keep track of threads so we can join themstd::vector< std::thread > workers;// target pool sizestd::size_t pool_size;// the task queuestd::queue< std::function<void()> > tasks;// queue length limitstd::size_t max_queue_size = 100000;// stop signalbool stop = false;// synchronizationstd::mutex queue_mutex;std::condition_variable condition_producers;std::condition_variable condition_consumers;std::mutex in_flight_mutex;std::condition_variable in_flight_condition;std::atomic<std::size_t> in_flight;struct handle_in_flight_decrement{ ThreadPool & tp;handle_in_flight_decrement(ThreadPool & tp_): tp(tp_){  }~handle_in_flight_decrement(){ std::size_t prev= std::atomic_fetch_sub_explicit(&tp.in_flight,std::size_t(1),std::memory_order_acq_rel);if (prev == 1){ std::unique_lock<std::mutex> guard(tp.in_flight_mutex);tp.in_flight_condition.notify_all();}}};};// the constructor just launches some amount of workersinline ThreadPool::ThreadPool(std::size_t threads): pool_size(threads), in_flight(0){ std::unique_lock<std::mutex> lock(this->queue_mutex);for (std::size_t i = 0; i != threads; ++i)start_worker(i, lock);}// add new work item to the pool// 有两种方法可以实现调用类成员,// 一种是使用   bind: .enqueue(std::bind(&Dog::sayHello, &dog));// 一种是用   mem_fn: .enqueue(std::mem_fn(&Dog::sayHello), this)template<class F, class... Args>auto ThreadPool::enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type>{ using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();std::unique_lock<std::mutex> lock(queue_mutex);if (tasks.size() >= max_queue_size)// wait for the queue to empty or be stoppedcondition_producers.wait(lock,[this]{ return tasks.size() < max_queue_size|| stop;});// don't allow enqueueing after stopping the poolif (stop)//若线程池已经开始析构,这是不允许加入新事件throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task]() {  (*task)(); });std::atomic_fetch_add_explicit(&in_flight,std::size_t(1),std::memory_order_relaxed);condition_consumers.notify_one();return res;}// the destructor joins all threadsinline ThreadPool::~ThreadPool(){ std::unique_lock<std::mutex> lock(queue_mutex);stop = true;pool_size = 0;condition_consumers.notify_all();condition_producers.notify_all();condition_consumers.wait(lock, [this] {  return this->workers.empty(); });assert(in_flight == 0);}inline void ThreadPool::wait_until_empty(){ std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition_producers.wait(lock,[this] {  return this->tasks.empty(); });}inline void ThreadPool::wait_until_nothing_in_flight(){ std::unique_lock<std::mutex> lock(this->in_flight_mutex);this->in_flight_condition.wait(lock,[this] {  return this->in_flight == 0; });}inline void ThreadPool::set_queue_size_limit(std::size_t limit){ std::unique_lock<std::mutex> lock(this->queue_mutex);if (stop)return;std::size_t const old_limit = max_queue_size;max_queue_size = (std::max)(limit, std::size_t(1));if (old_limit < max_queue_size)condition_producers.notify_all();}inline void ThreadPool::set_pool_size(std::size_t limit){ if (limit < 1)limit = 1;std::unique_lock<std::mutex> lock(this->queue_mutex);if (stop)return;std::size_t const old_size = pool_size;assert(this->workers.size() >= old_size);pool_size = limit;if (pool_size > old_size){ // create new worker threads// it is possible that some of these are still running because// they have not stopped yet after a pool size reduction, such// workers will just keep runningfor (std::size_t i = old_size; i != pool_size; ++i)start_worker(i, lock);}else if (pool_size < old_size)// notify all worker threads to start downsizingthis->condition_consumers.notify_all();}inline void ThreadPool::start_worker(std::size_t worker_number, std::unique_lock<std::mutex> const &lock){ assert(lock.owns_lock() && lock.mutex() == &this->queue_mutex);assert(worker_number <= this->workers.size());auto worker_func =[this, worker_number]{ for (;;){ std::function<void()> task;bool notify;{ std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition_consumers.wait(lock,[this, worker_number] { return this->stop || !this->tasks.empty()|| pool_size < worker_number + 1; });// deal with downsizing of thread pool or shutdownif ((this->stop && this->tasks.empty())|| (!this->stop && pool_size < worker_number + 1)){ // detach this worker, effectively marking it stoppedthis->workers[worker_number].detach();// downsize the workers vector as much as possiblewhile (this->workers.size() > pool_size&& !this->workers.back().joinable())this->workers.pop_back();// if this is was last worker, notify the destructorif (this->workers.empty())this->condition_consumers.notify_all();return;}else if 
                

更多相关:

  • nth_element(first,nth,last) first,last 第一个和最后一个迭代器,也可以直接用数组的位置。  nth,要定位的第nn 个元素,能对它进行随机访问. 将第n_thn_th 元素放到它该放的位置上,左边元素都小于它,右边元素都大于它. 测试代码: http://www.cplusplus.com...

  • c/c++老版本的rand()存在一定的问题,在转换rand随机数的范围,类型或者分布时,常常会引入非随机性。 定义在 中的随机数库通过一组协作类来解决这类问题:随机数引擎 和 随机数分布类 一个给定的随机数发生器一直会生成相同的随机数序列。一个函数如果定义了局部的随机数发生器,应该将(引擎和分布对象)定义为 st...

  • jsoncpp 是一个C++ 语言实现的json库,非常方便得支持C++得各种数据类型到json 以及 json到各种数据类型的转化。 一个json 类型的数据如下: {"code" : 10.01,"files" : "","msg" : "","uploadid" : "UP000000" } 这种数据类型方便我们人阅读以...

  • 问题如下: 已知一组数(其中有重复元素),求这组数可以组成的所有子集中,子 集中的各个元素和为整数target的子集,结果中无重复的子集。 例如: nums[] = [10, 1, 2, 7, 6, 1, 5], target = 8 结果为: [[1, 7], [1, 2, 5], [2, 6], [1, 1, 6]] 同样之前有...

  • 1. 定义网络的基本参数 定义输入网络的是什么: input = Input(shape=(240, 640, 3)) 反向传播时梯度下降算法 SGD一定会收敛,但是速度慢 Adam速度快但是可能不收敛 [link](https://blog.csdn.net/wydbyxr/article/details/84822806...

  • size_t和int       size_t是一些C/C++标准在stddef.h中定义的。这个类型足以用来表示对象的大小。size_t的真实类型与操作系统有关。 在32位架构中被普遍定义为: typedef   unsigned int size_t; 而在64位架构中被定义为: typedef  unsigned lo...

  • 我在 https://blog.csdn.net/wowricky/article/details/83218126 介绍了一种内存池,它的实现类似于linux 中打开slub_debug (1. make menuconfig: Kenel hacking -> Memory Debugging, 2. comand line中传入...

  • 项目开发中需要从引擎 获取一定范围的数据大小,用作打点上报,测试过程中竟然发现写入了一部分数据之后通过GetApproximateSizes 获取写入的key的范围时取出来的数据大小竟然为0。。。难道发现了一个bug?(欣喜) 因为写入的数据是小于一个sst的data-block(默认是4K),会不会因为GetApproximate...

  • 开发环境使用phpstudy 编辑器用sublime 数据库navicat 需要下载composer 先配置好本地域名,然后需要我们将资源引入到项目里面 下载地址www.layui.com. layui框架有很多我们后台开发需要的控件,帮助我们高效完成后台搭建。 先创建我们的入口文件admins.php,接着我们在a...

  • php 如果在类中定义变量,在类的方法中调用时应该加上$this-> .     class ClassName {private $a = 333;function __construct(){$this->a = 2222;}public function bbb($value=''){echo $this->a;} } $b...

  • 今天我们来看UrlRule.php

  • 【数据对象映射模式】 是将对象和数据存储映射起来,对一个对象的操作会映射为对数据存储的操作。例如在代码中 new 一个对象,使用数据对象映射模式就可以将对象的一些操作比如设置一些属性,就会自动保存到数据库,跟数据库中表的一条记录对应起来。   【代码实现】 在代码中实现数据对象映射模式,我们将实现一个 ORM(对象关系映射 Objec...