首页 > python threading ThreadPoolExecutor源码解析

python threading ThreadPoolExecutor源码解析

future: 未来对象,或task的返回容器

1. 当submit后:

    def submit(self, fn, *args, **kwargs):with self._shutdown_lock: # lock是线程锁if self._shutdown:raise RuntimeError('cannot schedule new futures after shutdown')f = _base.Future() # 创建future对象w = _WorkItem(f, fn, args, kwargs) # 线程池执行基本单位
self._work_queue.put(w) #实现的是queueself._adjust_thread_count() # 这里会进行判断当前执行线程的数量return f

 

 

2. _adjust_thread_count:

    def _adjust_thread_count(self):# When the executor gets lost, the weakref callback will wake up# the worker threads.def weakref_cb(_, q=self._work_queue):q.put(None)# TODO(bquinlan): Should avoid creating new threads if there are more# idle threads than items in the work queue.num_threads = len(self._threads)if num_threads < self._max_workers:thread_name = '%s_%d' % (self._thread_name_prefix or self,num_threads)t = threading.Thread(name=thread_name, target=_worker,args=(weakref.ref(self, weakref_cb),self._work_queue)) # 创建线程,并调用_worker方法,传入work queuet.daemon = Truet.start()self._threads.add(t)_threads_queues[t] = self._work_queue

 

 

3. _worker:

def _worker(executor_reference, work_queue):try:while True:work_item = work_queue.get(block=True)if work_item is not None:work_item.run()# Delete references to object. See issue16284del work_itemcontinueexecutor = executor_reference()# Exit if:#   - The interpreter is shutting down OR#   - The executor that owns the worker has been collected OR#   - The executor that owns the worker has been shutdown.if _shutdown or executor is None or executor._shutdown:# Notice other workers
                work_queue.put(None)returndel executorexcept BaseException:_base.LOGGER.critical('Exception in worker', exc_info=True)

 

 

4. WorkItem

class _WorkItem(object):def __init__(self, future, fn, args, kwargs):self.future = futureself.fn = fnself.args = argsself.kwargs = kwargsdef run(self):if not self.future.set_running_or_notify_cancel():returntry:result = self.fn(*self.args, **self.kwargs)except BaseException as exc:self.future.set_exception(exc)# Break a reference cycle with the exception 'exc'self = Noneelse:self.future.set_result(result)

 

转载于:https://www.cnblogs.com/callyblog/p/11147946.html

更多相关:

  • 基于KCF和MobileNet V2以及KalmanFilter的摄像头监测系统 简介 这是一次作业。Tracking这一块落后Detection很多年了,一般认为Detection做好了,那么只要能够做的足够快,就能达到Tracking的效果了,实则不然,现在最快的我认为就是一些可以在手机等arm下使用的轻量神经网络了,但是其牺牲...

  • 最近从 kvell 这篇论文中看到一些单机存储引擎的优秀设计,底层存储硬件性能在不远的未来可能不再是主要的性能瓶颈,反而高并发下的CPU可能是软件性能的主要限制。像BPS/AEP/Optane-SSD 等Intel 推出的硬件存储栈已经能够在延时上接近DRAM的量级,吞吐在较低的队列深度下更是能够超越当前主流NVMe-ssd 数倍甚至...