例如:
def add(x,y):
print(f'{x} + {y} = {x + y}')
return x + y
修改你的项目根目录下的自动生成的distributed_frame_config.py配置文件的方式来进行redis rabbitmq等的配置
# 1.2.1 导入生成各种不同类型中间件的consumer的工厂函数。
from function_scheduling_distributed_framework import get_consumer,patch_frame_config,ConsumersManager
# 1.2.3 实例化生成一个消费者,这里生成的是redis为中间件,可确认消费的消费者。更多的函数运行控制见readme的说明。
consumer_add = get_consumer('queue_test569', consuming_function=add, broker_kind=9)
# 1.2.4 开启并发消费,默认是在一个新的线程里面开始循环调度任务消费,所以当前主线程下,
# 可以在当前主线程连续对多个consumer实例执行start_consuming_message方法,同时消费多个队列和执行相应的函数。
ConsumersManager.show_all_consumer_info() # 这行是显示所有注册的消费者信息,可以不执行。
consumer_add.start_consuming_message()
# 1.2.5 发布一个任务到中间件,一般推送是写在另外的脚本。这里写在一起。
consumer_add.publisher_of_same_queue.publish(dict(x=1,y=2))
# 1.2.5.2 发布一个任务到中间件,并且使用同步的方式等待结果返回。
async_result = consumer_add.publisher_of_same_queue.publish(dict(x=3,y=4),
independence_control_config=PriorityConsumingControlConfig(is_using_rpc_mode=True))
print(async_result.result)
装饰器版,使用方式例如:
'''
from function_scheduling_distributed_framework import boost
@boost('queue_test_f01', qps=0.2, broker_kind=2)
def add(a, b):
print(a + b)
for i in range(10, 20):
add.push(i, b=i * 2) # 使用add.pub 发布任务
add.consume() # 使用add.consume 消费任务
'''
对比常规方式,常规方式使用方式如下
'''
def add(a, b):
print(a + b)
# 需要手动指定consuming_function入参的值。
consumer = get_consumer('queue_test_f01', consuming_function=add,qps=0.2, broker_kind=2)
for i in range(10, 20):
consumer.publisher_of_same_queue.publish(dict(a=i, b=i * 2))
consumer.start_consuming_message()
'''
装饰器版本的 boost 入参 和 get_consumer 入参99%一致,唯一不同的是 装饰器版本加在了函数上自动知道消费函数了,
所以不需要传consuming_function参数。
ssh://root@112.90.89.16:10033/data/miniconda3dir/inner/envs/mtfy/bin/python -u /home/ydf/distributed_framework/test_frame/my/test.py
17:32:59 "/home/ydf/distributed_framework/function_scheduling_distributed_framework/utils/decorators.py:26" 操作系统类型是 posix
2020-01-02 17:32:59 - ConcurrentModeDispatcher - "/home/ydf/distributed_framework/function_scheduling_distributed_framework/consumers/base_consumer.py:548" - __init__ - WARNING - 队列为 queue_test569 函数为 <function add at 0x7f762ac56e18> 的消费者 设置并发模式为thread
17:32:59 "/home/ydf/distributed_framework/test_frame/my/test.py:27" 此行 实例化队列名 queue_test569 的消费者, 类型为 <class 'function_scheduling_distributed_framework.consumers.redis_consumer_ack_able.RedisConsumerAckAble'>
17:32:59 "/home/ydf/distributed_framework/function_scheduling_distributed_framework/consumers/base_consumer.py:168" 当前解释器内,所有消费者的信息是:
{
"queue_test569": {
"is_using_rpc_mode": false,
"function_result_status_persistance_conf": {
"is_save_status": false,
"is_save_result": false,
"expire_seconds": 604800
},
"schedule_tasks_on_main_thread": false,
"do_not_run_by_specify_time": [
"10:00:00",
"22:00:00"
],
"is_do_not_run_by_specify_time_effect": false,
"is_consuming_function_use_multi_params": true,
"task_filtering_expire_seconds": 0,
"do_task_filtering": false,
"create_logger_file": true,
"logger_prefix": "",
"msg_expire_senconds": 0,
"qps": 0,
"msg_schedule_time_intercal": 0.0,
"is_print_detail_exception": true,
"log_level": 10,
"max_retry_times": 3,
"concurrent_mode": 1,
"specify_concurrent_pool": null,
"threads_num": 50,
"function_timeout": 0,
"consuming_function": "<function add at 0x7f762ac56e18>",
"queue_name": "queue_test569",
"broker_kind": 9,
"class_name": "RedisConsumerAckAble",
"concurrent_mode_name": "thread",
"where_to_instantiate": "/home/ydf/distributed_framework/test_frame/my/test.py:27"
}
}
17:32:59 "/home/ydf/distributed_framework/test_frame/my/test.py:27" queue_test569 的消费者
2020-01-02 17:32:59 - RedisConsumerAckAble--thread--queue_test569 - "/home/ydf/distributed_framework/function_scheduling_distributed_framework/consumers/base_consumer.py:411" - start_consuming_message - WARNING - 开始消费 queue_test569 中的消息
2020-01-02 17:32:59 - RedisPublisher--queue_test569 - "/home/ydf/distributed_framework/function_scheduling_distributed_framework/publishers/base_publisher.py:92" - __init__ - INFO - <class 'function_scheduling_distributed_framework.publishers.redis_publisher.RedisPublisher'> 被实例化了
2020-01-02 17:32:59 - RedisConsumerAckAble--thread--queue_test569 - "/home/ydf/distributed_framework/function_scheduling_distributed_framework/consumers/base_consumer.py:495" - check_heartbeat_and_message_count - INFO - [queue_test569] 队列中还有 [0] 个任务
2020-01-02 17:32:59 - RedisPublisher--queue_test569 - "/home/ydf/distributed_framework/function_scheduling_distributed_framework/publishers/base_publisher.py:122" - publish - DEBUG - 向queue_test569 队列,推送消息 耗时0.0009秒 {'x': 1, 'y': 2, 'extra': {'task_id': 'queue_test569_result:9f9a3fa5-5682-44f4-b4ef-a91274435bef', 'publish_time': 1577957579.8357, 'publish_time_format': '2020-01-02 17:32:59'}}
2020-01-02 17:32:59 - RedisConsumerAckAble--thread--queue_test569 - "/home/ydf/distributed_framework/function_scheduling_distributed_framework/consumers/redis_consumer_ack_able.py:25" - _shedual_task - DEBUG - 从redis的 [queue_test569] 队列中 取出的消息是: {"x": 1, "y": 2, "extra": {"task_id": "queue_test569_result:9f9a3fa5-5682-44f4-b4ef-a91274435bef", "publish_time": 1577957579.8357, "publish_time_format": "2020-01-02 17:32:59"}}
2020-01-02 17:32:59 - RedisPublisher--queue_test569 - "/home/ydf/distributed_framework/function_scheduling_distributed_framework/publishers/base_publisher.py:122" - publish - DEBUG - 向queue_test569 队列,推送消息 耗时0.0002秒 {'x': 3, 'y': 4, 'extra': {'task_id': 'queue_test569_result:af9f710f-9a2c-44d4-b60d-b65a335818dd', 'publish_time': 1577957579.8371, 'publish_time_format': '2020-01-02 17:32:59', 'function_timeout': None, 'max_retry_times': None, 'is_print_detail_exception': None, 'msg_expire_senconds': None, 'is_using_rpc_mode': True}}
2020-01-02 17:32:59 - FunctionResultChche - "/home/ydf/distributed_framework/function_scheduling_distributed_framework/utils/decorators.py:420" - __cached_function_result_for_a_time - DEBUG - 函数 [_judge_is_daylight] 此次不能使用缓存
2020-01-02 17:32:59 - CustomThreadPoolExecutor - "/home/ydf/distributed_framework/function_scheduling_distributed_framework/concurrent_pool/custom_threadpool_executor.py:97" - _adjust_thread_count - DEBUG - (0, 0, 0, 5)
2020-01-02 17:32:59 - _CustomThread - "/home/ydf/distributed_framework/function_scheduling_distributed_framework/concurrent_pool/custom_threadpool_executor.py:140" - run - DEBUG - 新启动线程 140145262544640
17:32:59 "/home/ydf/distributed_framework/test_frame/my/test.py:11" 1 + 2 = 3
2020-01-02 17:32:59 - RedisConsumerAckAble--thread--queue_test569 - "/home/ydf/distributed_framework/function_scheduling_distributed_framework/consumers/base_consumer.py:468" - _run_consuming_function_with_confirm_and_retry - DEBUG - 函数 add 第1次 运行, 正确了,函数运行时间是 0.0002 秒,入参是 【 {'x': 1, 'y': 2} 】。 [<_CustomThread(Thread-5, started daemon 140145262544640)> 6]
2020-01-02 17:32:59 - RedisConsumerAckAble--thread--queue_test569 - "/home/ydf/distributed_framework/function_scheduling_distributed_framework/consumers/redis_consumer_ack_able.py:25" - _shedual_task - DEBUG - 从redis的 [queue_test569] 队列中 取出的消息是: {"x": 3, "y": 4, "extra": {"task_id": "queue_test569_result:af9f710f-9a2c-44d4-b60d-b65a335818dd", "publish_time": 1577957579.8371, "publish_time_format": "2020-01-02 17:32:59", "function_timeout": null, "max_retry_times": null, "is_print_detail_exception": null, "msg_expire_senconds": null, "is_using_rpc_mode": true}}
2020-01-02 17:32:59 - CustomThreadPoolExecutor - "/home/ydf/distributed_framework/function_scheduling_distributed_framework/concurrent_pool/custom_threadpool_executor.py:97" - _adjust_thread_count - DEBUG - (1, 1, 1, 6)
17:32:59 "/home/ydf/distributed_framework/test_frame/my/test.py:11" 3 + 4 = 7
2020-01-02 17:32:59 - _CustomThread - "/home/ydf/distributed_framework/function_scheduling_distributed_framework/concurrent_pool/custom_threadpool_executor.py:140" - run - DEBUG - 新启动线程 140145261491968
2020-01-02 17:32:59 - RedisConsumerAckAble--thread--queue_test569 - "/home/ydf/distributed_framework/function_scheduling_distributed_framework/consumers/base_consumer.py:468" - _run_consuming_function_with_confirm_and_retry - DEBUG - 函数 add 第1次 运行, 正确了,函数运行时间是 0.0004 秒,入参是 【 {'x': 3, 'y': 4} 】。 [<_CustomThread(Thread-5, started daemon 140145262544640)> 7]
17:32:59 "/home/ydf/distributed_framework/test_frame/my/test.py:40" 7
2020-01-02 17:33:59 - RedisConsumerAckAble--thread--queue_test569 - "/home/ydf/distributed_framework/function_scheduling_distributed_framework/consumers/base_consumer.py:495" - check_heartbeat_and_message_count - INFO - [queue_test569] 队列中还有 [0] 个任务
2020-01-02 17:34:59 - RedisConsumerAckAble--thread--queue_test569 - "/home/ydf/distributed_framework/function_scheduling_distributed_framework/consumers/base_consumer.py:495" - check_heartbeat_and_message_count - INFO - [queue_test569] 队列中还有 [0] 个任务