Consuming from RabbitMQ using Celery
I am using Celery with a RabbitMQ broker on server A. Some tasks require interaction with another server, say server B, and I am using RabbitMQ queues for that interaction.
Queue 1
- Server A (producer), Server B (consumer)
Queue 2
- Server B (producer), Server A (consumer)
My Celery is unexpectedly hanging, and I have found the reason to be an incorrect implementation of the server A consumer code.
channel.start_consuming()
keeps polling RabbitMQ, as expected, but putting it in a Celery task creates multiple pollers that don't expire. I can add an expiry time, but completion of the data
being sent by server B cannot be guaranteed. In the code pasted below I have used one method to tackle the issue, but I am not convinced it is the best solution.
I wish to know what I am doing wrong and what the right way to implement this is, because I have failed to find anything by searching for articles on the web. Any tips, insights, or links to articles would be extremely helpful.
Finally, the code:
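    import json

    import pika

    # celery, rabbitmq_host, queue_1, queue_2, do_some_processing and
    # some_data are not shown here.

    @celery.task
    def task_a(data):
        do_some_processing  # placeholder for the actual work
        # Create 1 RabbitMQ consumer instance to avoid Celery hangups
        task_d.delay()

    @celery.task
    def task_b(data):
        do_some_processing  # placeholder for the actual work
        if data is not None:
            task_c.delay()

    @celery.task
    def task_c():
        # Publish to queue 1 (server A is the producer)
        data = some_data
        data = json.dumps(data)
        conn_params = pika.ConnectionParameters(host=rabbitmq_host)
        connection = pika.BlockingConnection(conn_params)
        channel = connection.channel()
        channel.queue_declare(queue=queue_1)
        channel.basic_publish(exchange='', routing_key=queue_1, body=data)
        channel.close()

    @celery.task
    def task_d():
        def queue_helper(ch, method, properties, body):
            ''' Callback for the queue. '''
            data = json.loads(body)
            task_b.delay(data)

        # Consume from queue 2 (server A is the consumer)
        conn_params = pika.ConnectionParameters(host=rabbitmq_host)
        connection = pika.BlockingConnection(conn_params)
        channel = connection.channel()
        channel.queue_declare(queue=queue_2)
        # pika < 1.0 signature; pika >= 1.0 uses
        # basic_consume(queue=..., on_message_callback=..., auto_ack=True)
        channel.basic_consume(queue_helper, queue=queue_2, no_ack=True)
        # Blocks forever, so this task never returns and channel.close()
        # below is never reached in normal operation
        channel.start_consuming()
        channel.close()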
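One possible direction, offered only as a sketch and not as a confirmed fix: instead of letting task_d block forever in start_consuming(), the task could read whatever is waiting on the queue and return as soon as the queue goes idle. The sketch below assumes pika >= 1.0, where BlockingChannel.consume() accepts an inactivity_timeout and yields (None, None, None) when nothing arrives in that window. The names drain, consume_queue_2, max_messages and idle_seconds are hypothetical and not part of the code above.

```python
import json


def drain(messages, handle, ack=None, max_messages=100):
    """Process deliveries until the queue goes idle or max_messages is reached.

    `messages` is any iterable of (method, properties, body) tuples, the
    shape yielded by pika's BlockingChannel.consume(). Returns the number
    of messages handled.
    """
    handled = 0
    for method, _properties, body in messages:
        if method is None:
            # Inactivity timeout fired: nothing is waiting, so stop polling.
            break
        handle(json.loads(body))
        if ack is not None:
            # Acknowledge only after the handler returned without raising.
            ack(method.delivery_tag)
        handled += 1
        if handled >= max_messages:
            break
    return handled


def consume_queue_2(host, queue, handle, idle_seconds=5):
    """Hypothetical wiring for pika >= 1.0; needs a reachable RabbitMQ broker."""
    import pika  # imported lazily so drain() is usable without pika installed

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    try:
        channel = connection.channel()
        channel.queue_declare(queue=queue)
        messages = channel.consume(queue, inactivity_timeout=idle_seconds)
        handled = drain(messages, handle, ack=channel.basic_ack)
        # Stop the consumer; unacknowledged deliveries go back to the queue.
        channel.cancel()
        return handled
    finally:
        connection.close()
```

With something like this, task_d could call consume_queue_2(rabbitmq_host, queue_2, task_b.delay) and then return, so no poller outlives its task. It would then need to be triggered repeatedly, for example on a schedule, which trades a permanent poller for some latency. Whether that trade is acceptable depends on how quickly data from server B has to be picked up.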