python并发编程之多线程:thread、ThreadPoolExecutor


周俊贤:Python并发、并行编程概念篇:并行VS并发、同步VS异步
周俊贤:python并发编程之多线程:thread、ThreadPoolExecutor
周俊贤:Python并行编程:subprocess、ProcessPoolExecutor
博文的大部分资料和代码是参考自附录参考资料里面的材料,外加个人理解。
Python 的线程是不是假的线程?——不得不谈的GIL
例子:输入一个列表,对于列表中的每个元素,我想计算 0 到这个元素的所有整数的平方和。
import time
def cpu_bound(number):
print(sum(i * i for i in range(number)))
def calculate_sums(numbers):
for number in numbers:
cpu_bound(number)
def main():
start_time = time.perf_counter()
numbers = [10000000 + x for x in range(4)]
calculate_sums(numbers)
end_time = time.perf_counter()
print(f'Total time is {end_time-start_time:.4f} seconds')
if __name__ == '__main__':
main()
333333283333335000000
333333383333335000000
333333483333355000001
333333583333395000005
Total time is 3.8134 seconds
这时,我想用多线程来加速,这里采用了实例化Thread类来实现多线程,这里共生成了4个线程
from threading import Thread
import time
def cpu_bound(number):
print(sum(i * i for i in range(number)))
def calculate_sums(numbers):
threads = []
for number in numbers:
thread = Thread(cpu_bound(number)) # 实例化
thread.start() # 启动线程
threads.append(thread)
for thread in threads:
thread.join() # 等待线程完成,并关闭线程
def main():
start_time = time.perf_counter()
numbers = [10000000 + x for x in range(4)]
calculate_sums(numbers)
end_time = time.perf_counter()
print(f'Total time is {end_time-start_time:.4f} seconds')
if __name__ == '__main__':
main()
333333283333335000000
333333383333335000000
333333483333355000001
333333583333395000005
Total time is 3.8047 seconds
咦,发现了,我们的程序在多线程下并没有实现加速。
原因在于,在C++与Java这样的语言中,如果程序能由多个线程分头执行任务,那么就可以把CPU的各个核心充分利用起来。而Python中, 假如使用的是CPython解释器,其实就跟真正的的“多线程”无缘了 。
Python语言的标准实现叫做CPython,它分两步来运行Python程序。首先解析源代码文件,并将其编译成字节码(bytecode)。然后,CPython采用基于栈的解释器来运行字节码。这种字节码解释器在执行Python程序的过程中,必须确保相关的状态不受干扰,所以CPython会用一种叫做全局解释器锁(global interpreter lock, GIL)的机制来保证这一点。当解释器在运行线程A的字节码时,会先锁住自己的线程,阻止别的线程执行。
为什么CPython需要GIL呢?
GIL实际上就是一个互斥锁(mutual-eclusion lock, mutex),用来防止CPython的状态在抢占式的多线程之中收到干扰,因为在这种环境下,一条线程有可能突然打断另一条线程抢占程序的控制权。如果这种抢占行为来得不是时候,那么解释器的状态(例如为垃圾回收工作而设立的引用计数等,举个例子,有两个 Python 线程同时引用了 a,就会造成引用计数的 race condition,引用计数可能最终只增加 1,这样就会造成内存被污染。因为第一个线程结束时,会把引用计数减少 1,这时可能达到条件释放内存,当第二个线程再试图访问 a 时,就找不到有效的内存了)就会操作破坏。所以,CPython要通过GIL组织这样的动作,以确保它自身以及它的那些C拓展模块能够正确执行每一条字节码指令。
简单总结:
- 一是设计者为了规避类似于内存管理这样的复杂的竞争风险问题(race condition)
- 二是因为 CPython 大量使用 C 语言库,但大部分 C 语言库都不是原生线程安全的(线程安全会降低性能和增加复杂度)
GIL真的安全吗?
既然GIL让Python线程没办法平行地运行在多个核心上,那是不是意味着它同时还会自动保护程序里面的数据结果,让我们不需要再加锁了?看看下面的例子
import threading
n = 0
def foo():
global n
n += 1
threads = []
for i in range(1000000):
t = threading.Thread(target=foo)
t.start()
threads.append(t)
for t in threads:
t.join()
print(n)
999998
问题出在n+1这一行,n+=1实际上分成了三个步骤
- value = get(n)的值
- result = value + 1
- set(n) = result
当线程A刚读了value,但马上切换到线程B,而线程B完成了全套的赋值操作,再切回线程A时,看到的已经是一个过时的value值了, 所以多个线程同时访问同一个对象是很危险的 。
可能有人问, 为什么Python的多线程是什么时候切换的?能不能保证原子操作?
实际上,CPython有一个机制,叫做 check_interval,意思是 CPython 解释器会去轮询检查线程 GIL 的锁住情况。每隔一段时间,Python 解释器就会强制当前线程去释放 GIL,这样别的线程才能有执行的机会。不同版本的 Python 中,check interval 的实现方式并不一样。早期的 Python 是 100 个 ticks,大致对应了 1000 个 bytecodes;而 Python 3 以后,interval 是 15 毫秒。当然,我们不必细究具体多久会强制释放 GIL,这不应该成为我们程序设计的依赖条件,我们只需明白,CPython 解释器会在一个“合理”的时间范围内释放 GIL 就可以了。
说白了: GIL 的设计,主要是为了方便 CPython 解释器层面的编写者,而不是 Python 应用层面的程序员。 为了避免数据争用,Python在内置的thrading模块里提供了一套健壮的工具 , 其中最简单也是最有效的叫做Lock的类,它相当于互斥锁(mutex)
import threading
from threading import Lock
n = 0
def foo():
global n
with Lock():
n += 1
threads = []
for i in range(1000000):
t = threading.Thread(target=foo)
t.start()
threads.append(t)
for t in threads:
t.join()
print(n)
1000000
为什么还需要多线程?
- 这种机制让我们很容易就能实现出一种效果,也就是令人感觉程序似然能在同一时间做很多事。这样的效果采用手工方式很难编写,而通过线程来实现,则可以让Python自动替我们把这些问题处理好,让多项任务能够并发地执行。由于GIL机制,虽然每次还是只能有一个线程向前执行,但CPython会确保这些Python线程之间能够公平地轮换执行
- 可以通过Python的多线程机制处理阻塞式的I/O任务,因为线程在执行某些系统调用的过程中会发生阻塞,假如只支持一条线程,那么整个程序就会卡在这里不动。Python程序需要通过系统调用与外部环境交互,其中有一些调用属于阻塞式的I/O操作,例如读取文件、写入文件、联网以及与显示器等设备交互。多线程机制可以让程序中的其他线程继续执行各自的工作,只有发起调用请求的那条线程才需要卡在那里等待操作系统给出结果。因为Python在即将执行系统调用时,会释放GIL,待完成调用之后,才会重新获取它。
使用姿势
用一个爬虫的程序作为例子
import concurrent.futures
import requests
import threading
import time
def download_one(url):
resp = requests.get(url)
print('Read {} from {}'.format(len(resp.content), url))
def download_all(sites):
for site in sites:
download_one(site)
def main():
sites = [
'https://www.baidu.com/',
'https://www.sina.com.cn/',
start_time = time.perf_counter()
download_all(sites)
end_time = time.perf_counter()
print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))
if __name__ == '__main__':
main()
Thread
import concurrent.futures
import requests
import threading
import time
def download_one(url):
resp = requests.get(url)
print('Read {} from {}'.format(len(resp.content), url))
def download_all(sites):
threads = []
for site in sites:
thread = threading.Thread(download_one(site))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
def main():
sites = [
'https://www.baidu.com/',
'https://www.sina.com.cn/',
start_time = time.perf_counter()
download_all(sites)
end_time = time.perf_counter()
print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))
if __name__ == '__main__':
main()
ThreadPoolExecutor
- map的方式
import concurrent.futures
import requests
import threading
import time
def download_one(url):
resp = requests.get(url)
print('Read {} from {}'.format(len(resp.content), url))
def download_all(sites):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
executor.map(download_one, sites)
def main():
sites = [
'https://www.baidu.com/',
'https://www.sina.com.cn/',
start_time = time.perf_counter()
download_all(sites)
end_time = time.perf_counter()
print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))
if __name__ == '__main__':
main()
- submit的方式,如果调用者通过submit方法把某些任务提交给它执行,那么会获得一个与该任务相对应的Future实例,当调用者在这个实例上通过result方法获取执行结果时,ThreadPoolExecutor会把它在执行任务的过程中所遇到的异常自动抛给调用者
import concurrent.futures
import requests
import threading
import time
def download_one(url):
resp = requests.get(url)
print('Read {} from {}'.format(len(resp.content), url))
def download_all(sites):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = []
for site in sites:
future = executor.submit(download_one, site)
futures.append(future)
for future in futures:
future.result()
# executor.map(download_one, sites)
def main():
sites = [
'https://www.baidu.com/',
'https://www.sina.com.cn/',
start_time = time.perf_counter()