Hi All<br>I have a problem/doubt with a rabbit queue<br><br>I need that 25 threads consume one rabbit queue. <br><br>In this moment, I have an unique channel and consumer in a main thread, it consume the queue and save the content in a normal java queue. <br>
<br>The sons threads get the tasks from java queue, i want know how can I send one delay ack because my main thread can not follow consuming the rabbit queue if i does not send the ack inmediately. <br><br>When the task finishs correctly in a son thread, it saves the task in a second queue that main thread read and send the ack for this task to eliminate the task from rabbit queue. <br>
<br>Is it possible this?? or is it better that i have one syncronize channel and one syncronize consumer and that all threads consume the rabbit queue?<br><br>I show you the code that consume the queues<br><br> try{<br>
ConnectionFactory factory = new ConnectionFactory();<br><br> factory.setUsername(GesAgregaConstant.USER_RABBITMQ);<br> factory.setPassword(GesAgregaConstant.PASS_RABBITMQ);<br>
factory.setVirtualHost(GesAgregaConstant.VIRTUALHOST_RABBITMQ);<br> factory.setRequestedHeartbeat(0);<br> factory.setHost(GesAgregaConstant.SERVER_HOST_RABBITMQ);<br> factory.setPort(GesAgregaConstant.SERVER_PORT_RABBITMQ);<br>
<br> Connection conn = factory.newConnection();<br><br> channel = conn.createChannel();<br> String queueName = GesAgregaConstant.QUEUE_TAREAS_RABBITMQ;<br><br> channel.basicQos(1);<br>
boolean noAck = false;<br><br> QueueingConsumer consumer = new QueueingConsumer(channel);<br><br> channel.basicConsume(queueName, noAck, consumer);<br> boolean runInfinite = true;<br>
<br> while (runInfinite) {<br><br> QueueingConsumer.Delivery delivery;<br> try {<br><br> //TODO poner el ack de todos los elementos de la cola<br>
synchronized (qAck){<br> while(qAck.size()>0){<br> UrlControl cAck = qAck.remove();<br> System.out.println("Sale DE LA COLA" + cAck.getUrl() +"ACK " + cAck.getDelivery());<br>
channel.basicAck(cAck.getDelivery(), false);<br> }<br> }<br> <br> delivery = consumer.nextDelivery();<br>
<br> String url = new String(delivery.getBody());<br><br> System.out.println("CRAWLING " + url);<br> UrlControl cControl = new UrlControl(url,delivery.getEnvelope().getDeliveryTag());<br>
<br> q.push(cControl,0);<br> //q.push(new URL(url),0);<br> tc.startThreads(); <br><br> } catch (InterruptedException ie) {<br>
continue;<br> } /*catch (IOException e) {<br> continue;<br> }*/catch (Exception e) {<br> continue;<br> }<br>
}<br><br> channel.close();<br> conn.close();<br><br> }catch(IOException ioe){<br> ioe.printStackTrace();<br> }catch(Exception e){<br> e.printStackTrace();<br>
}<br><br><br>Thank you. <span style="border-collapse: separate; color: rgb(0, 0, 0); font-family: 'Times New Roman'; font-style: normal; font-variant: normal; font-weight: normal; letter-spacing: normal; line-height: normal; text-indent: 0px; text-transform: none; white-space: normal; word-spacing: 0px; font-size: medium;"><span style="border-collapse: collapse; font-family: Verdana,Arial,Helvetica,sans-serif; font-size: 13px;"><br>
Regards, <br>Isaias <br></span></span><br><br>