Python并发编程学习
发表于更新于
字数总计:1.7k阅读时长:7分钟阅读量: 长沙
开启子进程的两种方式
方式一:
1 2 3 4 5 6 7 8 9 10 11 12
| import time from multiprocessing import Process
def task(name): print('{0} is running'.format(name)) time.sleep(3) print('{0} is done'.format(name))
if __name__ == '__main__': p = Process(target=task, args=('process001', )) p.start() print('here first')
|
方式二:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| import time from multiprocessing import Process
class MyProcess(Process):
def __init__(self, name): super().__init__() self.name = name
def run(self): print('{0} is running.'.format(self.name)) time.sleep(3) print('{0} is done.'.format(self.name))
if __name__ == '__main__': p = MyProcess('process001') p.start() print('here first')
|
查看进程pid和ppid
使用os模块,os.getpid()
和os.getppid()
1 2 3 4 5 6 7 8 9 10 11 12 13
| import os import time from multiprocessing import Process
def task(): print('{0} is running'.format(os.getpid())) time.sleep(3) print('{0} is done'.format(os.getpid()))
if __name__ == '__main__': p = Process(target=task,) p.start() print('here first', os.getpid())
|
Process对象的其它属性方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| p.start() p.join() print('main process run')
p.pid()
p.is_alive()
p.terminate()
p.name
|
使用多进程实现并发连接
弊端是没有限制连接数量。
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| import socket from multiprocessing import Process
def talk(conn): while True: try: data = conn.recv(1024) if not data: break conn.send(data.upper()) except ConnectionResetError: break conn.close()
def server(ip, port): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind((ip, port)) s.listen(5)
while True: conn, addr = s.accept() p = Process(target=talk, args=(conn, )) p.start()
if __name__ == '__main__': server('127.0.0.1', 8080)
|
客户端
1 2 3 4 5 6 7 8 9 10 11 12
| import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(('127.0.0.1', 8080))
while True: msg = input('>>: ').strip() if not msg: continue client.send(msg.encode('utf-8')) data = client.recv(1024) print(data.decode('utf-8'))
|
守护进程
当主进程运行结束后,子进程也跟着结束。
(运行下面的代码可以发现,task的进程还没有开始就被杀掉了。
因为主进程打印完`here first`就已经结束了,子进程也跟着一起结束)
1 2 3 4 5 6 7 8 9 10 11 12 13
| import time from multiprocessing import Process
def task(name): print('{0} is running'.format(name)) time.sleep(3) print('{0} is done'.format(name))
if __name__ == '__main__': p = Process(target=task, args=('process001', )) p.daemon = True p.start() print('here first')
|
互斥锁
在一个父进程创建多个子进程的时候,为了保证子进程的运行顺序先后。
(这样效率会慢,但保证秩序)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| from multiprocessing import Process, Lock import time
def task(name, mutex): mutex.acquire() print(f'task{name} --> 001') time.sleep(1) print(f'task{name} --> 002') time.sleep(1.5) print(f'task{name} --> 003') mutex.release()
if __name__ == '__main__': mutex = Lock() for i in range(3): p = Process(target=task, args=(str(i), mutex)) p.start()
|
比如一个抢票程序,就会用到互斥锁。多个进程访问的是同一块数据(票的余数)。
队列的使用(太tm实用了)
放内存里放数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| from multiprocessing import Queue
q = Queue(3)
q.put('hello') q.put({'a': 1}) q.put([2, 2, 2])
print(q.full())
q.put(111)
q.get() q.get() q.get()
|
生产者消费者模型
生产者生产包子,放到queue队列容器中。
消费者往容器里取包子,互不影响。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
from multiprocessing import Process, Queue import time
def producer(q): for i in range(1, 11): res = f'<包子{i}>' time.sleep(0.5) print(f'生产者生产了{res}')
q.put(res)
def consumer(q): while True: res = q.get() if not res: break time.sleep(1.5) print(f'消费者吃了{res}')
if __name__ == '__main__': q = Queue()
p1 = Process(target=producer, args=(q,))
c1 = Process(target=consumer, args=(q,))
p1.start() c1.start()
p1.join() q.put(None)
print('主程序启动...')
|
总结:
1.程序中有两种角色
一类负责生产数据
一类负责处理数据
2.引入生产者消费者模型为了解决
平衡生产者和消费者之间的速度差
程序解开耦合
3.如何实现
生产者←→队列←→消费者
基于网络通信的消息队列:Rabbitmq
什么是线程
把操作系统比作一家公司,进程比作公司里的各个部门,而线程则是部门里的员工。
- 每个进程至少需要一个线程(一个部门至少需要一个人)
- 同一进程内的线程共享资源
- 开启进程比开启线程的开销大
对于需要实现并发,且要共享数据的时候,就要用到多线程。
如何开启线程
1.直接使用threading
模块
1 2 3 4 5 6 7 8 9 10 11 12
| import time from threading import Thread
def task(name): print(f'run task: {name}') time.sleep(1) print(f'task done: {name}')
if __name__ == '__main__': t1 = Thread(target=task, args=('eat', )) t1.start() print('main task')
|
2.继承类的方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| import time from threading import Thread
class MyThread(Thread):
def __init__(self, name): super().__init__() self.name = name
def run(self): print('run task: {0}'.format(self.name)) time.sleep(1) print('task done: {0}'.format(self.name))
if __name__ == '__main__': t1 = MyThread('eat') t1.start() print('main task')
|
Thread对象的属性方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| from threading import Thread, currentThread, active_count, enumerate
t = Thread() t.getName()
currentThread().getName()
t.setName('线程1') currentThread().setName('主线程1')
t.isAlive()
t.join()
print(active_count())
print(enumerate())
|
守护线程
和守护进程同理。当主线程运行结束后,子线程也会被强制结束掉。
例子一:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| import time from threading import Thread
def task(name): time.sleep(1) print(f'run task: {name}')
if __name__ == '__main__': t = Thread(target=task, args=('eat', )) t.daemon = True t.start()
print('main task') print(t.is_alive())
|
例子二:
这里和守护进程的区别是,主线程会等到没有设置daemon
的子线程结束后,才会结束。
(aaa is over 没有被打印出来)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import time from threading import Thread
def aaa(): print('aaa') time.sleep(3) print('aaa is over')
def bbb(): print('bbb') time.sleep(1) print('bbb is over')
if __name__ == '__main__': t1 = Thread(target=aaa) t2 = Thread(target=bbb)
t1.daemon = True t1.start() t2.start() print('this is main')
""" aaa bbb this is main bbb is over """
|