跳转至

多线程

创建线程

继承 Thread

from threading import Thread
class multithreading_task(Thread): # 继承 Thread
    def run(self):
        # 重写 run 方法

创建 Thread 对象

from threading import Thread
p=Thread(target=函数名,args=(参数元组)) # 创建 Thread 对象

方法 说明
getName() 获取线程名称
setName() 设置线程名称
ident() 获取线程标识符
currentThread() 获得当前线程

Join & Daemon

假设主线程 A 创建了子线程 B

  1. Bstart 方法调用前 使用 B.setDaemon(True) 将把主线程 A 设置为 B守护线程

    作用:不管子线程 B 有没有执行完,只要主线程 A 执行完了,AB 一起退出

    def calc():
        global x
        for _ in range(20000000):
            x+=1
    
    x=0
    B=Thread(target=calc)
    B.setDaemon(True)
    B.start()
    print(x)
    
    43859
    

  2. Bstart 方法调用后 使用 B.join([timeout])阻塞主进程 A

    作用:主线程 A 将等待子线程 B 执行完后才退出,缺省参数 timeout 代表线程运行的最大时间,即如果超过这个时间,不管这个此线程有没有执行完毕都会被回收

    比如:要在子线程 t 中执行大量计算,主线程中调用计算结果,则需要让主线程等待子线程完成计算(阻塞),可用 t.join() 实现这一要求

    def calc():
        global x
        for _ in range(20000000):
            x+=1
    
    x=0
    B=Thread(target=calc)
    B.start()
    B.join()
    print(x)
    
    20000000
    

线程同步

互斥锁

v33pcy

互斥量(mutual exclusive),简称 Mutex,最常见的多线程同步方式,更多时候我们称其为 互斥锁

其思想简单粗暴,多线程共享一个互斥量,然后线程之间去竞争。得到锁的线程可以进入临界区执行代码。

普通互斥锁

# 无锁
def calc():
    global x
    for _ in range(20000000):
        x+=1

x=0
B1=Thread(target=calc)
B2=Thread(target=calc)
B1.start()
B2.start()
B1.join()
B2.join()
print(x)
26901726
# 有锁
def calc():
    global x
    for _ in range(20000000):
        mutex.acquire()
        x+=1
        mutex.release()

x=0
mutex=Lock()
B1=Thread(target=calc)
B2=Thread(target=calc)
B1.start()
B2.start()
B1.join()
B2.join()
print(x)
40000000

可重入互斥锁

可重入互斥锁(Reentrant Mutex),相比普通互斥锁,RLock 可以多次 acquire ,常用于递归中

注意:acquir 了多少次,对应也应该 release 多少次

Event

方法 说明
set() 将 Event 对象内部的信号设置为 True
clear() 将 Event 对象内部的信号设置为 False
wait() 如果标志为 True 将立即返回,否则阻塞直至表示变为 True
isSet() 获取 Event 对象内部的信号

event=Event()
event.clear()

def activity():
    print(f'{currentThread().getName()} Prepared')
    event.wait()
    print(f'{currentThread().getName()} Start')

t1=Thread(target=activity)
t2=Thread(target=activity)
t1.start()
t2.start()
event.set()
Thread-1 Prepared
Thread-2 Prepared
Thread-2 Start
Thread-1 Start

Condition

可以认为 Condition 维护了一个锁(Lock or RLock)和一个 waiting 池(存储所有挂起的线程)

方法 说明
acquire() 获得内部锁
release() 释放内部锁
wait() 释放内部锁,并挂起
notify() 任选一个挂起的线程,将其唤醒,并通知其尝试获取锁
notify_all() 对所有挂起的线程调用 notify() 方法

注意:

  1. notify() 前必须先 acquire(),最后必须 release() 才能让所有处于 waiting 状态的线程释放

下面给出基于 Condition 实现的 “生产者-消费者” 模型:

from threading import Thread,Condition,currentThread
from time import sleep
from random import randint

products=[]
con=Condition()

class Producer(Thread):
    def run(self):
        while True:
            con.acquire()
            if len(products)>0:
                con.wait()
            else:
                print('Start Produce...')
                sleep(randint(0,4))
                print('Produce Finished')
                products.append(randint(20,30))
                con.notify()
                con.wait()
            con.release()

class Consumer(Thread):
    def run(self):
        while True:
            con.acquire()
            if len(products)>0:
                print('Start Consume')
                sleep(randint(0,4))
                print('Consume Finished')
                products.pop()
            else:
                print('Oops, no products')
                con.notify()
                con.wait()
            con.release()

_producer=Producer()
_consumer=Consumer()
_consumer.start()
_producer.start()
_producer.join()
_consumer.join()

Queue

方法 说明
get() 删除并返回队头元素
put() 从队尾插入元素,join 计数器加一
task_done() join 计数器减一(与 get() 配对使用)
join() 计数器,不为 0 则阻塞

下面给出基于 Queue 实现的另一种 “生产者-消费者” 模型:

from threading import Thread
import Queue
from time import sleep
from random import randint

q=Queue.Queue(5)

class Producer(Thread):
    def run(self):
        while True:
            q.put('products')
            print('make a product')
            sleep(3)

class Consumer(Thread):
    def run(self):
        while True:
            q.get()
            print('consume a product')
            sleep(5)

p=Producer()
c=Consumer()
c.start()
p.start()
c.join()
p.join()

线程池

方法 说明
submit() 尝试将任务放入线程池,若已满则排队
done() 判断任务是否完成
cancel() 若任务还在排队,则取消该任务
result() 获得任务的返回值,如果还在排队或正在执行则阻塞

from concurrent.futures import ThreadPoolExecutor
from time import sleep

def download_simulate(sleep_time):
    sleep(sleep_time)
    return sleep_time

pool=ThreadPoolExecutor(max_workers=2) # 同时最多开 2 个线程用于下载
tasks=[]
for _ in range(1,11):
    tasks.append(pool.submit(download_simulate,(1)))
sleep(1)
print(f'Task 1 is finished? {tasks[0].done()}')
print(f'Can task 1 be cancelled? {tasks[0].cancel()}')
print(f'Task 3 is finished? {tasks[2].done()}')
print(f'Can task 3 be cancelled? {tasks[2].cancel()}')
print(f'Task 5 is finished? {tasks[4].done()}')
print(f'Can task 5 be cancelled? {tasks[4].cancel()}')
for sq in range(0,10):
    if sq==4:
        continue # task 5 被取消了,不能调用 result() 方法
    print(f'Task {sq+1} slept {tasks[sq].result()}s')
Task 1 is finished? True
Can task 1 be cancelled? False
Task 3 is finished? False
Can task 3 be cancelled? False
Task 5 is finished? False
Can task 5 be cancelled? True
Task 1 slept 1s
Task 2 slept 1s
Task 3 slept 1s
Task 4 slept 1s
Task 6 slept 1s
Task 7 slept 1s
Task 8 slept 1s
Task 9 slept 1s
Task 10 slept 1s

评论