Python线程池之ThreadPoolExecutor详解


Python线程池之ThreadPoolExecutor详解

1、什么是线程池

线程池的基本思想是一种对象池,在程序启动时就开辟一块内存空间,里面存放了众多(未死亡)的线程,池中线程执行调度由池管理器来处理。当有线程任务时,从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象所带来的性能开销,节省了系统的资源。

2、使用线程池的好处

减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。

运用线程池能有效的控制线程最大并发数,可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。

对线程进行一些简单的管理,比如:延时执行、定时循环执行的策略等,运用线程池都能进行很好的实现

3、线程池的主要组件


Python线程池之ThreadPoolExecutor详解

一个线程池包括以下四个基本组成部分:

1. 线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;

2. 工作线程(WorkThread):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;

3. 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;

4. 任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。

4. Python多线程(进程)管理模块之ThreadPoolExecutor,ProcessPoolExecutor

在concurrent.future模块中有ThreadPoolExecutor和ProcessPoolExecutor两个类,这两个类内部维护着线程/进程池,以及要执行的任务队列,使得操作变得非常简单,不需要关心任何实现细节

来看一个简单的例子

```python

#!/usr/bin/env python3.6

from concurrent.futures import ThreadPoolExecutor

import requests

import os

DEST_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "download")

BASE_URL = "http://flupy.org/data/flags"

CC_LIST = ("CN", "US", "JP", "EG")

if not os.path.exists(DEST_DIR):

os.mkdir(DEST_DIR)

def get_img(cc):

url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())

response = requests.get(url)

return response.content

def save_img(img, filename):

path = os.path.join(DEST_DIR, filename)

with open(path, 'wb') as f:

f.write(img)

def download_one(cc):

img = get_img(cc)

save_img(img, cc.lower() + ".gif")

return cc

def download_many(cc_list):

works = len(cc_list)

with ThreadPoolExecutor(works) as exector: # 使用with来管理ThreadPoolExecutor

# map方法和内置的map方法类似,不过exector的map方法会并发调用,返回一个由返回的值构成的生成器

response = exector.map(download_one, cc_list)

return len(list(response))

if __name__ == "__main__":

download_many(CC_LIST)

```

Future

concurrent.futures和asyncio中的Future类的作用相同,****都表示可能己经完成或尚未完成的延迟计算****

Future封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果后可以获取结果

使用exector.submit()方法提交执行的函数并获取一个Future,而不是直接创建,传入的参数是一个可调用的对象;获取的Future对象有一个done()方法,判断该Future是否己完成, add_one_callback()设置回调函数, result()来获取Future的结果。as_completed()传一个Future列表,在Future都完成之后返回一个迭代器

使用submit()方法试试看

```python

def download_many(cc_list):

with ThreadPoolExecutor(max_workers=5) as exector:

future_list = []

for cc in cc_list:

# 使用submit提交执行的函数到线程池中,并返回futer对象(非阻塞)

future = exector.submit(download_one, cc)

future_list.append(future)

print(cc, future)

result = []

# as_completed方法传入一个Future迭代器,然后在Future对象运行结束之后yield Future

for future in futures.as_completed(future_list):

# 通过result()方法获取结果

res = future.result()

print(res, future)

result.append(res)

return len(result)


>>>

CN <future>

US <future>

JP <future>

EG <future>

JP <future>

CN <future>

EG <future>

US <future>

```

ProcessPoolExecutor的使用方法是一样的,唯一需要注意的区别是传入的max_workers这个参数对于ProcessPoolExecutor是可选的,在不使用的情况下默认值是os.cpu_count()的返回值(cpu的数量)

exector.submit()和futures.as_completed()这个组合比exector.map()更灵活,submit()可以处理不同的调用函数和参数,而map只能处理同一个可调用对象。

wait()阻塞主线程,直到所有task都完成。

源码分析:

```python

class ThreadPoolExecutor(_base.Executor):

# Used to assign unique thread names when thread_name_prefix is not supplied.

_counter = itertools.count().__next__

def __init__(self, max_workers=None, thread_name_prefix='',

initializer=None, initargs=()):

"""Initializes a new ThreadPoolExecutor instance.

Args:

max_workers: The maximum number of threads that can be used to

execute the given calls.

thread_name_prefix: An optional name prefix to give our threads.

initializer: An callable used to initialize worker threads.

initargs: A tuple of arguments to pass to the initializer.

"""

if max_workers is None:

# Use this number because ThreadPoolExecutor is often

# used to overlap I/O instead of CPU work.

max_workers = (os.cpu_count() or 1) * 5

if max_workers <= 0:

raise ValueError("max_workers must be greater than 0")

if initializer is not None and not callable(initializer):

raise TypeError("initializer must be a callable")

self._max_workers = max_workers

self._work_queue = queue.SimpleQueue()

self._threads = set()

self._broken = False

self._shutdown = False

self._shutdown_lock = threading.Lock()

self._thread_name_prefix = (thread_name_prefix or

("ThreadPoolExecutor-%d" % self._counter()))

self._initializer = initializer

self._initargs = initargs

def submit(self, fn, *args, **kwargs):

with self._shutdown_lock:

if self._broken:

raise BrokenThreadPool(self._broken)

if self._shutdown:

raise RuntimeError('cannot schedule new futures after shutdown')

if _shutdown:

raise RuntimeError('cannot schedule new futures after '

'interpreter shutdown')

f = _base.Future()

w = _WorkItem(f, fn, args, kwargs)

self._work_queue.put(w)

self._adjust_thread_count()

return f

submit.__doc__ = _base.Executor.submit.__doc__

def _adjust_thread_count(self):

# When the executor gets lost, the weakref callback will wake up

# the worker threads.

def weakref_cb(_, q=self._work_queue):

q.put(None)

# TODO(bquinlan): Should avoid creating new threads if there are more

# idle threads than items in the work queue.

num_threads = len(self._threads)

if num_threads < self._max_workers:

thread_name = '%s_%d' % (self._thread_name_prefix or self,

num_threads)

t = threading.Thread(name=thread_name, target=_worker,

args=(weakref.ref(self, weakref_cb),

self._work_queue,

self._initializer,

self._initargs))

t.daemon = True

t.start()

self._threads.add(t)

_threads_queues[t] = self._work_queue

def _initializer_failed(self):

with self._shutdown_lock:

self._broken = ('A thread initializer failed, the thread pool '

'is not usable anymore')

# Drain work queue and mark pending futures failed

while True:

try:

work_item = self._work_queue.get_nowait()

except queue.Empty:

break

if work_item is not None:

work_item.future.set_exception(BrokenThreadPool(self._broken))

def shutdown(self, wait=True):

with self._shutdown_lock:

self._shutdown = True

self._work_queue.put(None)

if wait:

for t in self._threads:

t.join()

shutdown.__doc__ = _base.Executor.shutdown.__doc__

```

这是ThreadPoolExecutor的全部代码,还是比较简单的,我们上面提到的线程池的具备的四个基本功能来分析:线程池管理器,工作线程,任务队列,任务接口来分析

线程池管理器: _threads_queues

工作线程:_worker

任务队列:_work_queue

任务接口:submit

submit(func) 干了两件事:

把任务(func)放入queue中

开启一个新线程不断从queue中取出任务,执行woker.run(),即执行func()

_adjust_thread_count()干了两件事:

开启一个新线程执行_worker函数,这个函数的作用就是不断去queue中取出worker, 执行woker.run(),即执行func()

把新线程跟队列queue绑定,防止线程被join(0)强制中断。

来看一下_worker函数源码:

```python

def _worker(executor_reference, work_queue):

try:

while True:

# 不断从queue中取出worker对象

work_item = work_queue.get(block=True)

if work_item is not None:

# 执行func()

work_item.run()

# Delete references to object. See issue16284

del work_item

continue

# 从弱引用对象中返回executor

executor = executor_reference()

# Exit if:

# - The interpreter is shutting down OR

# - The executor that owns the worker has been collected OR

# - The executor that owns the worker has been shutdown.

# 当executor执行shutdown()方法时executor._shutdown为True,同时会放入None到队列,

# 当work_item.run()执行完毕时,又会进入到下一轮循环从queue中获取worker对象,但是

# 由于shutdown()放入了None到queue,因此取出的对象是None,从而判断这里的if条件分支,

# 发现executor._shutdown是True,又放入一个None到queue中,是来通知其他线程跳出while循环的

# shutdown()中的添加None到队列是用来结束线程池中的某一个线程的,这个if分支中的添加None

# 队列是用来通知其他线程中的某一个线程结束的,这样连锁反应使得所有线程执行完func中的逻辑后都会结束

if _shutdown or executor is None or executor._shutdown:

# Notice other workers

work_queue.put(None)

return

del executor

except BaseException:

_base.LOGGER.critical('Exception in worker', exc_info=True)

```

可以看出,这个 _worker方法的作用就是在新新线程中不断获得queue中的worker对象,执行worker.run()方法,执行完毕后通过放入None到queue队列的方式来通知其他线程结束。

再来看看_adjust_thread_count()方法中的_threads_queues[t] = self._work_queue这个操作是如何实现防止join(0)的操作强制停止正在执行的线程的。

需要注意的是,ThreadPoolExecutor创建的线程daemon=True,而ProcessPoolExecutor创建的进程daemon使用默认值。


分享到:


相關文章: