一、进程相关知识
补充语法规范:
将存储布尔值的变量名或者返回结果是布尔值的方法名,都起成以is_开头,牢记一点:见名知义!
1.1、进程号
计算机是如何识别与管理众多进程?答案是:进程ID号,即PID!每一个进程及其子进程都有其唯一的标识号,这是操作系统识别众多进程的方式!
1.1.1、Windows
在Windows上,命令行窗口键入tasklist | findstr可以查看指定的进程号!
C:\Users\Lex>tasklist | findstr "sub"
sublime_text.exe 9600 Console 2 48,308 K
1.1.2、Unix/Linux
可以用ps aux查看当前运行的所有进程及其信息,而加上grep则是筛选
sanxi@sanxi-PC:~$ ps aux | grep "lightdm"
root 3574 0.0 0.0 231416 7144 ? SLsl 09:11 0:00 /usr/sbin/lightdm
root 3587 4.7 1.3 780928 449772 tty1 Ssl+ 09:11 10:20 /usr/lib/xorg/Xorg -background none :0 -seat seat0 -auth /var/run/lightdm/root/:0 -nolisten tcp vt1 -novtswitch
root 3844 0.0 0.0 173916 8668 ? Sl 09:11 0:00 lightdm --session-child 12 23 24
sanxi 9542 0.0 0.0 9292 828 pts/0 S+ 12:48 0:00 grep lightdm
1.1.3、Python
current_process().pid
查看主进程ID号
>>> from multiprocessing import current_process
>>> pid = current_process().pid
>>> pid
10604
os.getpid/getppid
getpid为获取当前进程ID号,getppid为获取父进程ID号
>>> import os
>>> def print_str():
... print(os.getpid())
... print(os.getppid())
>>> print_str()
10604
9356
1.2、僵尸进程
参考博客:Rabbit_Dale:http://www.cnblogs.com/Anker/p/3271773.html
1.2.1、什么是僵尸进程?
在Unix、Linux上,子进程一般通过父进程创建,而后再由该子进程创建新的子子进程或者线程。进程的结束是一个异步过程,意味着如果结束时立即回收其全部资源,父进程将无法获得子进程的状态信息!当一个进程停止后,它的父进程应该调用wait()或者waitpid()这两个系统调用以获得该进程的状态信息才能回收其剩下的遗产!
为保证父进程能够随时获得子进程的状态信息,Unix、Linux提供了一种机制:当进程终止时,内核先释放该进程的内存空间;但仍为其保留状态信息(运行时间、PID、退出状态),直到父进程调用wait()或者waitpid()时才释放剩下的状态信息!
1.2.2、它有害吗?
那么问题就来了,如果父进程因为意外而无法及时调用wait()或者waitpid()回收子进程的状态信息,那么进程号会一直被这个没死透的僵尸所占据!我们知道操作系统的端口号有限,当产生大量僵尸进程时,会影响新进程的创建!
就好像某地经常发生械斗,死伤众多,亡者又无人埋葬,久而久之尸体越堆越多,附近都没人敢靠近这个地方了!此地也逐渐萧条、没落,直至无人居住!
1.2.3、如何处理僵尸进程?
通知家属认领尸体!系统层面上则是执行命令清除指定僵尸!
sanxi@sanxi-PC:~$ ps aux | grep Z # 查询僵尸进程有哪些
sanxi@sanxi-PC:~$ sudo kill -s SIGCHLD 17482 # 精准清除僵尸进程
其实还有更简单暴力的方法,即杀死僵尸进程们的父进程,这样系统会直接回收它们的父进程的所有资源,僵尸进程的状态信息自然也在其中!
其实僵尸状态是每个进程都会经历的过程,只是平时它的父进程来得及回收,所以我们看不见他们!
1.2.4、模拟僵尸进程
sanxi@sanxi-PC:~$ touch zombie.py
sanxi@sanxi-PC:~$ cat >> zombie.py << EOF # shell脚本
> from multiprocessing import Process
> import time
> import os
>
>
> def print_str():
> print('我是子进程', os.getpid())
>
>
> if __name__ == '__main__':
> zombie = Process(target=print_str)
> zombie.start()
> print('我是主进程', os.getpid())
> time.sleep(10)
> EOF
sanxi@sanxi-PC:~$ python3 zombie.py
我是主进程 21286
我是子进程 21287
# 趁着这10秒钟,我们去查询进程信息,这个21388就是僵尸进程了。只是等主进程10秒钟后反应过来就会收拾掉它!
sanxi@sanxi-PC:~$ ps aux | grep Z
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
sanxi 21388 0.0 0.0 0 0 pts/0 Z+ 14:08 0:00 [python3] <defunct>
sanxi 21396 0.0 0.0 9160 896 pts/1 S+ 14:08 0:00 grep Z
# 看,10秒后再来搜就搜不到了!
sanxi@sanxi-PC:~$ ps aux | grep Z
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
sanxi 21356 0.0 0.0 9160 892 pts/0 S+ 14:08 0:00 grep Z
1.3、孤儿进程
孤儿进程是很好理解的,顾名思义主进程意死亡,子进程却还半死不活,没了爹就变成孤儿了!这时候init(内核代言人,操作系统的上帝之子)就会开设孤儿院,专门用于人道毁灭这些孤魂野鬼!
1.4、守护进程
守护进程即后台运行的特殊进程,有些在系统引导的时候启动;有些是有需要的时候才启动,这种跟随主进程终止而终止,我们演示的就是这种!就好似封建社会皇帝驾崩后殉葬一个道理!
daemon = True 将进程设置成守护进程
from multiprocessing import Process
import time
def print_str():
print('我是子进程')
if __name__ == '__main__':
zombie = Process(target=print_str)
zombie.daemon = True
zombie.start()
print('我是主进程')
# 执行得到结果,子进程压根没机会运行,因为守护进城跟随主进程终止而终止程序!
我是主进程
二、进程同步
2.1、互斥锁(lock)
此前学习了程序的并发任务,即通过Process模块实现;但它们之间运行没有顺序(我的演示有顺序是因为Linux运行太快),启动后也不受我们控制,这给我们带来了新问题:进程间通信时数据不共享,但是因为共享同一套文件系统,那么多个进程使用同一份文件资源时,容易引发数据安全问题!
未加锁
from multiprocessing import Process
import ujson
import time
def buy_phone():
with open('phone.json', 'rt', encoding='utf-8') as file:
ticket = ujson.load(file)
time.sleep(0.1)
print('剩余票数:', ticket.get('phone'))
if ticket.get('phone') == 0:
print('没票了')
elif ticket.get('phone') > 0:
ticket['phone'] -= 1
with open('phone.json', 'wt', encoding='utf-8') as file:
ujson.dump(ticket, file)
time.sleep(0.1)
print('抢购小米11成功')
if __name__ == '__main__':
for i in range(5):
human = Process(target=buy_phone)
human.start()
# 执行得到结果
剩余票数: 1
剩余票数: 1
剩余票数: 1
剩余票数: 1
剩余票数: 1
抢购小米11成功
抢购小米11成功
抢购小米11成功
抢购小米11成功
抢购小米11成功
加锁
针对这种情况,解决方式就是给文件加锁处理,先到的人获得锁则后面的人都无法再对其进行修改:
from multiprocessing import Process, Lock
import ujson
import time
def buy_phone():
lock.acquire()
ticket = ujson.load(open('phone.json'))
time.sleep(0.1) # 模拟网络延迟
if ticket.get('phone') > 0:
ticket['phone'] -= 1
ujson.dump(ticket, open('phone.json', 'wt'))
time.sleep(0.1) # 模拟网络延迟
print('抢购小米11成功')
else:
print('没票了')
lock.release()
if __name__ == '__main__':
lock = Lock() # 锁应该在主进程中生成,让所有子进程来抢,先到先得!
for i in range(5):
human = Process(target=buy_phone())
human.start()
# 执行得到结果
抢购小米11成功
没票了
没票了
没票了
没票了
这保证了同一时刻只有一个人能对这个文件进行修改,但是因为基于文件操作,文件又是放在磁盘上面,因此速度较慢!我们需要找到一种新的方法让鱼与熊掌都可得兼!这就是IPC机制的队列啦!
三、进程间通信
3.1、队列(queue)
参考博客1:Python队列:https://geek-docs.com/python/python-examples/python-queue-first-in-first-out.html
参考博客2:https://www.cnblogs.com/Dominic-Ji/articles/10929384.html
队列是含有一组对象的容器,支持快速插入和删除的先进先出语义,它类似于管道+锁的集合体!
此前说过,进程间是相互隔离的,若想实现进程间通信(Interprocess process communication,简称IPC;那么multiprocessing为我们提供两种方式:队列、管道!
3.1.1、语法
Queue([maxsize]),创建共享的进程队列,Queue是安全(因为有锁)的队列,它可以实现进程间的数据交换。
maxsize
括号内可以直接传数字,表示生成的队列最大可同时存放数量,默认为0即无限制!
get([block[,timeout])
可直接取值,为空则阻塞直至有数据可用;block用于控制阻塞行为,默认为True,如设置为Flase,无值则报错;timeout是控制阻塞时间(秒),指定时间内无数据可用则报错!
get_nowait()
等同于get(block=Flase),即取值,为空也不阻塞,但会报错!
put(item[,block[,timeout]])
将直接item放入队列,队列满则阻塞至有空位;Flase和timeout同get一样!
put_nowait()
等同于put(block=Flase),即传值时队列无空位也不阻塞,但报错!
q.empty()
判断队列是否为空,是则返回True!但该结果不可靠!比如在返回True的过程中,队列中又加入了项目,它是无法感知的!
q.full()
判断队列是否已满,是则返回True!同样不可靠,理由同上!
q.qsize()
返回队列中item的数量,结果也不可靠,理由同q.empty()和q.full()一样!
3.2、简单演示
>>> from multiprocessing import Queue
>>> queue = Queue(3)
>>> queue.put(666)
>>> queue.put(777)
>>> queue.put(888)
>>> queue.full()
True
>>> queue.get()
666
>>> queue.get()
777
>>> queue.get()
888
>>> queue.empty()
True
>>>
四、生产者消费者模型
4.1、为什么要有生产者消费者模型
由上述演示可得知,当使用put()遇到队列满时和使用get()遇到无值可取时,程序就堵塞在那里了,看起来就有点像取值太快或者生成太快而导致!我们需要有一种手段来解决这种滞销或供不应求的问题!
4.2、什么是生产者消费者模型
生产者生产产品,消费者消费产品;现实生活中工厂生产好的产品是直接卖给消费者吗?不是的,都是有中间商赚差价,要是个个消费者都守着工厂门口一有货就抢那还得了!基本都是通过代理商、商城等等方式转售到消费者手中!
同理生产者消费者模型就是彼此间不直接交流,而是通过代理商!即生产者按照计划不断地生产产品然后交给代理商(阻塞队列),再也不用理会消费者是否能买完(put);消费者也不用天天眼巴巴地等着产品,直接去找代理商买就行(get)!代理商(阻塞队列)相当于一个缓冲区,专注于平衡二者之间的关系!
4.3、如何构建此模型?
多说无益!手底下见真章:
import os
import random
import time
from multiprocessing import Process, Queue
def buy_phone(produce):
while True:
produce = queue.get()
if not produce: # 为空则终止循环
break
time.sleep(random.random()) # 模拟购物延迟
print('用户:', os.getpid(), '购买了', produce)
def factory(pipline):
for i in range(5): # 阻塞队列(代理商)
time.sleep(random.random()) # 模拟生产周期
produce = f'XiaoMi11 {i}'
pipline.put(produce)
print('工号:', os.getpid(), '生产了', produce)
if __name__ == '__main__':
queue = Queue()
worker = Process(target=factory, args=(queue,)) # 作业工人
worker2 = Process(target=factory, args=(queue,)) # 作业工人
customer = Process(target=buy_phone, args=(queue,))
worker.start()
worker2.start()
customer.start()
worker.join() # 必须等生产完了才能发送结束信号
worker2.join()
queue.put(None) # 这步必须写,告诉代理商别等了,生产完了!而且有几个消费者必须put几个None作为结束信号
print('主进程:双十一大促销')
# 执行得到结果
工号: 10418 生产了 XiaoMi11 0
工号: 10418 生产了 XiaoMi11 1
用户: 10420 购买了 XiaoMi11 0
工号: 10419 生产了 XiaoMi11 0
工号: 10418 生产了 XiaoMi11 2
工号: 10418 生产了 XiaoMi11 3
用户: 10420 购买了 XiaoMi11 1
用户: 10420 购买了 XiaoMi11 0
用户: 10420 购买了 XiaoMi11 2
工号: 10419 生产了 XiaoMi11 1
工号: 10418 生产了 XiaoMi11 4
工号: 10419 生产了 XiaoMi11 2
用户: 10420 购买了 XiaoMi11 3
工号: 10419 生产了 XiaoMi11 3
工号: 10419 生产了 XiaoMi11 4
主进程:双十一大促销
用户: 10420 购买了 XiaoMi11 1
用户: 10420 购买了 XiaoMi11 4
用户: 10420 购买了 XiaoMi11 2
用户: 10420 购买了 XiaoMi11 3
用户: 10420 购买了 XiaoMi11 4
Process finished with exit code 0
上述方法解决了此前说的生产者、消费者之间的紧张关系,但是方法也不甚高明,只因纯手工put结束信号!
4.4、JoinableQueue
此方法允许队列中消费者通知生产者项目已被处理,这个通知用的是生产者和消费者之间共享的信号和条件实现!每当生产者往该队列中存入数据的时候,内部计数器+1,每当消费者处理完数据后调用task_done的时候,计算器-1!
4.4.1、语法
JoinableQueue([maxsize])
maxsize
是队列中允许最大item数,不填则默认不限制!
task_done()
消费者调用此方法通知中间商item已被处理,调用次数超过生产速度,会报错!
join()
生产者调用此方法阻塞队列,直到计算器为0的时候,才往后运行
import os
import random
import time
from multiprocessing import Process, Queue, JoinableQueue
def buy_phone(produce):
while True:
produce = queue.get()
if not produce: # 为空则终止循环
break
time.sleep(random.random()) # 模拟购物延迟
print('用户:', os.getpid(), '购买了', produce)
queue.task_done() # 每处理一个数据就发送一次信号告知数据已被处理
def factory(pipline):
for i in range(5): # 阻塞队列(代理商)
time.sleep(random.random()) # 模拟生产周期
produce = f'XiaoMi11 {i}'
pipline.put(produce)
print('工号:', os.getpid(), '生产了', produce)
pipline.join() # 阻塞队列,直到task_done完毕
if __name__ == '__main__':
queue = JoinableQueue()
worker = Process(target=factory, args=(queue,)) # 作业工人
worker2 = Process(target=factory, args=(queue,)) # 作业工人
customer = Process(target=buy_phone, args=(queue,))
customer2 = Process(target=buy_phone, args=(queue,))
customer.daemon = True
customer2.daemon = True
start_list = [worker, worker2, customer, customer2]
for i in start_list:
i.start()
worker.join() # 生产完了肯定接收完了,所以这里不用写customer了
worker2.join()
print('主进程:双十一大促销')
# 执行得到结果
工号: 13544 生产了 XiaoMi11 0
工号: 13543 生产了 XiaoMi11 0
工号: 13543 生产了 XiaoMi11 1
工号: 13543 生产了 XiaoMi11 2
工号: 13544 生产了 XiaoMi11 1
用户: 13545 购买了 XiaoMi11 0
用户: 13546 购买了 XiaoMi11 0
用户: 13545 购买了 XiaoMi11 1
工号: 13544 生产了 XiaoMi11 2
用户: 13545 购买了 XiaoMi11 1
工号: 13543 生产了 XiaoMi11 3
工号: 13544 生产了 XiaoMi11 3
用户: 13546 购买了 XiaoMi11 2
用户: 13545 购买了 XiaoMi11 2
工号: 13543 生产了 XiaoMi11 4
工号: 13544 生产了 XiaoMi11 4
用户: 13545 购买了 XiaoMi11 3
用户: 13546 购买了 XiaoMi11 3
用户: 13545 购买了 XiaoMi11 4
用户: 13546 购买了 XiaoMi11 4
主进程:双十一大促销