一次请求超时问题所引申出的笔记
最近项目中遇到了一个问题,一个模块中的任务是计算密集型的,服务器的响应要十几到几十分钟不等(VPS是单核1G的学生机,没办法呀´・_・`),用户过来的请求自然会出现连接超时的问题。
初步想到的解决办法是把这个任务挂到后台去异步处理,然后先返回给用户一个任务已提交的提示就OK了。一开始用的是线程池 ThreadPoolExecutor
,后来了解到 GIL
全局解释器锁这个神奇的存在,对于这种计算密集型任务必然会引起十分频繁的上下文切换,大大降低效率,所以最后的解决办法又换成了进程池 ProcessPoolExecutor
。
本文使用的是 Python3 中自带的 concurrent.futures
并发模块。
Executor
Executor
是一个抽象类,它提供了异步执行调用的方法。它不能直接使用,但可以通过它的两个子类ThreadPoolExecutor
或者ProcessPoolExecutor
进行调用。ThreadPoolExecutor
和 ProcessPoolExecutor
都继承了 Lib/concurrent/futures/_base.py
中的 Executor
类。
1 | class Executor(object): |
ProcessPoolExecutor
流程
先来看看整个流程吧,作者的源码注释很贴心,还有图解。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41"""Implements ProcessPoolExecutor.
The follow diagram and text describe the data-flow through the system:
|======================= In-process =====================|== Out-of-process ==|
+----------+ +----------+ +--------+ +-----------+ +---------+
| | => | Work Ids | => | | => | Call Q | => | |
| | +----------+ | | +-----------+ | |
| | | ... | | | | ... | | |
| | | 6 | | | | 5, call() | | |
| | | 7 | | | | ... | | |
| Process | | ... | | Local | +-----------+ | Process |
| Pool | +----------+ | Worker | | #1..n |
| Executor | | Thread | | |
| | +----------- + | | +-----------+ | |
| | <=> | Work Items | <=> | | <= | Result Q | <= | |
| | +------------+ | | +-----------+ | |
| | | 6: call() | | | | ... | | |
| | | future | | | | 4, result | | |
| | | ... | | | | 3, except | | |
+----------+ +------------+ +--------+ +-----------+ +---------+
Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue
Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
WorkItem from the "Work Items" dict: if the work item has been cancelled then
it is simply removed from the dict, otherwise it is repackaged as a
_CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
"Work Items" dict and deletes the dict entry
Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
_ResultItems in "Result Q"
"""
调用
Executor.submit()
后会创建一个唯一ID的_WorkItem
对象,然后加入到Work Items
字典中,然后将_WorkItem
的唯一ID加入至Work Ids
队列中。Local worker thread
会先从Work Ids
队列读取 work ids 并以此来查找对应的Work Items
字典。如果 work item 被取消了,就把它从字典中删除,没被取消的话就重新打包成一个_CallItem
对象放入新队列Call Q
中。ProcessPool 中的进程会从
Call Q
队列中取_CallItem
执行,并把结果封装成_ResultItems
放入Result Q
队列中。Local worker thread
接着从Result Q
队列读取_ResultItems
,然后更新_WorkItem
中的Future
并删除dict entry
。可以看出,整个进程池的任务处理与结果回调都要依靠一个十分重要的中介
Local worker thread
来完成。
方法
接着来看看具体方法:
1 | Executor = ProcessPoolExecutor(max_workers=None) |
开启一个容量为 max_workers
进程池,如果 max_workers
为 None
则默认使用CPU的核数
1 | Executor.submit(fn, *args, **kwargs) |
其中:fn
:需要执行的方法;*args
, **kwargs
:fn
参数
该方法会返回一个 future
对象。
1 | Executor.map(fn, *iterables, timeout=None, chunksize=1) |
其中:fn
:需要执行的方法;*iterables
:可迭代对象,如列表等,每一次fn
执行,都会从iterables
中取参数;timeout
:超时值(int or float),如果操作超时,会返回raisesTimeoutError
,如果不指定timeout
参数,则默认没有超时时间;chunksize
:如果大于1,则将可迭代对象切分成chunksize
块提交至进程池中,默认为1,对于很大的 iterables
,设置较大 chunksize
可以提高性能。
该方法会返回一个类似 map(func, *iterables)
的迭代器。
future对象
future
对象表示未来已经完成或未完成的异步操作,以下是比较重要的方法:
future.done()
:返回一个布尔值,表示任务是否执行完毕future.add_done_callback()
:这个方法只有一个参数,类型是可调用对象,任务结束后会回调这个对象。future.result()
:如果任务结束后调用result(), 会返回方法执行结果,如果任务没有结束时调用该方法,这时会阻塞调用方所在的线程,直到有结果返回。此时该方法还可以接收timeout
参数,如果在指定的timeout
时间内任务没有执行完毕,会抛出TimeoutError
异常。future.cancel()
:取消提交的任务,如果任务处于阻塞中,可以被取消,但如果任务已经在线程/进程池中运行了,就无法取消。future.result()
:获取任务的返回值
ThreadPoolExecutor
ThreadPoolExecutor
在使用上和 ProcessPoolExecutor
大致是一样的,方法也是相同的,但是对于 map()
方法 ThreadPoolExecutor
会少一个参数 chunksize
。
GIL
GIL (Global Interpreter Lock)
即全局解释器锁,由于 CPython 解释器的内存管理不是线程安全的,因此解释器被一个 GIL 保护着,它确保任何时候都只有一个 Python 线程执行。首先需要明确的一点是 GIL 并不是 Python 的特性,它是在实现 Python 解析器(CPython)时所引入的一个概念,像 JPython 、IronPython 等就没有 GIL 。
官方对GIL的定义如下:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
根据官方定义可以看出,GIL 本质上来说就是一把全局排他锁。
在 Python2 中,GIL 的释放逻辑是当前线程遇见 IO 操作或者 ticks 计数达到100进行释放。ticks 可以看作是 Python 自身的一个计数器,专门做用于 GIL,每次释放后归零,这个计数可以通过 sys.setcheckinterval 来调整。
在Python3 中,GIL 不使用 ticks 计数,改为使用计时器,即执行时间达到阈值后,当前线程释放 GIL,这样对计算密集型程序更加友好,但依然没有解决 GIL 导致的同一时间只能执行一个线程的问题,所以效率依然不尽如人意。
虽然由于 GIL 的缘故,Python 中的多线程显得有些鸡肋,但在 IO 密集型任务下还是有用的。