[rabbitmq-discuss] multiple threads in one queue
Isaías
mcallus at gmail.com
Fri Sep 24 12:35:05 BST 2010
Hi All
I have a question about consuming a RabbitMQ queue.
I need 25 threads to consume a single RabbitMQ queue.
At the moment I have a single channel and a single consumer in the main
thread; it consumes the queue and stores each message in an ordinary Java
queue.
The worker threads take their tasks from that Java queue. I want to know how
I can send a delayed ack, because at present my main thread cannot keep
consuming the RabbitMQ queue unless I send the ack immediately.
When a task finishes successfully in a worker thread, the worker puts the
task in a second queue. The main thread reads that queue and sends the ack
for the task, which removes it from the RabbitMQ queue.
Is this possible? Or is it better to have one synchronized channel and one
synchronized consumer, and let all the threads consume the RabbitMQ queue
directly?
Here is the code that consumes the queues:
try {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(GesAgregaConstant.USER_RABBITMQ);
    factory.setPassword(GesAgregaConstant.PASS_RABBITMQ);
    factory.setVirtualHost(GesAgregaConstant.VIRTUALHOST_RABBITMQ);
    factory.setRequestedHeartbeat(0);
    factory.setHost(GesAgregaConstant.SERVER_HOST_RABBITMQ);
    factory.setPort(GesAgregaConstant.SERVER_PORT_RABBITMQ);
    Connection conn = factory.newConnection();
    channel = conn.createChannel();
    String queueName = GesAgregaConstant.QUEUE_TAREAS_RABBITMQ;
    // Prefetch of 1: at most one unacknowledged message at a time.
    channel.basicQos(1);
    boolean noAck = false;
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, noAck, consumer);
    boolean runInfinite = true;
    while (runInfinite) {
        QueueingConsumer.Delivery delivery;
        try {
            // TODO: ack every element in the queue
            synchronized (qAck) {
                while (qAck.size() > 0) {
                    UrlControl cAck = qAck.remove();
                    System.out.println("LEAVING THE QUEUE " + cAck.getUrl()
                            + " ACK " + cAck.getDelivery());
                    channel.basicAck(cAck.getDelivery(), false);
                }
            }
            // Blocks until the next message arrives.
            delivery = consumer.nextDelivery();
            String url = new String(delivery.getBody());
            System.out.println("CRAWLING " + url);
            UrlControl cControl = new UrlControl(url,
                    delivery.getEnvelope().getDeliveryTag());
            q.push(cControl, 0);
            //q.push(new URL(url),0);
            tc.startThreads();
        } catch (InterruptedException ie) {
            continue;
        } /*catch (IOException e) {
            continue;
        }*/ catch (Exception e) {
            continue;
        }
    }
    channel.close();
    conn.close();
} catch (IOException ioe) {
    ioe.printStackTrace();
} catch (Exception e) {
    e.printStackTrace();
}
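To make the hand-off concrete, here is a minimal sketch of the delayed-ack
idea using only java.util.concurrent, with no RabbitMQ classes. It is not
the real code: DelayedAckSketch, Task, run and the acker callback are
made-up names for illustration, the crawling step is left out, and acker
merely stands in for channel.basicAck(tag, false). Failed tasks are not
handled at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

class DelayedAckSketch {

    /** One message from the broker: its body plus the delivery tag needed for the ack. */
    static final class Task {
        final String url;
        final long deliveryTag;

        Task(String url, long deliveryTag) {
            this.url = url;
            this.deliveryTag = deliveryTag;
        }
    }

    /**
     * Hands every task to a pool of worker threads and sends the acks from the
     * calling thread only. Returns the tags in the order they were acked.
     * The acker parameter is a hypothetical stand-in for channel.basicAck(tag, false).
     */
    static List<Long> run(List<Task> incoming, int workers, LongConsumer acker) {
        BlockingQueue<Long> finished = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        List<Long> acked = new ArrayList<>();
        try {
            for (Task task : incoming) {
                pool.submit(() -> {
                    // The real work (crawling task.url) would go here.
                    // When it is done, report the tag back for acking.
                    finished.add(task.deliveryTag);
                });
            }
            while (acked.size() < incoming.size()) {
                // Timed poll, so this thread never blocks forever and could
                // also check for new deliveries between acks.
                Long tag = finished.poll(100, TimeUnit.MILLISECONDS);
                if (tag != null) {
                    acker.accept(tag);
                    acked.add(tag);
                }
            }
        } catch (InterruptedException ie) {
            // Restore the interrupt flag and return what was acked so far.
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return acked;
    }

    public static void main(String[] args) {
        List<Task> incoming = new ArrayList<>();
        for (long tag = 1; tag <= 100; tag++) {
            incoming.add(new Task("http://example.org/page" + tag, tag));
        }
        List<Long> acked = run(incoming, 25, tag -> { /* channel.basicAck(tag, false) */ });
        System.out.println("acked " + acked.size() + " of " + incoming.size());
    }
}
```

In this sketch only the calling thread ever touches the acker, which mirrors
keeping the channel in the main thread. In the real code the prefetch set
with basicQos would presumably have to be at least the number of workers for
more than one task to be in flight at once.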
Thank you.
Regards,
Isaias