Python并发编程之concurrent.futures

一次请求超时问题所引申出的笔记

最近项目中遇到了一个问题,一个模块中的任务是计算密集型的,服务器的响应要十几到几十分钟不等(VPS是单核1G的学生机,没办法呀´・_・`),用户过来的请求自然会出现连接超时的问题。

初步想到的解决办法是把这个任务挂到后台去异步处理,然后先返回给用户一个任务已提交的提示就OK了。一开始用的是线程池 ThreadPoolExecutor,后来了解到 GIL 全局解释器锁这个神奇的存在,对于这种计算密集型任务必然会引起十分频繁的上下文切换,大大降低效率,所以最后的解决办法又换成了进程池 ProcessPoolExecutor

本文使用的是 Python3 中自带的 concurrent.futures 并发模块。

Executor

Executor 是一个抽象类,它提供了异步执行调用的方法。它不能直接使用,但可以通过它的两个子类ThreadPoolExecutor 或者ProcessPoolExecutor 进行调用。ThreadPoolExecutorProcessPoolExecutor 都继承了 Lib/concurrent/futures/_base.py 中的 Executor 类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class Executor(object):
"""This is an abstract base class for concrete asynchronous executors."""

def submit(self, fn, *args, **kwargs):
"""Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.

Returns:
A Future representing the given call.
"""
raise NotImplementedError()

def map(self, fn, *iterables, timeout=None, chunksize=1):
"""Returns an iterator equivalent to map(fn, iter).

Args:
fn: A callable that will take as many arguments as there are
passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
chunksize: The size of the chunks the iterable will be broken into
before being passed to a child process. This argument is only
used by ProcessPoolExecutor; it is ignored by
ThreadPoolExecutor.

Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
be evaluated out-of-order.

Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
Exception: If fn(*args) raises for any values.
"""
if timeout is not None:
end_time = timeout + time.time()

fs = [self.submit(fn, *args) for args in zip(*iterables)]

# Yield must be hidden in closure so that the futures are submitted
# before the first iterator value is required.
def result_iterator():
try:
# reverse to keep finishing order
fs.reverse()
while fs:
# Careful not to keep a reference to the popped future
if timeout is None:
yield fs.pop().result()
else:
yield fs.pop().result(end_time - time.time())
finally:
for future in fs:
future.cancel()
return result_iterator()

ProcessPoolExecutor

流程

先来看看整个流程吧,作者的源码注释很贴心,还有图解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
"""Implements ProcessPoolExecutor.

The follow diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+ +----------+ +--------+ +-----------+ +---------+
| | => | Work Ids | => | | => | Call Q | => | |
| | +----------+ | | +-----------+ | |
| | | ... | | | | ... | | |
| | | 6 | | | | 5, call() | | |
| | | 7 | | | | ... | | |
| Process | | ... | | Local | +-----------+ | Process |
| Pool | +----------+ | Worker | | #1..n |
| Executor | | Thread | | |
| | +----------- + | | +-----------+ | |
| | <=> | Work Items | <=> | | <= | Result Q | <= | |
| | +------------+ | | +-----------+ | |
| | | 6: call() | | | | ... | | |
| | | future | | | | 4, result | | |
| | | ... | | | | 3, except | | |
+----------+ +------------+ +--------+ +-----------+ +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
WorkItem from the "Work Items" dict: if the work item has been cancelled then
it is simply removed from the dict, otherwise it is repackaged as a
_CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
"Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
_ResultItems in "Result Q"
"""

  1. 调用 Executor.submit()后会创建一个唯一ID的 _WorkItem 对象,然后加入到 Work Items 字典中,然后将 _WorkItem 的唯一ID加入至 Work Ids 队列中。

  2. Local worker thread 会先从 Work Ids 队列读取 work ids 并以此来查找对应的 Work Items 字典。如果 work item 被取消了,就把它从字典中删除,没被取消的话就重新打包成一个 _CallItem 对象放入新队列 Call Q 中。

  3. ProcessPool 中的进程会从 Call Q 队列中取 _CallItem 执行,并把结果封装成 _ResultItems 放入 Result Q 队列中。

  4. Local worker thread 接着从 Result Q 队列读取 _ResultItems,然后更新 _WorkItem 中的 Future 并删除 dict entry

  5. 可以看出,整个进程池的任务处理与结果回调都要依靠一个十分重要的中介 Local worker thread 来完成。

方法

接着来看看具体方法:

1
Executor = ProcessPoolExecutor(max_workers=None)

开启一个容量为 max_workers 进程池,如果 max_workersNone 则默认使用CPU的核数

1
Executor.submit(fn, *args, **kwargs)

其中:fn:需要执行的方法;*args, **kwargsfn 参数
该方法会返回一个 future 对象。

1
Executor.map(fn, *iterables, timeout=None, chunksize=1)

其中:fn:需要执行的方法;*iterables:可迭代对象,如列表等,每一次fn执行,都会从iterables中取参数;timeout:超时值(int or float),如果操作超时,会返回raisesTimeoutError,如果不指定timeout参数,则默认没有超时时间;chunksize:如果大于1,则将可迭代对象切分成chunksize块提交至进程池中,默认为1,对于很大的 iterables ,设置较大 chunksize 可以提高性能。
该方法会返回一个类似 map(func, *iterables) 的迭代器。

future对象

future 对象表示未来已经完成或未完成的异步操作,以下是比较重要的方法:

  • future.done():返回一个布尔值,表示任务是否执行完毕
  • future.add_done_callback():这个方法只有一个参数,类型是可调用对象,任务结束后会回调这个对象。
  • future.result():如果任务结束后调用result(), 会返回方法执行结果,如果任务没有结束时调用该方法,这时会阻塞调用方所在的线程,直到有结果返回。此时该方法还可以接收 timeout 参数,如果在指定的 timeout 时间内任务没有执行完毕,会抛出 TimeoutError 异常。
  • future.cancel():取消提交的任务,如果任务处于阻塞中,可以被取消,但如果任务已经在线程/进程池中运行了,就无法取消。
  • future.result():获取任务的返回值

ThreadPoolExecutor

ThreadPoolExecutor 在使用上和 ProcessPoolExecutor 大致是一样的,方法也是相同的,但是对于 map() 方法 ThreadPoolExecutor 会少一个参数 chunksize

GIL

GIL (Global Interpreter Lock)即全局解释器锁,由于 CPython 解释器的内存管理不是线程安全的,因此解释器被一个 GIL 保护着,它确保任何时候都只有一个 Python 线程执行。首先需要明确的一点是 GIL 并不是 Python 的特性,它是在实现 Python 解析器(CPython)时所引入的一个概念,像 JPython 、IronPython 等就没有 GIL 。

官方对GIL的定义如下:

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

根据官方定义可以看出,GIL 本质上来说就是一把全局排他锁。

在 Python2 中,GIL 的释放逻辑是当前线程遇见 IO 操作或者 ticks 计数达到100进行释放。ticks 可以看作是 Python 自身的一个计数器,专门做用于 GIL,每次释放后归零,这个计数可以通过 sys.setcheckinterval 来调整。

在Python3 中,GIL 不使用 ticks 计数,改为使用计时器,即执行时间达到阈值后,当前线程释放 GIL,这样对计算密集型程序更加友好,但依然没有解决 GIL 导致的同一时间只能执行一个线程的问题,所以效率依然不尽如人意。

虽然由于 GIL 的缘故,Python 中的多线程显得有些鸡肋,但在 IO 密集型任务下还是有用的。

  • 本文作者: Marticles
  • 版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 3.0 许可协议。转载请注明出处!