[rabbitmq-discuss] Creating a job only once (using Java Client API)

Jon Brisbin jon at jbrisbin.com
Sat Apr 16 14:51:09 BST 2011


Can't your initialization code respond to a message and immediately de-register itself as a consumer?

Given a routing key binding of "my.work.topic.*" for workers, a single initializer could be bound using "#" and de-register itself as soon as its init code completes:

synchronized(this) {
  init();
  cancel();
}
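
To make the snippet above concrete, here is a minimal sketch of the run-once-then-cancel idea in plain Java. It does not talk to a broker: the class name OneShotInitializer and the two Runnable hooks are hypothetical, introduced only for illustration. In a real consumer the cancel hook would presumably call Channel.basicCancel(consumerTag) from the delivery handler. The guard only protects a single process; it relies on the broker delivering the trigger message to one initializer.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: runs init() at most once, then cancels the consumer.
public class OneShotInitializer {
    private final Runnable init;    // the one-time initialization work
    private final Runnable cancel;  // assumed to wrap channel.basicCancel(consumerTag)
    private boolean done = false;   // guarded by the monitor of this object

    public OneShotInitializer(Runnable init, Runnable cancel) {
        this.init = init;
        this.cancel = cancel;
    }

    // Call this from the delivery callback. Returns true only for the
    // single call that actually performed the initialization.
    public synchronized boolean handleDelivery() {
        if (done) {
            return false;           // already initialized, ignore the message
        }
        init.run();
        done = true;                // only marked done if init() did not throw
        cancel.run();               // de-register so no further messages arrive
        return true;
    }

    public static void main(String[] args) {
        AtomicInteger initRuns = new AtomicInteger();
        AtomicInteger cancelRuns = new AtomicInteger();
        OneShotInitializer initializer =
            new OneShotInitializer(initRuns::incrementAndGet, cancelRuns::incrementAndGet);

        // Simulate three deliveries reaching the same initializer.
        for (int i = 0; i < 3; i++) {
            initializer.handleDelivery();
        }
        System.out.println("init ran " + initRuns.get()
            + " time(s), cancel ran " + cancelRuns.get() + " time(s)");
        // prints: init ran 1 time(s), cancel ran 1 time(s)
    }
}
```

Setting the flag after init() returns means a failed initialization can be retried by a later message; whether that is desirable depends on the job.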

jb


On Apr 16, 2011, at 7:03 AM, Allan Kamau wrote:

> I have potentially many clients that may attempt to create a given type
> of job at the same time and more than once. I would like the given
> job (maybe identified by a job_name) for the given queue to be created
> only once, and for that job to be executed successfully only once,
> whether in the past, present, or future.
> 
> This is the long format of the situation.
> I have several clients for which a particular initialization step needs
> to happen only once. Each client is an instance of an application I am
> executing in parallel. For a particular node, I would like a data
> initialization procedure to be run only once, triggered by a single
> instance, even though many such instances (processes) are started at
> the same time. After the task has been created, a note that such a
> task has already been created (already run or not) should be evident
> to all instances of this application, present and future, that access
> a given queue_name.
> 
> Below is what I have tried without much success.
> 
> 	public void runHelper()throws IOException,java.lang.InterruptedException
> 	{
> 		long start_time=System.currentTimeMillis();
> 		long duration=0;
> 		ConnectionFactory factory=new ConnectionFactory();
> 		factory.setHost("localhost");
> 		Connection connection=factory.newConnection();
> 		Channel channel=connection.createChannel();
> 		
> 		channel.queueDeclare(queue_name,false,false,false,null);
> 		String message=null;
> 
> 		QueueingConsumer consumer=new QueueingConsumer(channel);
> 		boolean autoAck=false;
> 		channel.basicConsume(queue_name,autoAck,consumer);
> 		
> 		int messages_cnt=0;
> 		for(int i=1;i<=number_of_messages;i++)
> 		{
> 			QueueingConsumer.Delivery delivery=consumer.nextDelivery(1000);//consumer.nextDelivery();
> 			if(delivery==null)
> 			{
> 				//this means no message has been received for the given channel and queue.
> 				//so let's send one.
> 				message="dothis, from \""+_id__producer+"\", id:"+i;
> 				channel.basicPublish("",queue_name,null,message.getBytes());
> 				number_of_jobs_posted++;
> 				
> 			}
> 			else
> 			{
> 				//consume the message
> 				messages_cnt+=1;
> 				String message2=new String(delivery.getBody());
> 				if(message2.startsWith("dothis"))
> 				{
> 					System.out.println(new java.util.Date()+", _id__producer:\""+_id__producer+"\", got the job to be executed only once:\""+message2+"\"");
> 					message2="donethat";
> 					channel.basicPublish("",queue_name,null,message2.getBytes());
> 					channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
> 					number_of_jobs_received++;
> 				}
> 				else
> 				{
> 					//do not acknowledge the receipt of the message.
> 				}
> 				System.out.println(new java.util.Date()+" - _id__producer:\""+_id__producer+"\", messages_cnt:"+messages_cnt+", [x]Received:'"+message2+"'. "+new java.util.Date());
> 			}
> 		}
> 		duration=System.currentTimeMillis()-start_time;
> 		System.out.println("[x] Thread:\""+_id__producer+"\" completed sending:"+number_of_messages+", messages in:"+duration+"ms.");
> 		
> 		channel.close();
> 		connection.close();
> 		
> 	}


Thanks!

Jon Brisbin

http://jbrisbin.com
Twitter: @j_brisbin



