Python并发编程学习

开启子进程的两种方式

方式一:

1
2
3
4
5
6
7
8
9
10
11
12
import time
from multiprocessing import Process

def task(name):
print('{0} is running'.format(name))
time.sleep(3)
print('{0} is done'.format(name))

if __name__ == '__main__':
p = Process(target=task, args=('process001', )) # 向操作系统发送一个信号
p.start()
print('here first')

方式二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import time
from multiprocessing import Process

class MyProcess(Process):

def __init__(self, name):
super().__init__()
self.name = name

def run(self):
print('{0} is running.'.format(self.name))
time.sleep(3)
print('{0} is done.'.format(self.name))

if __name__ == '__main__':
p = MyProcess('process001')
p.start()
print('here first')

查看进程pid和ppid

  • pid为当前进程id
  • ppid为父进程id

使用os模块,os.getpid()os.getppid()

1
2
3
4
5
6
7
8
9
10
11
12
13
import os
import time
from multiprocessing import Process

def task():
print('{0} is running'.format(os.getpid()))
time.sleep(3)
print('{0} is done'.format(os.getpid()))

if __name__ == '__main__':
p = Process(target=task,) # 向操作系统发送一个信号
p.start()
print('here first', os.getpid())

Process对象的其它属性方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# join方法。先执行完子进程,最后执行主进程
p.start()
p.join() # 程序p执行完后再执行后面一行
print('main process run')

# 查看id号
p.pid()

# 进程是否存活
p.is_alive()

# 杀死进程(需要一段时间,因为这个命令只是给操作系统发送处理信号)
p.terminate()

# 进程名字
p.name

使用多进程实现并发连接

弊端是没有限制连接数量。

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import socket
from multiprocessing import Process

def talk(conn):
while True:
try:
data = conn.recv(1024)
if not data:
break
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()

def server(ip, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((ip, port))
s.listen(5)

while True:
conn, addr = s.accept()
p = Process(target=talk, args=(conn, ))
p.start()

if __name__ == '__main__':
server('127.0.0.1', 8080)

客户端

1
2
3
4
5
6
7
8
9
10
11
12
import socket

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 8080))

while True:
msg = input('>>: ').strip()
if not msg:
continue
client.send(msg.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))

守护进程

当主进程运行结束后,子进程也跟着结束。

(运行下面的代码可以发现,task的进程还没有开始就被杀掉了。

因为主进程打印完`here first`就已经结束了,子进程也跟着一起结束)
1
2
3
4
5
6
7
8
9
10
11
12
13
import time
from multiprocessing import Process

def task(name):
print('{0} is running'.format(name))
time.sleep(3)
print('{0} is done'.format(name))

if __name__ == '__main__':
p = Process(target=task, args=('process001', ))
p.daemon = True # 添加这个参数
p.start()
print('here first')

互斥锁

在一个父进程创建多个子进程的时候,为了保证子进程的运行顺序先后。

(这样效率会慢,但保证秩序)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing import Process, Lock
import time

def task(name, mutex):
mutex.acquire() # 加锁
print(f'task{name} --> 001')
time.sleep(1)
print(f'task{name} --> 002')
time.sleep(1.5)
print(f'task{name} --> 003')
mutex.release() # 释放锁

if __name__ == '__main__':
mutex = Lock() # 只实例化一次,确保是同一把锁
for i in range(3):
p = Process(target=task, args=(str(i), mutex))
p.start()

比如一个抢票程序,就会用到互斥锁。多个进程访问的是同一块数据(票的余数)。

队列的使用(太tm实用了)

放内存里放数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Queue

q = Queue(3) # 限制了数据的数量,只能往内存里放三个数据

q.put('hello')
q.put({'a': 1})
q.put([2, 2, 2])

print(q.full()) # 查看队列是否已经满了

q.put(111) # 如果这个时候继续放数据进队列,则进程会卡住,等待队列出去一个数据

# 取出队列里的数据用get方法
q.get()
q.get()
q.get()

生产者消费者模型

生产者生产包子,放到queue队列容器中。

消费者往容器里取包子,互不影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# coding=utf-8

from multiprocessing import Process, Queue
import time

def producer(q):
for i in range(1, 11):
res = f'<包子{i}>'
time.sleep(0.5)
print(f'生产者生产了{res}')

q.put(res)

def consumer(q):
while True:
res = q.get()
if not res:
break
time.sleep(1.5)
print(f'消费者吃了{res}')

if __name__ == '__main__':
q = Queue()

p1 = Process(target=producer, args=(q,))

c1 = Process(target=consumer, args=(q,))

p1.start()
c1.start()

p1.join()
q.put(None) # 给予结束信号

print('主程序启动...')

总结:
1.程序中有两种角色

一类负责生产数据

一类负责处理数据

2.引入生产者消费者模型为了解决

平衡生产者和消费者之间的速度差

程序解开耦合

3.如何实现

生产者←→队列←→消费者

基于网络通信的消息队列:Rabbitmq

什么是线程

把操作系统比作一家公司,进程比作公司里的各个部门,而线程则是部门里的员工。

  • 每个进程至少需要一个线程(一个部门至少需要一个人)
  • 同一进程内的线程共享资源
  • 开启进程比开启线程的开销大

对于需要实现并发,且要共享数据的时候,就要用到多线程

如何开启线程

1.直接使用threading模块

1
2
3
4
5
6
7
8
9
10
11
12
import time
from threading import Thread

def task(name):
print(f'run task: {name}')
time.sleep(1)
print(f'task done: {name}')

if __name__ == '__main__':
t1 = Thread(target=task, args=('eat', ))
t1.start()
print('main task')

2.继承类的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import time
from threading import Thread

class MyThread(Thread):

def __init__(self, name):
super().__init__()
self.name = name

def run(self):
print('run task: {0}'.format(self.name))
time.sleep(1)
print('task done: {0}'.format(self.name))

if __name__ == '__main__':
t1 = MyThread('eat')
t1.start()
print('main task')

Thread对象的属性方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 建议还是使用import threading
from threading import Thread, currentThread, active_count, enumerate

# 查看当前线程名称 方法一
t = Thread()
t.getName()

# 查看当前线程名称 方法二
currentThread().getName()

# 修改线程名称
t.setName('线程1')
currentThread().setName('主线程1')

# 查看线程是否存活
t.isAlive()

# 主线程等待子线程执行完毕再运行
t.join()

# 激活的线程数量
print(active_count())

# 以列表的方式查看当前进程的所有线程
print(enumerate())

守护线程

和守护进程同理。当主线程运行结束后,子线程也会被强制结束掉。

例子一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import time
from threading import Thread

def task(name):
time.sleep(1)
print(f'run task: {name}')

if __name__ == '__main__':
t = Thread(target=task, args=('eat', ))
# t.setDaemon(True) 需要在t.start()之前指定
t.daemon = True
t.start()

print('main task')
print(t.is_alive())

例子二

这里和守护进程的区别是,主线程会等到没有设置daemon的子线程结束后,才会结束。

(aaa is over 没有被打印出来)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import time
from threading import Thread

def aaa():
print('aaa')
time.sleep(3)
print('aaa is over')

def bbb():
print('bbb')
time.sleep(1)
print('bbb is over')

if __name__ == '__main__':
t1 = Thread(target=aaa)
t2 = Thread(target=bbb)

t1.daemon = True
t1.start()
t2.start()
print('this is main')

# 输出结果
"""
aaa
bbb
this is main
bbb is over
"""