1、死锁与递归锁(了解)
1.1、死锁
死锁,指的是两个或以上进程或线程在执行过程汇总,因抢锁而导致互相僵持的现象,这种现象我们称之为死锁,造成死锁现象的进程称之为死锁进程!下面简单演示一下:
from threading import Thread, Lock, current_thread
import time
t1 = Lock() # 锁1
t2 = Lock() # 锁2
def fight():
t1.acquire() # 1、姑且称第一次拿到锁1的叫线程1,其余4个线程只能干等
print('拿到锁T1', thread.getName())
t2.acquire() # 2、因为另外4个线程还在等锁1,所以没人抢锁2,线程1毫不费力地获得锁2
print('拿到锁T2', thread.getName())
t2.release() # 3、线程1释放锁2,因为其它线程还卡在等锁1,所以释放后也没人来抢
t1.release() # 4、线程1释放锁1,干等的4个线程其中一个抢到了锁1,我们叫它线程2吧,然后重复上面的步骤
def fight2():
t2.acquire() # 因为上面说了没人抢,所以第一次抢到锁的线程再次无压力获得此锁
print('拿到锁T2', thread.getName())
time.sleep(1) # 睡1秒
t1.acquire() # 锁1在之前释放后被其它线程占据,所以线程1卡在这等线程2释放;而上述拿到锁1的线程2却卡在等待线程1释放锁2
print('拿到锁T1', thread.getName()) # 两个线程相当于拿着对方家的门的钥匙被锁在家里,永远也出不去
t1.release() # 当然了,因为线程速度太快,执行结果一定是一样的,可能线程抢到的锁却不一样
t2.release()
def run():
fight()
fight2()
if __name__ == '__main__':
for i in range(10):
thread = Thread(target=run)
thread.start()
# 执行结果,卡死在这了
拿到锁T1 Thread-1
拿到锁T2 Thread-2
拿到锁T2 Thread-2
拿到锁T1 Thread-2
补充单例知识
1.2、递归锁
为了避免上述死锁现象,multipocessing模块提供了可以被连续的抢锁和解锁的机制,RLock!它内部有一个lock和计数器,每抢锁一次计数+1,每释放一个计数-1,只要计数不为0,那么其它人都无法抢到该锁!
t1=t2=RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止
2、信号量(了解)
信号量,即Semaphore!信号量在不同的地方可能对应不同的技术点,在并发编程中信号量指的是锁!之前说的锁一直以来都是一个锁,让很多人来抢这一个锁;而这信号量,则是多个锁摆出来!如果我们将互斥锁比喻成一个厕所的话,信号量就是公共厕所,公共厕所不是有好几个坑位!
from threading import Thread, Semaphore
import time
def func():
t1.acquire()
print(thread.getName(), '抢到了锁')
time.sleep(2)
t1.release()
if __name__ == '__main__':
t1 = Semaphore(5) # 最大锁数,每次只有五个线程能够抢到锁
for i in range(10):
thread = Thread(target=func)
thread.start()
3、Event事件(了解)
线程是不可预测且独立运行的,当线程需要依据另外一个线程的状态来执行时,事情就变得麻烦了!为此,threading模块提供了一个Event类,它可以让一些进程/线程需要等待另外一些进程或线程执行完毕后才能运行,类似于发射信号告诉它们我执行完了你们接下去!
至于代码演示不想写了,烦。
4、线程队列(了解)
队列是管道+锁,所以线程用队列还是为了保证数据的安全,为了兼顾安全速度肯定会有所下降的。
4.1、queue
普通队列,queue跟进程的queue一样,奉行先进先出
import queue # 底层调用的是threading模块
thread = queue.Queue()
thread.put('我第一')
thread.put('我是老二')
thread.put('我是老三')
print(thread.get())
print(thread.get())
print(thread.get())
# 执行结果
我第一
我是老二
我是老三
4.2、LifoQueue
即last in frist out,奉行后进先出!它属于queue模块!因此上面代码只需要改一行:
thread = queue.LifoQueue() # queue改成LifoQueue
# 执行结果
我是老三
我是老二
我第一
4.3、PriorityQueue
即优先级队列,你可以给放入队列中的数据设置进出的优先级,在put括号内放一个元祖,第一个放优先级数字,越小优先级越高,支持负数!
import queue
thread = queue.PriorityQueue()
thread.put((100, '我是大佬'))
thread.put((-20, '我是老二'))
thread.put((30, '我是老三'))
print(thread.get())
print(thread.get())
print(thread.get())
# 执行结果
(-20, '我是老二')
(30, '我是老三')
(100, '我是大佬')
5、进程池与线程池(掌握)
5.1、什么是池?
顾名思义,就是一个池子里最多能同时存在多少进程或线程,也就是最大进程/线程数啦!
5.2、为什么要有池?
众所周知,硬件发展速度远远跟不上软件发展速度,软件的计算能力等总是远远超过硬件的性能上限,如果不加以限制,指不定哪天你的机器就炸了!
所以进程池/线程池是用来保证计算机安全的情况下最大限度地利用计算机,它降低了程序的运行效率但保证了计算机硬件安全,让程序能够正常运行,不然机器都挂了你还运行个鬼!
5.3、如何使用池子?
- ThreadPoolExecutor(),创建线程池
- pool.submit(),异步提交任务
- pool.shutdown(),关闭线程池,等待池子中所有任务执行完毕再执行下一步
- result(),获取任务返回值
线程池
from concurrent.futures import ThreadPoolExecutor
import os
pool = ThreadPoolExecutor(3) # 最大线程数,池子内线程是固定的不会重复创建或销毁,减少了开销!
def print_str(number):
return number**number, os.getpid()
task_list = []
for i in range(6):
task = pool.submit(print_str, i) # 异步提交,返回一个对象(其实是括号内任务的返回值)
task_list.append(task)
pool.shutdown() # 关闭线程池,等待线程池中所有任务执行完毕
for result in task_list:
print(result.result()) # 注意,result是同步的,要用这种方式才能变成并发
print('执行完毕')
# 执行结果
(1, 7384)
(1, 7384)
(4, 7384)
(27, 7384)
(256, 7384)
(3125, 7384)
执行完毕
进程池
使用方法与上面基本一致,只需要把ThreadPoolExecutor改成ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import os
import time
pool = ProcessPoolExecutor(3) # 最大进程数,默认数量为os.cpu_count(),其它同线程!
def print_str(number):
print(number, os.getpid())
time.sleep(1)
return number ** number
def call_back(obj):
print('执行结果:', obj.result()) # 执行完毕触发自动回调机制后,调用result查看结果
if __name__ == '__main__': # 之前获取回调结果方式是手动写的,非常麻烦。
for i in range(6):
pool.submit(print_str, i).add_done_callback(call_back) # add_done_callback自动回调机制
print('执行完毕')
# 执行结果
执行完毕
0 10832
1 16000
2 11792
3 10832
执行结果: 1
4 11792
5 16000
执行结果: 4
执行结果: 1
执行结果: 27
执行结果: 256
执行结果: 3125
6、协程(了解)
协程,单线程下实现并发效果,又称微线程!协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的,它并不是Python中的知识点,而是由Python社区有人提出来的一种概念!
,由程序猿自己在代码层面上检测所有的IO操作,一旦遇到IO,我们在代码级别完成切换;欺骗CPU说程序一直在运行没有IO,从而提高占用CPU的时间,提升程序的运行效率。
6.1、协程特点
- 协程指的是单个线程,当其发生阻塞时,会阻塞这个线程,因此只能是一个程序开启多个进程,进程下开启多线程,线程开启协程,极限压榨CPU!
- 程序员自己保存多个线程切换的上下文
6.2、gevent模块(了解)
以下内容来自知乎:egon老湿,https://zhuanlan.zhihu.com/p/112183527
通过gevent实现单线程下的socket并发(from gevent import monkey;monkey.patch_all()一定要放到导入socket模块之前,否则gevent无法识别socket的阻塞)
协程实现TCP服务端的并发效果(了解)
服务端
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent
#如果不想用money.patch_all()打补丁,可以用gevent自带的socket
# from gevent import socket
# s=socket.socket()
def server(server_ip,port):
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind((server_ip,port))
s.listen(5)
while True:
conn,addr=s.accept()
gevent.spawn(talk,conn,addr)
def talk(conn,addr):
try:
while True:
res=conn.recv(1024)
print('client %s:%s msg: %s' %(addr[0],addr[1],res))
conn.send(res.upper())
except Exception as e:
print(e)
finally:
conn.close()
if __name__ == '__main__':
server('127.0.0.1',8080)
客户端
多线程并发多个客户端
from threading import Thread
from socket import *
import threading
def client(server_ip,port):
c=socket(AF_INET,SOCK_STREAM) #套接字对象一定要加到函数内,即局部名称空间内,放在函数外则被所有线程共享,则大家公用一个套接字对象,那么客户端端口永远一样了
c.connect((server_ip,port))
count=0
while True:
c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
msg=c.recv(1024)
print(msg.decode('utf-8'))
count+=1
if __name__ == '__main__':
for i in range(500):
t=Thread(target=client,args=('127.0.0.1',8080))
t.start()