应用场景
redis、Nginx、node.js、Tornado、Twisted这些使用IO多路复用技术的中间件或者框架:异步逻辑非常适合处理有Network IO且高并发的socket连接场景,因为这些场景往往需要等待IO。
应用举例
利用Redis实现动态添加任务
redis是一个具备极高性能的键值对非关系数据库。
并发任务,通常是用生成消费模型,对队列的处理可以使用类似master-worker的方式,master主要用户获取队列的msg,worker用户处理消息。
为了简单起见,并且协程更适合单线程的方式,我们的主线程用来监听队列,子线程用于处理队列。这里使用redis的队列。主线程中有一个是无限循环,用户消费队列。
①下载redis数据库zip版本
②解压缩,打开cmd进入redis目录
③输入如下命令,启用redis服务1
.\redis-server.exe redis.windows.conf
④打开cmd,进入redis目录输入如下命令打开redis-cli1
.\redis-cli.exe
⑤输入如下命令lpush1
2
3
4
5127.0.0.1:6379> lpush queue 5
(integer) 1
127.0.0.1:6379> lpush queue 3
(integer) 2
127.0.0.1:6379> lpush queue 1
运行代码示例:
loop_thread:单独的线程,运行着一个事件对象容器,用于实时接收新任务。consumer_thread:单独的线程,实时接收来自Redis的消息队列,并实时往事件对象容器中添加新任务。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53import asyncio
import time
import redis
from threading import Thread
from queue import Queue
def start_loop(loop):
# 一个永远在后台运行的循环时间
asyncio.set_event_loop(loop)
loop.run_forever()
async def do_sleep(n, q):
await asyncio.sleep(n)
q.put("ok")
def get_redis():
conn_pool = redis.ConnectionPool(host='127.0.0.1', db=0)
return redis.Redis(connection_pool=conn_pool)
def consumer(con, q, loop):
# con:redis连接,q:队列对象
while True:
task = con.rpop("queue")
if not task:
time.sleep(1)
continue
asyncio.run_coroutine_threadsafe(do_sleep(int(task), q), loop)
if __name__ == "__main__":
print(time.ctime())
new_loop = asyncio.get_event_loop()
# 定义线程,运行事件循环对象,用于实时接收新任务
loop_thread = Thread(target=start_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()
# 创建redis连接
rcon = get_redis()
queue = Queue()
# 子线程:用于消费队列消息,并实时往事件对象容器添加新任务
consumer_thread = Thread(target=consumer, args=(rcon, queue, new_loop))
consumer_thread.setDaemon(True)
consumer_thread.start()
while True:
msg = queue.get()
print("协程运行完成..")
print("当前时间:", time.ctime())
输出结果如下:
Mon Mar 6 16:43:26 2023
协程运行完成..
当前时间: Mon Mar 6 16:43:27 2023
协程运行完成..
当前时间: Mon Mar 6 16:43:29 2023
协程运行完成..
当前时间: Mon Mar 6 16:43:31 2023
向Redis分别发起了5s,3s,1s的任务。 从结果来看,这三个任务,确实是并发执行的,1s的任务最先结束,三个任务完成总耗时5s