[rabbitmq-discuss] multiple threads in one queue

Isaías mcallus at gmail.com
Fri Sep 24 12:35:05 BST 2010


Hi all,
I have a question about a RabbitMQ queue.

I need 25 threads to consume from a single RabbitMQ queue.

At the moment I have a single channel and a single consumer in the main
thread. It consumes from the RabbitMQ queue and stores each message in an
ordinary Java queue.

The child threads take their tasks from the Java queue. I would like to know
how I can send a delayed ack, because my main thread cannot continue
consuming from the RabbitMQ queue if I do not send the ack immediately.

When a task finishes successfully in a child thread, the thread puts the task
on a second queue. The main thread reads that queue and sends the ack for the
task, which removes it from the RabbitMQ queue.

Is this possible? Or would it be better to have one synchronized channel and
one synchronized consumer, and let all the threads consume from the RabbitMQ
queue directly?

Here is the code that consumes the queues:

         try{
                ConnectionFactory factory = new ConnectionFactory();

                factory.setUsername(GesAgregaConstant.USER_RABBITMQ);
                factory.setPassword(GesAgregaConstant.PASS_RABBITMQ);

                factory.setVirtualHost(GesAgregaConstant.VIRTUALHOST_RABBITMQ);
                factory.setRequestedHeartbeat(0);
                factory.setHost(GesAgregaConstant.SERVER_HOST_RABBITMQ);
                factory.setPort(GesAgregaConstant.SERVER_PORT_RABBITMQ);

                Connection conn = factory.newConnection();

                channel = conn.createChannel();
                String queueName = GesAgregaConstant.QUEUE_TAREAS_RABBITMQ;

                channel.basicQos(1);
                boolean noAck = false;

                QueueingConsumer consumer = new QueueingConsumer(channel);

                channel.basicConsume(queueName, noAck, consumer);
                boolean runInfinite = true;

                while (runInfinite) {

                    QueueingConsumer.Delivery delivery;
                    try {

                        // TODO: ack all the elements in the queue
                        synchronized (qAck){
                            while(qAck.size()>0){
                               UrlControl cAck = qAck.remove();
                               System.out.println("Sale DE LA COLA" +
                                   cAck.getUrl() + "ACK " + cAck.getDelivery());
                               channel.basicAck(cAck.getDelivery(), false);
                            }
                        }

                        delivery = consumer.nextDelivery();

                        String url = new String(delivery.getBody());

                        System.out.println("CRAWLING " + url);
                        UrlControl cControl = new UrlControl(url,
                            delivery.getEnvelope().getDeliveryTag());

                        q.push(cControl,0);
                        //q.push(new URL(url),0);
                        tc.startThreads();

                    } catch (InterruptedException ie) {
                        continue;
                    } /*catch (IOException e) {
                        continue;
                    }*/catch (Exception e) {
                        continue;
                    }
                }

                channel.close();
                conn.close();

            }catch(IOException ioe){
                ioe.printStackTrace();
            }catch(Exception e){
                e.printStackTrace();
            }
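
To make the hand-off described above easier to discuss, here is a minimal
sketch of it that does not depend on the RabbitMQ client at all. Everything in
it is an assumption made for illustration: the class DelayedAckSketch, the
Task holder and the Acker interface are hypothetical names, and Acker only
stands in for the call to channel.basicAck(deliveryTag, false) in the code
above. Workers take tasks from one Java queue and report finished delivery
tags on a second one; only the main thread ever sends acks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DelayedAckSketch {

    /** Hypothetical stand-in for channel.basicAck(deliveryTag, false). */
    interface Acker {
        void ack(long deliveryTag);
    }

    /** Hypothetical task holder: a URL plus the delivery tag it arrived with. */
    static final class Task {
        final long deliveryTag;
        final String url;

        Task(long deliveryTag, String url) {
            this.deliveryTag = deliveryTag;
            this.url = url;
        }
    }

    /** Runs the hand-off and returns the tags in the order they were acked. */
    public static List<Long> run(List<String> urls, int workers)
            throws InterruptedException {
        final BlockingQueue<Task> work = new LinkedBlockingQueue<>();
        final BlockingQueue<Long> done = new LinkedBlockingQueue<>();
        final List<Long> acked = new ArrayList<>();
        // Only the main thread calls the acker, so the list needs no locking.
        Acker acker = tag -> acked.add(tag);

        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        Task t = work.take();
                        if (t.deliveryTag < 0) {
                            return; // poison pill: stop this worker
                        }
                        // ... the real work on t.url would happen here ...
                        done.put(t.deliveryTag); // report completion, do not ack here
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Stands in for the consume loop: hand each "delivery" to the workers.
        long tag = 1;
        for (String url : urls) {
            work.put(new Task(tag++, url));
        }

        // Main thread: ack each task only after a worker has finished it.
        while (acked.size() < urls.size()) {
            acker.ack(done.take());
        }

        // One poison pill per worker, then wait for the pool to drain.
        for (int i = 0; i < workers; i++) {
            work.put(new Task(-1, null));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return acked;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> urls = new ArrayList<>();
        urls.add("http://a.example");
        urls.add("http://b.example");
        urls.add("http://c.example");
        System.out.println("acked " + run(urls, 2).size() + " deliveries");
    }
}
```

One thing to keep in mind with this layout: basicQos(1) limits the channel to
a single unacknowledged delivery, so the broker will not send a second message
until the first one has been acked. For the workers to be kept busy, the
prefetch count would presumably have to be at least the number of worker
threads.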


Thank you.
Regards,
Isaias