并发编程

zhangly 2021-11-06 23:16:35
Categories: > Tags:

开启子进程的两种方式

方式一:

import time
from multiprocessing import Process

def task(name):
    print('{0} is running'.format(name))
    time.sleep(3)
    print('{0} is done'.format(name))

if __name__ == '__main__':
    p = Process(target=task, args=('process001', ))  # 向操作系统发送一个信号
    p.start()
    print('here first')

方式二:

import time
from multiprocessing import Process

class MyProcess(Process):

    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print('{0} is running.'.format(self.name))
        time.sleep(3)
        print('{0} is done.'.format(self.name))

if __name__ == '__main__':
    p = MyProcess('process001')
    p.start()
    print('here first')

查看进程pid和ppid

使用os模块,os.getpid()os.getppid()

import os
import time
from multiprocessing import Process

def task():
    print('{0} is running'.format(os.getpid()))
    time.sleep(3)
    print('{0} is done'.format(os.getpid()))

if __name__ == '__main__':
    p = Process(target=task,)  # 向操作系统发送一个信号
    p.start()
    print('here first', os.getpid())

Process对象的其它属性方法

# join方法。先执行完子进程,最后执行主进程
p.start()
p.join() # 程序p执行完后再执行后面一行
print('main process run')

# 查看id号
p.pid()

# 进程是否存活
p.is_alive()

# 杀死进程(需要一段时间,因为这个命令只是给操作系统发送处理信号)
p.terminate()

# 进程名字
p.name

使用多进程实现并发连接

弊端是没有限制连接数量。

服务端

import socket
from multiprocessing import Process

def talk(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data:
                break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()

def server(ip, port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((ip, port))
    s.listen(5)

    while True:
        conn, addr = s.accept()
        p = Process(target=talk, args=(conn, ))
        p.start()

if __name__ == '__main__':
    server('127.0.0.1', 8080)

客户端

import socket

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 8080))

while True:
    msg = input('>>: ').strip()
    if not msg:
        continue
    client.send(msg.encode('utf-8'))
    data = client.recv(1024)
    print(data.decode('utf-8'))

守护进程

当主进程运行结束后,子进程也跟着结束。

(运行下面的代码可以发现,task的进程还没有开始就被杀掉了。

因为主进程打印完`here first`就已经结束了,子进程也跟着一起结束)
import time
from multiprocessing import Process

def task(name):
    print('{0} is running'.format(name))
    time.sleep(3)
    print('{0} is done'.format(name))

if __name__ == '__main__':
    p = Process(target=task, args=('process001', ))
    p.daemon = True # 添加这个参数
    p.start()
    print('here first')

互斥锁

在一个父进程创建多个子进程的时候,为了保证子进程的运行顺序先后。

(这样效率会慢,但保证秩序)

from multiprocessing import Process, Lock
import time

def task(name, mutex):
    mutex.acquire()  # 加锁
    print(f'task{name} --> 001')
    time.sleep(1)
    print(f'task{name} --> 002')
    time.sleep(1.5)
    print(f'task{name} --> 003')
    mutex.release()  # 释放锁

if __name__ == '__main__':
    mutex = Lock()  # 只实例化一次,确保是同一把锁
    for i in range(3):
        p = Process(target=task, args=(str(i), mutex))
        p.start()

比如一个抢票程序,就会用到互斥锁。多个进程访问的是同一块数据(票的余数)。

队列的使用(太tm实用了)

放内存里放数据。

from multiprocessing import Queue

q = Queue(3)  # 限制了数据的数量,只能往内存里放三个数据

q.put('hello')
q.put({'a': 1})
q.put([2, 2, 2])

print(q.full())  # 查看队列是否已经满了

q.put(111)  # 如果这个时候继续放数据进队列,则进程会卡住,等待队列出去一个数据

# 取出队列里的数据用get方法
q.get()
q.get()
q.get()

生产者消费者模型

生产者生产包子,放到queue队列容器中。

消费者往容器里取包子,互不影响。

# coding=utf-8

from multiprocessing import Process, Queue
import time

def producer(q):
    for i in range(1, 11):
        res = f'<包子{i}>'
        time.sleep(0.5)
        print(f'生产者生产了{res}')

        q.put(res)

def consumer(q):
    while True:
        res = q.get()
        if not res:
            break
        time.sleep(1.5)
        print(f'消费者吃了{res}')

if __name__ == '__main__':
    q = Queue()

    p1 = Process(target=producer, args=(q,))

    c1 = Process(target=consumer, args=(q,))

    p1.start()
    c1.start()

    p1.join()
    q.put(None)  # 给予结束信号

    print('主程序启动...')

总结:
1.程序中有两种角色

一类负责生产数据

一类负责处理数据

2.引入生产者消费者模型为了解决

平衡生产者和消费者之间的速度差

程序解开耦合

3.如何实现

生产者←→队列←→消费者

基于网络通信的消息队列:Rabbitmq

什么是线程

把操作系统比作一家公司,进程比作公司里的各个部门,而线程则是部门里的员工。

对于需要实现并发,且要共享数据的时候,就要用到多线程

如何开启线程

1.直接使用threading模块

import time
from threading import Thread

def task(name):
    print(f'run task: {name}')
    time.sleep(1)
    print(f'task done: {name}')

if __name__ == '__main__':
    t1 = Thread(target=task, args=('eat', ))
    t1.start()
    print('main task')

2.继承类的方式

import time
from threading import Thread

class MyThread(Thread):

    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print('run task: {0}'.format(self.name))
        time.sleep(1)
        print('task done: {0}'.format(self.name))

if __name__ == '__main__':
    t1 = MyThread('eat')
    t1.start()
    print('main task')

Thread对象的属性方法

# 建议还是使用import threading
from threading import Thread, currentThread, active_count, enumerate

# 查看当前线程名称 方法一
t = Thread()
t.getName()

# 查看当前线程名称 方法二
currentThread().getName()

# 修改线程名称
t.setName('线程1')
currentThread().setName('主线程1')

# 查看线程是否存活
t.isAlive()

# 主线程等待子线程执行完毕再运行
t.join()

# 激活的线程数量
print(active_count())

# 以列表的方式查看当前进程的所有线程
print(enumerate())

守护线程

和守护进程同理。当主线程运行结束后,子线程也会被强制结束掉。

例子一

import time
from threading import Thread

def task(name):
    time.sleep(1)
    print(f'run task: {name}')

if __name__ == '__main__':
    t = Thread(target=task, args=('eat', ))
    # t.setDaemon(True)  需要在t.start()之前指定
    t.daemon = True
    t.start()

    print('main task')
    print(t.is_alive())

例子二

这里和守护进程的区别是,主线程会等到没有设置daemon的子线程结束后,才会结束。

(aaa is over 没有被打印出来)

import time
from threading import Thread

def aaa():
    print('aaa')
    time.sleep(3)
    print('aaa is over')

def bbb():
    print('bbb')
    time.sleep(1)
    print('bbb is over')

if __name__ == '__main__':
    t1 = Thread(target=aaa)
    t2 = Thread(target=bbb)

    t1.daemon = True
    t1.start()
    t2.start()
    print('this is main')

# 输出结果
"""
aaa
bbb
this is main
bbb is over
"""