43、线程补充

并发编程 / 2021-01-16

1、死锁与递归锁(了解)

1.1、死锁

死锁,指的是两个或以上进程或线程在执行过程汇总,因抢锁而导致互相僵持的现象,这种现象我们称之为死锁,造成死锁现象的进程称之为死锁进程!下面简单演示一下:

from threading import Thread, Lock, current_thread
import time

t1 = Lock()  # 锁1
t2 = Lock()  # 锁2


def fight():
    t1.acquire()  # 1、姑且称第一次拿到锁1的叫线程1,其余4个线程只能干等
    print('拿到锁T1', thread.getName())
    t2.acquire()  # 2、因为另外4个线程还在等锁1,所以没人抢锁2,线程1毫不费力地获得锁2
    print('拿到锁T2', thread.getName())
    t2.release()  # 3、线程1释放锁2,因为其它线程还卡在等锁1,所以释放后也没人来抢
    t1.release()  # 4、线程1释放锁1,干等的4个线程其中一个抢到了锁1,我们叫它线程2吧,然后重复上面的步骤

def fight2():
    t2.acquire()  # 因为上面说了没人抢,所以第一次抢到锁的线程再次无压力获得此锁
    print('拿到锁T2', thread.getName())
    time.sleep(1)  # 睡1秒
    t1.acquire()  # 锁1在之前释放后被其它线程占据,所以线程1卡在这等线程2释放;而上述拿到锁1的线程2却卡在等待线程1释放锁2
    print('拿到锁T1', thread.getName())  # 两个线程相当于拿着对方家的门的钥匙被锁在家里,永远也出不去
    t1.release()  # 当然了,因为线程速度太快,执行结果一定是一样的,可能线程抢到的锁却不一样
    t2.release()

def run():
    fight()
    fight2()


if __name__ == '__main__':
    for i in range(10):
        thread = Thread(target=run)
        thread.start()
        
# 执行结果,卡死在这了
拿到锁T1 Thread-1
拿到锁T2 Thread-2
拿到锁T2 Thread-2
拿到锁T1 Thread-2

补充单例知识

1.2、递归锁

为了避免上述死锁现象,multipocessing模块提供了可以被连续的抢锁和解锁的机制,RLock!它内部有一个lock和计数器,每抢锁一次计数+1,每释放一个计数-1,只要计数不为0,那么其它人都无法抢到该锁!

t1=t2=RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止

2、信号量(了解)

信号量,即Semaphore!信号量在不同的地方可能对应不同的技术点,在并发编程中信号量指的是锁!之前说的锁一直以来都是一个锁,让很多人来抢这一个锁;而这信号量,则是多个锁摆出来!如果我们将互斥锁比喻成一个厕所的话,信号量就是公共厕所,公共厕所不是有好几个坑位!

from threading import Thread, Semaphore
import time


def func():
    t1.acquire()
    print(thread.getName(), '抢到了锁')
    time.sleep(2)
    t1.release()


if __name__ == '__main__':
    t1 = Semaphore(5)  # 最大锁数,每次只有五个线程能够抢到锁
    for i in range(10):
        thread = Thread(target=func)
        thread.start()

3、Event事件(了解)

线程是不可预测且独立运行的,当线程需要依据另外一个线程的状态来执行时,事情就变得麻烦了!为此,threading模块提供了一个Event类,它可以让一些进程/线程需要等待另外一些进程或线程执行完毕后才能运行,类似于发射信号告诉它们我执行完了你们接下去!

至于代码演示不想写了,烦。

4、线程队列(了解)

队列是管道+锁,所以线程用队列还是为了保证数据的安全,为了兼顾安全速度肯定会有所下降的。

4.1、queue

普通队列,queue跟进程的queue一样,奉行先进先出

import queue  # 底层调用的是threading模块

thread = queue.Queue()
thread.put('我第一')
thread.put('我是老二')
thread.put('我是老三')

print(thread.get())
print(thread.get())
print(thread.get())

# 执行结果
我第一
我是老二
我是老三
4.2、LifoQueue

即last in frist out,奉行后进先出!它属于queue模块!因此上面代码只需要改一行:

thread = queue.LifoQueue()  # queue改成LifoQueue

# 执行结果
我是老三
我是老二
我第一
4.3、PriorityQueue

即优先级队列,你可以给放入队列中的数据设置进出的优先级,在put括号内放一个元祖,第一个放优先级数字,越小优先级越高,支持负数!

import queue

thread = queue.PriorityQueue()
thread.put((100, '我是大佬'))
thread.put((-20, '我是老二'))
thread.put((30, '我是老三'))

print(thread.get())
print(thread.get())
print(thread.get())

# 执行结果
(-20, '我是老二')
(30, '我是老三')
(100, '我是大佬')

5、进程池与线程池(掌握)

5.1、什么是池?

顾名思义,就是一个池子里最多能同时存在多少进程或线程,也就是最大进程/线程数啦!

5.2、为什么要有池?

众所周知,硬件发展速度远远跟不上软件发展速度,软件的计算能力等总是远远超过硬件的性能上限,如果不加以限制,指不定哪天你的机器就炸了!

所以进程池/线程池是用来保证计算机安全的情况下最大限度地利用计算机,它降低了程序的运行效率但保证了计算机硬件安全,让程序能够正常运行,不然机器都挂了你还运行个鬼!

5.3、如何使用池子?
  • ThreadPoolExecutor(),创建线程池
  • pool.submit(),异步提交任务
  • pool.shutdown(),关闭线程池,等待池子中所有任务执行完毕再执行下一步
  • result(),获取任务返回值
线程池
from concurrent.futures import ThreadPoolExecutor
import os

pool = ThreadPoolExecutor(3)  # 最大线程数,池子内线程是固定的不会重复创建或销毁,减少了开销!


def print_str(number):
    return number**number, os.getpid()


task_list = []
for i in range(6):
    task = pool.submit(print_str, i)  # 异步提交,返回一个对象(其实是括号内任务的返回值)
    task_list.append(task)
pool.shutdown()  # 关闭线程池,等待线程池中所有任务执行完毕
for result in task_list:
    print(result.result())  # 注意,result是同步的,要用这种方式才能变成并发
print('执行完毕')


# 执行结果
(1, 7384)
(1, 7384)
(4, 7384)
(27, 7384)
(256, 7384)
(3125, 7384)
执行完毕
进程池

使用方法与上面基本一致,只需要把ThreadPoolExecutor改成ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor
import os
import time

pool = ProcessPoolExecutor(3)  # 最大进程数,默认数量为os.cpu_count(),其它同线程!


def print_str(number):
    print(number, os.getpid())
    time.sleep(1)
    return number ** number


def call_back(obj):
    print('执行结果:', obj.result())  # 执行完毕触发自动回调机制后,调用result查看结果


if __name__ == '__main__':  # 之前获取回调结果方式是手动写的,非常麻烦。
    for i in range(6):
        pool.submit(print_str, i).add_done_callback(call_back)  # add_done_callback自动回调机制

    print('执行完毕')
# 执行结果
执行完毕
0 10832
1 16000
2 11792
3 10832
执行结果: 1
4 11792
5 16000
执行结果: 4
执行结果: 1
执行结果: 27
执行结果: 256
执行结果: 3125

6、协程(了解)

协程,单线程下实现并发效果,又称微线程!协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的,它并不是Python中的知识点,而是由Python社区有人提出来的一种概念!

,由程序猿自己在代码层面上检测所有的IO操作,一旦遇到IO,我们在代码级别完成切换;欺骗CPU说程序一直在运行没有IO,从而提高占用CPU的时间,提升程序的运行效率。

6.1、协程特点
  • 协程指的是单个线程,当其发生阻塞时,会阻塞这个线程,因此只能是一个程序开启多个进程,进程下开启多线程,线程开启协程,极限压榨CPU!
  • 程序员自己保存多个线程切换的上下文
6.2、gevent模块(了解)

以下内容来自知乎:egon老湿,https://zhuanlan.zhihu.com/p/112183527

通过gevent实现单线程下的socket并发(from gevent import monkey;monkey.patch_all()一定要放到导入socket模块之前,否则gevent无法识别socket的阻塞)

协程实现TCP服务端的并发效果(了解)

服务端
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent

#如果不想用money.patch_all()打补丁,可以用gevent自带的socket
# from gevent import socket
# s=socket.socket()

def server(server_ip,port):
    s=socket(AF_INET,SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind((server_ip,port))
    s.listen(5)
    while True:
        conn,addr=s.accept()
        gevent.spawn(talk,conn,addr)

def talk(conn,addr):
    try:
        while True:
            res=conn.recv(1024)
            print('client %s:%s msg: %s' %(addr[0],addr[1],res))
            conn.send(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()

if __name__ == '__main__':
    server('127.0.0.1',8080)
客户端

多线程并发多个客户端

from threading import Thread
from socket import *
import threading

def client(server_ip,port):
    c=socket(AF_INET,SOCK_STREAM) #套接字对象一定要加到函数内,即局部名称空间内,放在函数外则被所有线程共享,则大家公用一个套接字对象,那么客户端端口永远一样了
    c.connect((server_ip,port))

    count=0
    while True:
        c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
        msg=c.recv(1024)
        print(msg.decode('utf-8'))
        count+=1
if __name__ == '__main__':
    for i in range(500):
        t=Thread(target=client,args=('127.0.0.1',8080))
        t.start()
世间微尘里 独爱茶酒中