[rabbitmq-discuss] How do the Producer gets response from the subscribers
ramkkk
mramu789 at gmail.com
Thu Sep 19 13:15:28 BST 2013
thanks Alvaro Videla-2,
i tried the way u suggested ,
Producer the sends a command to the consumer to spawn the process . so each
consumer has to give the response back to the Producer ..
but i am not getting the response back to the Producer, consumer is going
on the infinite loop...
here is the my code ..
consumer code
=========================
import pika
import uuid
import subprocess
import sys
import copy
import os
class Consumer(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.spawned_processes = []
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange='logs',
type='fanout')
result = self.channel.queue_declare(exclusive=True)
self.queue_name = result.method.queue
self.channel.queue_bind(exchange='logs',
queue=self.queue_name)
def build_env_param_for_child (self):
sys.path = [path.encode('ascii','ignore') for path in sys.path]
python_path = (':', ';')[sys.platform == 'win32'].join(filter(lambda
dr:bool(dr), sys.path))
child_proc_env = copy(os.environ)
child_proc_env.update({'PYTHONPATH':python_path[1:]})
return child_proc_env
def start(self):
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.on_request, queue=self.queue_name)
print " [x] Awaiting RPC requests"
self.channel.start_consuming()
def spawn_processes(self,number, *args, **kwargs):
for n in range(0,number):
process = subprocess.Popen('./bin/start_worker')
print "Spawning %s with pid %s " %
('./bin/start_worker',process.pid)
self.spawned_processes.append(process)
def on_request(self,ch, method, props, body):
self.spawn_processes(2)
self.response = [process.pid for process in self.spawned_processes]
ch.basic_publish(exchange='logs',
routing_key='',
properties=pika.BasicProperties(correlation_id = \
props.correlation_id,
delivery_mode=2 ,
),
body=str(self.response))
ch.basic_ack(delivery_tag = method.delivery_tag)
if __name__=='__main__':
consumer=Consumer()
consumer.start()
and
Producer
===============================================
import pika
import uuid
import argparse
class Producer(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange='logs',
type='fanout')
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='logs',
routing_key='',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
delivery_mode=2,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return self.response
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--num', default=2)
args = vars(parser.parse_args())
num=int(args['num'])
producer = Producer()
print " [x] Requesting fib(1)"
response = producer.call(num)
print " [.] Got %r" % (response,)
please suggest where i went wrong ..
--
View this message in context: http://rabbitmq.1065348.n5.nabble.com/How-do-the-Producer-gets-response-from-the-subscribers-tp29765p29775.html
Sent from the RabbitMQ mailing list archive at Nabble.com.
More information about the rabbitmq-discuss
mailing list