多线程队列实现方式 funboost框架

def start_consuming_message(queue_name, consume_function, threads_num=50):
    pool = ThreadPoolExecutor(threads_num)
    while True:
        try:
            redis_task = redis.brpop(queue_name, timeout=60)
            if redis_task:
                task_str = redis_task[1].decode()
                print(f'从redis的 {queue_name} 队列中 取出的消息是: {task_str}')
                pool.submit(consume_function, **json.loads(task_str))
            else:
                print(f'redis的 {queue_name} 队列中没有任务')
        except redis.RedisError as e:
            print(e)


if __name__ == '__main__':
    import time


    def add(x, y):
        time.sleep(5)
        print(f'{x} + {y} 的结果是 {x + y}')

    # 推送任务
    for i in range(100):
        print(i)
        redis.lpush('test_beggar_redis_consumer_queue', json.dumps(dict(x=i, y=i * 2)))


    start_consuming_message('test_beggar_redis_consumer_queue', consume_function=add, threads_num=10)
搜索