41、进程

并发编程 / 2021-01-12

一、进程相关知识

补充语法规范:

将存储布尔值的变量名或者返回结果是布尔值的方法名,都起成以is_开头,牢记一点:见名知义!

1.1、进程号

计算机是如何识别与管理众多进程?答案是:进程ID号,即PID!每一个进程及其子进程都有其唯一的标识号,这是操作系统识别众多进程的方式!

1.1.1、Windows

在Windows上,命令行窗口键入tasklist | findstr可以查看指定的进程号!

C:\Users\Lex>tasklist | findstr "sub"
sublime_text.exe              9600 Console                    2     48,308 K
1.1.2、Unix/Linux

可以用ps aux查看当前运行的所有进程及其信息,而加上grep则是筛选

sanxi@sanxi-PC:~$ ps aux | grep "lightdm"
root      3574  0.0  0.0 231416  7144 ?        SLsl 09:11   0:00 /usr/sbin/lightdm
root      3587  4.7  1.3 780928 449772 tty1    Ssl+ 09:11  10:20 /usr/lib/xorg/Xorg -background none :0 -seat seat0 -auth /var/run/lightdm/root/:0 -nolisten tcp vt1 -novtswitch
root      3844  0.0  0.0 173916  8668 ?        Sl   09:11   0:00 lightdm --session-child 12 23 24
sanxi     9542  0.0  0.0   9292   828 pts/0    S+   12:48   0:00 grep lightdm
1.1.3、Python
current_process().pid

查看主进程ID号

>>> from multiprocessing import current_process
>>> pid = current_process().pid
>>> pid
10604
os.getpid/getppid

getpid为获取当前进程ID号,getppid为获取父进程ID号

>>> import os
>>> def print_str():
...     print(os.getpid())
...     print(os.getppid())
>>> print_str()
10604
9356

1.2、僵尸进程

参考博客:Rabbit_Dale:http://www.cnblogs.com/Anker/p/3271773.html

1.2.1、什么是僵尸进程?

在Unix、Linux上,子进程一般通过父进程创建,而后再由该子进程创建新的子子进程或者线程。进程的结束是一个异步过程,意味着如果结束时立即回收其全部资源,父进程将无法获得子进程的状态信息!当一个进程停止后,它的父进程应该调用wait()或者waitpid()这两个系统调用以获得该进程的状态信息才能回收其剩下的遗产!

为保证父进程能够随时获得子进程的状态信息,Unix、Linux提供了一种机制:当进程终止时,内核先释放该进程的内存空间;但仍为其保留状态信息(运行时间、PID、退出状态),直到父进程调用wait()或者waitpid()时才释放剩下的状态信息!

1.2.2、它有害吗?

那么问题就来了,如果父进程因为意外而无法及时调用wait()或者waitpid()回收子进程的状态信息,那么进程号会一直被这个没死透的僵尸所占据!我们知道操作系统的端口号有限,当产生大量僵尸进程时,会影响新进程的创建!

就好像某地经常发生械斗,死伤众多,亡者又无人埋葬,久而久之尸体越堆越多,附近都没人敢靠近这个地方了!此地也逐渐萧条、没落,直至无人居住!

1.2.3、如何处理僵尸进程?

通知家属认领尸体!系统层面上则是执行命令清除指定僵尸!

sanxi@sanxi-PC:~$ ps aux | grep Z  # 查询僵尸进程有哪些
sanxi@sanxi-PC:~$ sudo kill -s SIGCHLD 17482  # 精准清除僵尸进程

其实还有更简单暴力的方法,即杀死僵尸进程们的父进程,这样系统会直接回收它们的父进程的所有资源,僵尸进程的状态信息自然也在其中!

其实僵尸状态是每个进程都会经历的过程,只是平时它的父进程来得及回收,所以我们看不见他们!

1.2.4、模拟僵尸进程
sanxi@sanxi-PC:~$ touch zombie.py
sanxi@sanxi-PC:~$ cat >> zombie.py << EOF  # shell脚本
> from multiprocessing import Process
> import time
> import os
> 
> 
> def print_str():
>     print('我是子进程', os.getpid())
> 
> 
> if __name__ == '__main__':
>     zombie = Process(target=print_str)
>     zombie.start()
>     print('我是主进程', os.getpid())
>     time.sleep(10)
> EOF
sanxi@sanxi-PC:~$ python3 zombie.py 
我是主进程 21286
我是子进程 21287

# 趁着这10秒钟,我们去查询进程信息,这个21388就是僵尸进程了。只是等主进程10秒钟后反应过来就会收拾掉它!
sanxi@sanxi-PC:~$ ps aux | grep Z
USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
sanxi    21388  0.0  0.0      0     0 pts/0    Z+   14:08   0:00 [python3] <defunct>
sanxi    21396  0.0  0.0   9160   896 pts/1    S+   14:08   0:00 grep Z
# 看,10秒后再来搜就搜不到了!
sanxi@sanxi-PC:~$ ps aux | grep Z
USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
sanxi    21356  0.0  0.0   9160   892 pts/0    S+   14:08   0:00 grep Z

1.3、孤儿进程

孤儿进程是很好理解的,顾名思义主进程意死亡,子进程却还半死不活,没了爹就变成孤儿了!这时候init(内核代言人,操作系统的上帝之子)就会开设孤儿院,专门用于人道毁灭这些孤魂野鬼!

1.4、守护进程

守护进程即后台运行的特殊进程,有些在系统引导的时候启动;有些是有需要的时候才启动,这种跟随主进程终止而终止,我们演示的就是这种!就好似封建社会皇帝驾崩后殉葬一个道理!

daemon = True 将进程设置成守护进程

from multiprocessing import Process
import time


def print_str():
    print('我是子进程')


if __name__ == '__main__':
    zombie = Process(target=print_str)
    zombie.daemon = True
    zombie.start()
    print('我是主进程')
    
# 执行得到结果,子进程压根没机会运行,因为守护进城跟随主进程终止而终止程序!
我是主进程

二、进程同步

2.1、互斥锁(lock)

此前学习了程序的并发任务,即通过Process模块实现;但它们之间运行没有顺序(我的演示有顺序是因为Linux运行太快),启动后也不受我们控制,这给我们带来了新问题:进程间通信时数据不共享,但是因为共享同一套文件系统,那么多个进程使用同一份文件资源时,容易引发数据安全问题!

未加锁
from multiprocessing import Process
import ujson
import time


def buy_phone():
    with open('phone.json', 'rt', encoding='utf-8') as file:
        ticket = ujson.load(file)
    time.sleep(0.1)
    print('剩余票数:', ticket.get('phone'))
    if ticket.get('phone') == 0:
        print('没票了')
    elif ticket.get('phone') > 0:
        ticket['phone'] -= 1
        with open('phone.json', 'wt', encoding='utf-8') as file:
            ujson.dump(ticket, file)
        time.sleep(0.1)
        print('抢购小米11成功')


if __name__ == '__main__':
    for i in range(5):
        human = Process(target=buy_phone)
        human.start()
        
# 执行得到结果
剩余票数: 1
剩余票数: 1
剩余票数: 1
剩余票数: 1
剩余票数: 1
抢购小米11成功
抢购小米11成功
抢购小米11成功
抢购小米11成功
抢购小米11成功
加锁

针对这种情况,解决方式就是给文件加锁处理,先到的人获得锁则后面的人都无法再对其进行修改:

from multiprocessing import Process, Lock
import ujson
import time


def buy_phone():
    lock.acquire()
    ticket = ujson.load(open('phone.json'))
    time.sleep(0.1)  # 模拟网络延迟
    if ticket.get('phone') > 0:
        ticket['phone'] -= 1
        ujson.dump(ticket, open('phone.json', 'wt'))
        time.sleep(0.1)  # 模拟网络延迟
        print('抢购小米11成功')
    else:
        print('没票了')
    lock.release()


if __name__ == '__main__':
    lock = Lock()  # 锁应该在主进程中生成,让所有子进程来抢,先到先得!
    for i in range(5):
        human = Process(target=buy_phone())
        human.start()
        
# 执行得到结果
抢购小米11成功
没票了
没票了
没票了
没票了

这保证了同一时刻只有一个人能对这个文件进行修改,但是因为基于文件操作,文件又是放在磁盘上面,因此速度较慢!我们需要找到一种新的方法让鱼与熊掌都可得兼!这就是IPC机制的队列啦!

三、进程间通信

3.1、队列(queue)

参考博客1:Python队列:https://geek-docs.com/python/python-examples/python-queue-first-in-first-out.html

参考博客2:https://www.cnblogs.com/Dominic-Ji/articles/10929384.html

队列是含有一组对象的容器,支持快速插入和删除的先进先出语义,它类似于管道+锁的集合体!

此前说过,进程间是相互隔离的,若想实现进程间通信(Interprocess process communication,简称IPC;那么multiprocessing为我们提供两种方式:队列、管道!

3.1.1、语法

Queue([maxsize]),创建共享的进程队列,Queue是安全(因为有锁)的队列,它可以实现进程间的数据交换。

maxsize

括号内可以直接传数字,表示生成的队列最大可同时存放数量,默认为0即无限制!

get([block[,timeout])

可直接取值,为空则阻塞直至有数据可用;block用于控制阻塞行为,默认为True,如设置为Flase,无值则报错;timeout是控制阻塞时间(秒),指定时间内无数据可用则报错!

get_nowait()

等同于get(block=Flase),即取值,为空也不阻塞,但会报错!

put(item[,block[,timeout]])

将直接item放入队列,队列满则阻塞至有空位;Flase和timeout同get一样!

put_nowait()

等同于put(block=Flase),即传值时队列无空位也不阻塞,但报错!

q.empty()

判断队列是否为空,是则返回True!但该结果不可靠!比如在返回True的过程中,队列中又加入了项目,它是无法感知的!

q.full()

判断队列是否已满,是则返回True!同样不可靠,理由同上!

q.qsize()

返回队列中item的数量,结果也不可靠,理由同q.empty()和q.full()一样!

3.2、简单演示
>>> from multiprocessing import Queue
>>> queue = Queue(3)
>>> queue.put(666)
>>> queue.put(777)
>>> queue.put(888)
>>> queue.full()
True
>>> queue.get()
666
>>> queue.get()
777
>>> queue.get()
888
>>> queue.empty()
True
>>>

四、生产者消费者模型

4.1、为什么要有生产者消费者模型

由上述演示可得知,当使用put()遇到队列满时和使用get()遇到无值可取时,程序就堵塞在那里了,看起来就有点像取值太快或者生成太快而导致!我们需要有一种手段来解决这种滞销或供不应求的问题!

4.2、什么是生产者消费者模型

生产者生产产品,消费者消费产品;现实生活中工厂生产好的产品是直接卖给消费者吗?不是的,都是有中间商赚差价,要是个个消费者都守着工厂门口一有货就抢那还得了!基本都是通过代理商、商城等等方式转售到消费者手中!

同理生产者消费者模型就是彼此间不直接交流,而是通过代理商!即生产者按照计划不断地生产产品然后交给代理商(阻塞队列),再也不用理会消费者是否能买完(put);消费者也不用天天眼巴巴地等着产品,直接去找代理商买就行(get)!代理商(阻塞队列)相当于一个缓冲区,专注于平衡二者之间的关系!

4.3、如何构建此模型?

多说无益!手底下见真章:

import os
import random
import time
from multiprocessing import Process, Queue


def buy_phone(produce):
    while True:
        produce = queue.get()
        if not produce:  # 为空则终止循环
            break
        time.sleep(random.random())  # 模拟购物延迟
        print('用户:', os.getpid(), '购买了', produce)


def factory(pipline):
    for i in range(5):  # 阻塞队列(代理商)
        time.sleep(random.random())  # 模拟生产周期
        produce = f'XiaoMi11 {i}'
        pipline.put(produce)
        print('工号:', os.getpid(), '生产了', produce)


if __name__ == '__main__':
    queue = Queue()
    worker = Process(target=factory, args=(queue,))  # 作业工人
    worker2 = Process(target=factory, args=(queue,))  # 作业工人
    customer = Process(target=buy_phone, args=(queue,))
    worker.start()
    worker2.start()
    customer.start()
    worker.join()  # 必须等生产完了才能发送结束信号
    worker2.join()
    queue.put(None)  # 这步必须写,告诉代理商别等了,生产完了!而且有几个消费者必须put几个None作为结束信号
    print('主进程:双十一大促销')
# 执行得到结果
工号: 10418 生产了 XiaoMi11 0
工号: 10418 生产了 XiaoMi11 1
用户: 10420 购买了 XiaoMi11 0
工号: 10419 生产了 XiaoMi11 0
工号: 10418 生产了 XiaoMi11 2
工号: 10418 生产了 XiaoMi11 3
用户: 10420 购买了 XiaoMi11 1
用户: 10420 购买了 XiaoMi11 0
用户: 10420 购买了 XiaoMi11 2
工号: 10419 生产了 XiaoMi11 1
工号: 10418 生产了 XiaoMi11 4
工号: 10419 生产了 XiaoMi11 2
用户: 10420 购买了 XiaoMi11 3
工号: 10419 生产了 XiaoMi11 3
工号: 10419 生产了 XiaoMi11 4
主进程:双十一大促销
用户: 10420 购买了 XiaoMi11 1
用户: 10420 购买了 XiaoMi11 4
用户: 10420 购买了 XiaoMi11 2
用户: 10420 购买了 XiaoMi11 3
用户: 10420 购买了 XiaoMi11 4

Process finished with exit code 0

上述方法解决了此前说的生产者、消费者之间的紧张关系,但是方法也不甚高明,只因纯手工put结束信号!

4.4、JoinableQueue

此方法允许队列中消费者通知生产者项目已被处理,这个通知用的是生产者和消费者之间共享的信号和条件实现!每当生产者往该队列中存入数据的时候,内部计数器+1,每当消费者处理完数据后调用task_done的时候,计算器-1!

4.4.1、语法

JoinableQueue([maxsize])

maxsize

是队列中允许最大item数,不填则默认不限制!

task_done()

消费者调用此方法通知中间商item已被处理,调用次数超过生产速度,会报错!

join()

生产者调用此方法阻塞队列,直到计算器为0的时候,才往后运行

import os
import random
import time
from multiprocessing import Process, Queue, JoinableQueue


def buy_phone(produce):
    while True:
        produce = queue.get()
        if not produce:  # 为空则终止循环
            break
        time.sleep(random.random())  # 模拟购物延迟
        print('用户:', os.getpid(), '购买了', produce)
        queue.task_done()  # 每处理一个数据就发送一次信号告知数据已被处理


def factory(pipline):
    for i in range(5):  # 阻塞队列(代理商)
        time.sleep(random.random())  # 模拟生产周期
        produce = f'XiaoMi11 {i}'
        pipline.put(produce)
        print('工号:', os.getpid(), '生产了', produce)
    pipline.join()  # 阻塞队列,直到task_done完毕


if __name__ == '__main__':
    queue = JoinableQueue()
    worker = Process(target=factory, args=(queue,))  # 作业工人
    worker2 = Process(target=factory, args=(queue,))  # 作业工人
    customer = Process(target=buy_phone, args=(queue,))
    customer2 = Process(target=buy_phone, args=(queue,))
    customer.daemon = True
    customer2.daemon = True
    start_list = [worker, worker2, customer, customer2]
    for i in start_list:
        i.start()

    worker.join()  # 生产完了肯定接收完了,所以这里不用写customer了
    worker2.join()
    print('主进程:双十一大促销')
    
# 执行得到结果
工号: 13544 生产了 XiaoMi11 0
工号: 13543 生产了 XiaoMi11 0
工号: 13543 生产了 XiaoMi11 1
工号: 13543 生产了 XiaoMi11 2
工号: 13544 生产了 XiaoMi11 1
用户: 13545 购买了 XiaoMi11 0
用户: 13546 购买了 XiaoMi11 0
用户: 13545 购买了 XiaoMi11 1
工号: 13544 生产了 XiaoMi11 2
用户: 13545 购买了 XiaoMi11 1
工号: 13543 生产了 XiaoMi11 3
工号: 13544 生产了 XiaoMi11 3
用户: 13546 购买了 XiaoMi11 2
用户: 13545 购买了 XiaoMi11 2
工号: 13543 生产了 XiaoMi11 4
工号: 13544 生产了 XiaoMi11 4
用户: 13545 购买了 XiaoMi11 3
用户: 13546 购买了 XiaoMi11 3
用户: 13545 购买了 XiaoMi11 4
用户: 13546 购买了 XiaoMi11 4
主进程:双十一大促销
世间微尘里 独爱茶酒中