[rabbitmq-discuss] How do the Producer gets response from the subscribers

ramkkk mramu789 at gmail.com
Thu Sep 19 13:15:28 BST 2013


thanks Alvaro Videla-2,


i tried the way u suggested ,


Producer the sends a command to the consumer  to spawn the process . so each
consumer has to give the response back to the Producer ..

 but i am not getting the response back to the Producer, consumer is going
on the infinite loop...


here is the my code .. 

consumer code 
=========================
import pika
import uuid
import subprocess
import sys
import copy
import  os
class Consumer(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
        self.spawned_processes = []
        self.channel = self.connection.channel()        
        self.channel.exchange_declare(exchange='logs',
                         type='fanout')
        result = self.channel.queue_declare(exclusive=True)
        self.queue_name = result.method.queue
        
        self.channel.queue_bind(exchange='logs',
                   queue=self.queue_name)        
        
    def build_env_param_for_child (self):        
        sys.path = [path.encode('ascii','ignore') for path in sys.path]        
        python_path = (':', ';')[sys.platform == 'win32'].join(filter(lambda
dr:bool(dr), sys.path))       
        child_proc_env = copy(os.environ)        
        child_proc_env.update({'PYTHONPATH':python_path[1:]})
        return child_proc_env
        
    def start(self):
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(self.on_request, queue=self.queue_name)

        print " [x] Awaiting RPC requests"
        self.channel.start_consuming()
        
        
    def spawn_processes(self,number, *args, **kwargs):        
        for n in range(0,number):            
            process = subprocess.Popen('./bin/start_worker')
            print "Spawning %s with pid %s " %
('./bin/start_worker',process.pid)
            self.spawned_processes.append(process)

    def on_request(self,ch, method, props, body):        
        self.spawn_processes(2)
        self.response = [process.pid for process in self.spawned_processes]       
        ch.basic_publish(exchange='logs',
                     routing_key='',
                     properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id,                                                          
                                                     delivery_mode=2 ,
                                                     ),          
                                                     
                     body=str(self.response))
        ch.basic_ack(delivery_tag = method.delivery_tag)
if __name__=='__main__':
    consumer=Consumer()
    consumer.start()



and

Producer
===============================================
import pika
import uuid
import argparse

class Producer(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
        self.channel = self.connection.channel()        
        self.channel.exchange_declare(exchange='logs',
                         type='fanout')
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue      

        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body): 
        self.response = body           


    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())        
        self.channel.basic_publish(exchange='logs',
                                   routing_key='',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,   
                                         delivery_mode=2,
                                         ),
                                   body=str(n))
        
        while self.response is None:
            self.connection.process_data_events()
        return self.response

if __name__ == '__main__': 
    parser = argparse.ArgumentParser()
    parser.add_argument('--num', default=2)
    args = vars(parser.parse_args())
    num=int(args['num'])
    producer = Producer()
    print " [x] Requesting fib(1)"
    response = producer.call(num)
    print " [.] Got %r" % (response,)


please suggest where i went wrong ..







--
View this message in context: http://rabbitmq.1065348.n5.nabble.com/How-do-the-Producer-gets-response-from-the-subscribers-tp29765p29775.html
Sent from the RabbitMQ mailing list archive at Nabble.com.


More information about the rabbitmq-discuss mailing list