多线程队列实现方式 funboost框架
def start_consuming_message(queue_name, consume_function, threads_num=50):
    """Consume JSON tasks from a Redis list forever, running them on a thread pool.

    Each message is taken with a blocking ``brpop`` on the module-level
    ``redis`` object, parsed as a JSON object, and handed to
    ``consume_function`` as keyword arguments on a worker thread.  The loop
    never returns on its own.

    Args:
        queue_name: Name of the Redis list to pop messages from.
        consume_function: Callable invoked as ``consume_function(**message)``.
        threads_num: Maximum number of worker threads in the pool.

    Note:
        ``pool.submit`` does not block, so messages are popped from Redis as
        soon as they are available and wait in the executor's in-memory queue
        until a worker thread is free.
    """
    # Function-scope import: the module only imports ``time`` under __main__.
    import time

    def _run_task(task_str, task_kwargs):
        # Runs on a worker thread.  The Future returned by submit() is never
        # inspected, so without this handler a failing task would disappear
        # without any trace.
        try:
            consume_function(**task_kwargs)
        except Exception as e:
            print(f'消费消息 {task_str} 时出错: {e!r}')

    pool = ThreadPoolExecutor(threads_num)
    while True:
        try:
            redis_task = redis.brpop(queue_name, timeout=60)
        # NOTE(review): ``redis`` is also the object ``brpop`` is called on;
        # confirm that it really exposes ``RedisError``.
        except redis.RedisError as e:
            print(e)
            # Short pause so a Redis outage does not turn into a busy loop.
            time.sleep(1)
            continue

        if not redis_task:
            print(f'redis的 {queue_name} 队列中没有任务')
            continue

        raw_message = redis_task[1]
        try:
            # Accept both bytes (default client) and str (decoded responses).
            if isinstance(raw_message, bytes):
                task_str = raw_message.decode()
            else:
                task_str = raw_message
            task_kwargs = json.loads(task_str)
        except ValueError as e:
            # Covers UnicodeDecodeError and json.JSONDecodeError: skip the bad
            # message instead of letting it kill the consumer loop.
            print(f'redis的 {queue_name} 队列中的消息无法解析: {raw_message!r} ({e})')
            continue

        print(f'从redis的 {queue_name} 队列中 取出的消息是: {task_str}')

        if not isinstance(task_kwargs, dict):
            # ``**`` unpacking needs a mapping; a JSON list/number would raise.
            print(f'redis的 {queue_name} 队列中的消息不是 JSON 对象, 已跳过: {task_str}')
            continue

        pool.submit(_run_task, task_str, task_kwargs)
if __name__ == '__main__':
    import time

    def add(x, y):
        """Demo consumer: sleep five seconds, then print the sum of x and y."""
        time.sleep(5)
        print(f'{x} + {y} 的结果是 {x + y}')

    demo_queue = 'test_beggar_redis_consumer_queue'

    # Push 100 demo tasks, echoing each index as it is published.
    for n in range(100):
        print(n)
        payload = json.dumps({'x': n, 'y': n * 2})
        redis.lpush(demo_queue, payload)

    # Start consuming the demo queue with 10 worker threads.
    start_consuming_message(demo_queue, consume_function=add, threads_num=10)