future: 未来对象,或task的返回容器
1. 当submit后:
def submit(self, fn, *args, **kwargs):with self._shutdown_lock: # lock是线程锁if self._shutdown:raise RuntimeError('cannot schedule new futures after shutdown')f = _base.Future() # 创建future对象w = _WorkItem(f, fn, args, kwargs) # 线程池执行基本单位 self._work_queue.put(w) #实现的是queueself._adjust_thread_count() # 这里会进行判断当前执行线程的数量return f
2. _adjust_thread_count:
def _adjust_thread_count(self):# When the executor gets lost, the weakref callback will wake up# the worker threads.def weakref_cb(_, q=self._work_queue):q.put(None)# TODO(bquinlan): Should avoid creating new threads if there are more# idle threads than items in the work queue.num_threads = len(self._threads)if num_threads < self._max_workers:thread_name = '%s_%d' % (self._thread_name_prefix or self,num_threads)t = threading.Thread(name=thread_name, target=_worker,args=(weakref.ref(self, weakref_cb),self._work_queue)) # 创建线程,并调用_worker方法,传入work queuet.daemon = Truet.start()self._threads.add(t)_threads_queues[t] = self._work_queue
3. _worker:
def _worker(executor_reference, work_queue):try:while True:work_item = work_queue.get(block=True)if work_item is not None:work_item.run()# Delete references to object. See issue16284del work_itemcontinueexecutor = executor_reference()# Exit if:# - The interpreter is shutting down OR# - The executor that owns the worker has been collected OR# - The executor that owns the worker has been shutdown.if _shutdown or executor is None or executor._shutdown:# Notice other workers work_queue.put(None)returndel executorexcept BaseException:_base.LOGGER.critical('Exception in worker', exc_info=True)
4. WorkItem
class _WorkItem(object):def __init__(self, future, fn, args, kwargs):self.future = futureself.fn = fnself.args = argsself.kwargs = kwargsdef run(self):if not self.future.set_running_or_notify_cancel():returntry:result = self.fn(*self.args, **self.kwargs)except BaseException as exc:self.future.set_exception(exc)# Break a reference cycle with the exception 'exc'self = Noneelse:self.future.set_result(result)