Consuming from rabbitmq using Celery -


i using celery rabbitmq broker on server a. tasks require interaction server say, server b , using rabbitmq queues interaction.

queue 1 - server (producer), server b (consumer)

queue 2 - server b (producer), server (consumer)

my celery unexpectedly hanging , have found reason incorrect implementation of server consumer code.

channel.start_consuming() keeps polling rabbitmq expected putting in celery task creates multiple pollers don't expire. can add expiry time completion data being sent server b cannot guaranteed. code pasted below 1 method used tackle issue not convinced best solution.

i wish know doing wrong , right way implement because have failed searching articles on web. tips, insights , links articles extremely helpful.

finally, code -

@celery.task def task_a(data):     do_some_processing     # create 1 rabbitmq consumer instance avoid celery hangups     task_d.delay()  @celery.task def task_b(data):     do_some_processing     if data not none:         task_c.delay()  @celery.task def task_c():     data = some_data     data = json.dumps(data)     conn_params = pika.connectionparameters(host=rabbitmq_host)     connection = pika.blockingconnection(conn_params)     channel = connection.channel()     channel.queue_declare(queue=queue_1)     channel.basic_publish(exchange='',                         routing_key=queue_1,                         body=data)     channel.close()  @celery.task def task_d():     def queue_helper(ch, method, properties, body):         '''         callback queue.         '''         data = json.loads(body)         task_b.delay(data)      conn_params = pika.connectionparameters(host=rabbitmq_host)     connection = pika.blockingconnection(conn_params)     channel = connection.channel()     channel.queue_declare(queue=queue_2)     channel.basic_consume(queue_helper,                         queue=queue_2,                         no_ack=true)     channel.start_consuming()     channel.close() 


Comments

Popular posts from this blog

javascript - Chart.js (Radar Chart) different scaleLineColor for each scaleLine -

apache - Error with PHP mail(): Multiple or malformed newlines found in additional_header -

java - Android – MapFragment overlay button shadow, just like MyLocation button -