多线程¶
创建线程¶
继承 Thread¶
from threading import Thread
class multithreading_task(Thread): # 继承 Thread
def run(self):
# 重写 run 方法
创建 Thread 对象¶
from threading import Thread
p=Thread(target=函数名,args=(参数元组)) # 创建 Thread 对象
方法 | 说明 |
---|---|
getName() |
获取线程名称 |
setName() |
设置线程名称 |
ident() |
获取线程标识符 |
currentThread() |
获得当前线程 |
Join & Daemon¶
假设主线程 A
创建了子线程 B
-
在
B
的start
方法调用前 使用B.setDaemon(True)
将把主线程A
设置为B
的 守护线程作用:不管子线程
B
有没有执行完,只要主线程A
执行完了,A
和B
一起退出def calc(): global x for _ in range(20000000): x+=1 x=0 B=Thread(target=calc) B.setDaemon(True) B.start() print(x)
43859
-
在
B
的start
方法调用后 使用B.join([timeout])
将 阻塞主进程A
作用:主线程
A
将等待子线程B
执行完后才退出,缺省参数timeout
代表线程运行的最大时间,即如果超过这个时间,不管这个此线程有没有执行完毕都会被回收比如:要在子线程
t
中执行大量计算,主线程中调用计算结果,则需要让主线程等待子线程完成计算(阻塞),可用t.join()
实现这一要求def calc(): global x for _ in range(20000000): x+=1 x=0 B=Thread(target=calc) B.start() B.join() print(x)
20000000
线程同步¶
互斥锁¶
互斥量(mutual exclusive),简称 Mutex,最常见的多线程同步方式,更多时候我们称其为 互斥锁
其思想简单粗暴,多线程共享一个互斥量,然后线程之间去竞争。得到锁的线程可以进入临界区执行代码。
普通互斥锁¶
# 无锁
def calc():
global x
for _ in range(20000000):
x+=1
x=0
B1=Thread(target=calc)
B2=Thread(target=calc)
B1.start()
B2.start()
B1.join()
B2.join()
print(x)
26901726
# 有锁
def calc():
global x
for _ in range(20000000):
mutex.acquire()
x+=1
mutex.release()
x=0
mutex=Lock()
B1=Thread(target=calc)
B2=Thread(target=calc)
B1.start()
B2.start()
B1.join()
B2.join()
print(x)
40000000
可重入互斥锁¶
可重入互斥锁(Reentrant Mutex),相比普通互斥锁,RLock
可以多次 acquire
,常用于递归中
注意:acquir
了多少次,对应也应该 release
多少次
Event¶
方法 | 说明 |
---|---|
set() |
将 Event 对象内部的信号设置为 True |
clear() |
将 Event 对象内部的信号设置为 False |
wait() |
如果标志为 True 将立即返回,否则阻塞直至表示变为 True |
isSet() |
获取 Event 对象内部的信号 |
event=Event()
event.clear()
def activity():
print(f'{currentThread().getName()} Prepared')
event.wait()
print(f'{currentThread().getName()} Start')
t1=Thread(target=activity)
t2=Thread(target=activity)
t1.start()
t2.start()
event.set()
Thread-1 Prepared
Thread-2 Prepared
Thread-2 Start
Thread-1 Start
Condition¶
可以认为 Condition 维护了一个锁(Lock or RLock)和一个 waiting 池(存储所有挂起的线程)
方法 | 说明 |
---|---|
acquire() |
获得内部锁 |
release() |
释放内部锁 |
wait() |
释放内部锁,并挂起 |
notify() |
任选一个挂起的线程,将其唤醒,并通知其尝试获取锁 |
notify_all() |
对所有挂起的线程调用 notify() 方法 |
注意:
notify()
前必须先acquire()
,最后必须release()
才能让所有处于 waiting 状态的线程释放
下面给出基于 Condition 实现的 “生产者-消费者” 模型:
from threading import Thread,Condition,currentThread
from time import sleep
from random import randint
products=[]
con=Condition()
class Producer(Thread):
def run(self):
while True:
con.acquire()
if len(products)>0:
con.wait()
else:
print('Start Produce...')
sleep(randint(0,4))
print('Produce Finished')
products.append(randint(20,30))
con.notify()
con.wait()
con.release()
class Consumer(Thread):
def run(self):
while True:
con.acquire()
if len(products)>0:
print('Start Consume')
sleep(randint(0,4))
print('Consume Finished')
products.pop()
else:
print('Oops, no products')
con.notify()
con.wait()
con.release()
_producer=Producer()
_consumer=Consumer()
_consumer.start()
_producer.start()
_producer.join()
_consumer.join()
Queue¶
方法 | 说明 |
---|---|
get() |
删除并返回队头元素 |
put() |
从队尾插入元素,join 计数器加一 |
task_done() |
join 计数器减一(与 get() 配对使用) |
join() |
计数器,不为 0 则阻塞 |
下面给出基于 Queue 实现的另一种 “生产者-消费者” 模型:
from threading import Thread
import Queue
from time import sleep
from random import randint
q=Queue.Queue(5)
class Producer(Thread):
def run(self):
while True:
q.put('products')
print('make a product')
sleep(3)
class Consumer(Thread):
def run(self):
while True:
q.get()
print('consume a product')
sleep(5)
p=Producer()
c=Consumer()
c.start()
p.start()
c.join()
p.join()
线程池¶
方法 | 说明 |
---|---|
submit() |
尝试将任务放入线程池,若已满则排队 |
done() |
判断任务是否完成 |
cancel() |
若任务还在排队,则取消该任务 |
result() |
获得任务的返回值,如果还在排队或正在执行则阻塞 |
from concurrent.futures import ThreadPoolExecutor
from time import sleep
def download_simulate(sleep_time):
sleep(sleep_time)
return sleep_time
pool=ThreadPoolExecutor(max_workers=2) # 同时最多开 2 个线程用于下载
tasks=[]
for _ in range(1,11):
tasks.append(pool.submit(download_simulate,(1)))
sleep(1)
print(f'Task 1 is finished? {tasks[0].done()}')
print(f'Can task 1 be cancelled? {tasks[0].cancel()}')
print(f'Task 3 is finished? {tasks[2].done()}')
print(f'Can task 3 be cancelled? {tasks[2].cancel()}')
print(f'Task 5 is finished? {tasks[4].done()}')
print(f'Can task 5 be cancelled? {tasks[4].cancel()}')
for sq in range(0,10):
if sq==4:
continue # task 5 被取消了,不能调用 result() 方法
print(f'Task {sq+1} slept {tasks[sq].result()}s')
Task 1 is finished? True
Can task 1 be cancelled? False
Task 3 is finished? False
Can task 3 be cancelled? False
Task 5 is finished? False
Can task 5 be cancelled? True
Task 1 slept 1s
Task 2 slept 1s
Task 3 slept 1s
Task 4 slept 1s
Task 6 slept 1s
Task 7 slept 1s
Task 8 slept 1s
Task 9 slept 1s
Task 10 slept 1s