[rabbitmq-discuss] Creating a job only once (using Java Client API)

Allan Kamau kamauallan at gmail.com
Sat Apr 16 15:18:36 BST 2011

> On Apr 16, 2011, at 7:03 AM, Allan Kamau wrote:
>> I have potentially many clients that may attempt to create a given type
>> of job at the same time, and more than once. I would like the given
>> job (perhaps identified by a job_name) to be created only once for the
>> given queue, and to be executed successfully only once, whether in
>> the past, the present or the future.
>> This is the long form of the situation.
>> I have several clients for whom a particular initialization step needs
>> to happen only once. Each such client represents an instance of an
>> application I am executing in parallel. For a particular node, I would
>> like a data initialization procedure to be run only once, triggered
>> by a single one of these processes, even though many such instances
>> (processes) will be started at the same time. After the task has been
>> created, a note that such a task has already been created (whether
>> already run or not) should be visible to all current instances of
>> this application, and to any instances created in the future that
>> access the given queue_name.
>> Below is what I have tried, without much success.
>>       public void runHelper()throws IOException,java.lang.InterruptedException
>>       {
>>               long start_time=System.currentTimeMillis();
>>               long duration=0;
>>               ConnectionFactory factory=new ConnectionFactory();
>>               factory.setHost("localhost");
>>               Connection connection=factory.newConnection();
>>               Channel channel=connection.createChannel();
>>               channel.queueDeclare(queue_name,false,false,false,null);
>>               String message=null;
>>               QueueingConsumer consumer=new QueueingConsumer(channel);
>>               boolean autoAck=false;
>>               channel.basicConsume(queue_name,autoAck,consumer);
>>               int messages_cnt=0;
>>               for(int i=1;i<=number_of_messages;i++)
>>               {
>>                       QueueingConsumer.Delivery delivery=consumer.nextDelivery(1000);//consumer.nextDelivery();
>>                       if(delivery==null)
>>                       {
>>                               //no message was received on this channel and queue within the timeout,
>>                               //so let's send one.
>>                               message="dothis, from \""+_id__producer+"\", id:"+i;
>>                               channel.basicPublish("",queue_name,null,message.getBytes());
>>                               number_of_jobs_posted++;
>>                       }
>>                       else
>>                       {
>>                               //consume the message
>>                               messages_cnt+=1;
>>                               String message2=new String(delivery.getBody());
>>                               if(message2.startsWith("dothis"))
>>                               {
>>                                       System.out.println(new java.util.Date()+", _id__producer:\""+_id__producer
>>                                               +"\", got the job to be executed only once:\""+message2+"\"");
>>                                       message2="donethat";
>>                                       channel.basicPublish("",queue_name,null,message2.getBytes());
>>                                       channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
>>                                       number_of_jobs_received++;
>>                               }
>>                               else
>>                               {
>>                                       //do not acknowledge receipt of the message.
>>                               }
>>                               System.out.println(new java.util.Date()+" - _id__producer:\""+_id__producer
>>                                       +"\", messages_cnt:"+messages_cnt+", [x]Received:'"+message2+"'. "+new java.util.Date());
>>                       }
>>               }
>>               duration=System.currentTimeMillis()-start_time;
>>               System.out.println("[x] Thread:\""+_id__producer+"\" completed sending:"
>>                       +number_of_messages+", messages in:"+duration+"ms.");
>>               channel.close();
>>               connection.close();
>>       }

On Sat, Apr 16, 2011 at 4:51 PM, Jon Brisbin <jon at jbrisbin.com> wrote:
> Can't your initialization code respond to a message and immediately de-register itself as a consumer?
> Given a routing key binding of "my.work.topic.*" for workers, a single initializer could be bound using "#" and de-register itself as soon as its init code completes:
> synchronized(this) {
>  init();
>  cancel();
> }
> jb
> Thanks!
> Jon Brisbin
> http://jbrisbin.com
> Twitter: @j_brisbin

The initialization code does exist as a centralized module.
This is the situation: I am developing a distributed application which
consists of several instances of itself at runtime. A single instance
of this application is mostly single-threaded, has no interprocess
communication, and can be viewed as a worker. I would like to start
additional instances on the fly on a given PC/server, and even start
such instances on other physical computers at will. At the moment I am
looking for a way for any such independent instance to learn whether
it is the first instance on the given PC to reach a given processing
step. If it is, it should perform the priming procedures for this
step. In the meantime, the other instances should be notified (as a
response to a job request) of the ongoing priming procedure when they
reach the start of that processing step, wait for the priming to
complete, and only then begin processing the step.
I am trying to see whether I can use AMQP (via RabbitMQ) for this
end-of-task gatekeeping, in addition to hosting jobs for the
distributed worker application instances.
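
For illustration only, the gate described above (the first arrival primes,
everyone else waits, then all proceed) can be modeled inside a single JVM,
with threads standing in for application instances. This is not an AMQP or
RabbitMQ solution and it does not coordinate separate processes or machines;
it only pins down the behaviour being asked for. The class name PrimingGate
and the methods enter and runInstances are hypothetical names chosen for
this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical single-JVM stand-in for the "first instance primes, the rest wait" gate. */
final class PrimingGate {
    // Flipped to true by whichever caller arrives first.
    private final AtomicBoolean claimed = new AtomicBoolean(false);
    // Opened once the priming procedure has finished.
    private final CountDownLatch primed = new CountDownLatch(1);

    /** Called by every instance when it reaches the processing step. */
    void enter(Runnable priming) throws InterruptedException {
        if (claimed.compareAndSet(false, true)) {
            try {
                priming.run();      // only the first arrival runs this
            } finally {
                // Assumption of this sketch: the gate opens even if priming
                // fails; a real system would have to record and handle that.
                primed.countDown();
            }
        } else {
            primed.await();         // later arrivals block until priming ends
        }
    }

    /** Starts the given number of simulated instances; returns {primingRuns, instancesProceeded}. */
    static int[] runInstances(int instances) {
        PrimingGate gate = new PrimingGate();
        AtomicInteger primingRuns = new AtomicInteger();
        AtomicInteger proceeded = new AtomicInteger();
        // One thread per simulated instance, so waiters never starve the primer.
        ExecutorService pool = Executors.newFixedThreadPool(instances);
        for (int i = 0; i < instances; i++) {
            pool.execute(() -> {
                try {
                    gate.enter(primingRuns::incrementAndGet);
                    proceeded.incrementAndGet(); // the real step would start here
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { primingRuns.get(), proceeded.get() };
    }

    public static void main(String[] args) {
        int[] result = runInstances(8);
        System.out.println("priming runs: " + result[0]
                + ", instances that proceeded: " + result[1]);
    }
}
```

Across processes, the atomic "claim" and the "primed" signal would each need
a counterpart held by something all instances can reach, and the record that
priming has happened would have to survive restarts so that future instances
can see it. Whether a broker alone can provide that is exactly the open
question in this thread.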

