[rabbitmq-discuss] Creating a job only once (using Java Client API)
Allan Kamau
kamauallan at gmail.com
Sat Apr 16 13:03:20 BST 2011
I have potentially many clients that may attempt to create a given type
of job at the same time, and more than once. I would like the given
job (perhaps identified by a job_name) to be created only once for the
given queue, and to be executed successfully only once, whether that
happens in the past, the present or the future.
Here is the longer version of the situation.
I have several clients for which a particular initialization step needs
to happen only once. Each client is an instance of an application that I
run in parallel. For a particular node, I would like a data
initialization procedure to be run only once, triggered by a single one
of these instances, even though many such instances (processes) are
started at the same time. Once the task has been created, a note that it
has already been created (whether or not it has run yet) should be
visible to every instance of the application accessing the given
queue_name, both those running now and those created in the future.
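To make the requirement concrete, here is a minimal in-process sketch of the semantics I am after. It does not use RabbitMQ at all: it assumes a single JVM and a plain ConcurrentHashMap standing in for the shared state, and the names OnceOnlyJobRegistry, tryCreate, markDone, stateOf and simulate are hypothetical, chosen only for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-process stand-in for the behaviour wanted from the queue.
class OnceOnlyJobRegistry {
    // Illustrative job states only.
    enum State { CREATED, DONE }

    private final ConcurrentMap<String, State> jobs =
            new ConcurrentHashMap<String, State>();

    // Returns true for exactly one caller per job name, however many race here.
    boolean tryCreate(String jobName) {
        return jobs.putIfAbsent(jobName, State.CREATED) == null;
    }

    // Records that the job has run, so later instances can see it.
    void markDone(String jobName) {
        jobs.put(jobName, State.DONE);
    }

    // null means the job was never created.
    State stateOf(String jobName) {
        return jobs.get(jobName);
    }

    // Starts `clients` threads that all try to create the same job at once
    // and returns how many of them actually ran the initialization.
    static int simulate(final OnceOnlyJobRegistry registry,
                        final String jobName, int clients) {
        final AtomicInteger runs = new AtomicInteger();
        final CountDownLatch startSignal = new CountDownLatch(1);
        Thread[] threads = new Thread[clients];
        for (int i = 0; i < clients; i++) {
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        startSignal.await(); // wait so that all clients race together
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    if (registry.tryCreate(jobName)) {
                        runs.incrementAndGet(); // the one-time initialization would go here
                        registry.markDone(jobName);
                    }
                }
            });
            threads[i].start();
        }
        startSignal.countDown();
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for clients", e);
        }
        return runs.get();
    }

    public static void main(String[] args) {
        OnceOnlyJobRegistry registry = new OnceOnlyJobRegistry();
        // 16 simultaneous clients, but only one of them runs the job.
        System.out.println("runs: " + simulate(registry, "init_node_data", 16));
        // An instance started later finds the job already recorded.
        System.out.println("late create: " + registry.tryCreate("init_node_data"));
    }
}
```

Whichever client wins the putIfAbsent call does the initialization, and every other client, including one started later, sees that the job already exists. What I am looking for is the equivalent guarantee across separate processes sharing a queue_name.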
Below is what I have tried without much success.
// Requires: import com.rabbitmq.client.*; import java.io.IOException;
public void runHelper() throws IOException, java.lang.InterruptedException
{
    long start_time = System.currentTimeMillis();
    long duration = 0;
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(queue_name, false, false, false, null);
    String message = null;
    QueueingConsumer consumer = new QueueingConsumer(channel);
    boolean autoAck = false;
    channel.basicConsume(queue_name, autoAck, consumer);
    int messages_cnt = 0;
    for (int i = 1; i <= number_of_messages; i++)
    {
        // wait up to one second for a message
        QueueingConsumer.Delivery delivery = consumer.nextDelivery(1000); //consumer.nextDelivery();
        if (delivery == null)
        {
            //this means no message has been received for the given channel and queue,
            //so let's send one.
            message = "dothis, from \"" + _id__producer + "\", id:" + i;
            channel.basicPublish("", queue_name, null, message.getBytes());
            number_of_jobs_posted++;
        }
        else
        {
            //consume the message
            messages_cnt += 1;
            String message2 = new String(delivery.getBody());
            if (message2.startsWith("dothis"))
            {
                System.out.println(new java.util.Date() + ", _id__producer:\"" + _id__producer
                    + "\", got the job to be executed only once:\"" + message2 + "\"");
                //replace the job with a marker that it has been done
                message2 = "donethat";
                channel.basicPublish("", queue_name, null, message2.getBytes());
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                number_of_jobs_received++;
            }
            else
            {
                //do not acknowledge the receipt of the message.
            }
            System.out.println(new java.util.Date() + " - _id__producer:\"" + _id__producer
                + "\", messages_cnt:" + messages_cnt
                + ", [x]Received:'" + message2 + "'. " + new java.util.Date());
        }
    }
    duration = System.currentTimeMillis() - start_time;
    System.out.println("[x] Thread:\"" + _id__producer + "\" completed sending:"
        + number_of_messages + ", messages in:" + duration + "ms.");
    channel.close();
    connection.close();
}