[rabbitmq-discuss] Creating a job only once (using Java Client API)
Jon Brisbin
jon at jbrisbin.com
Sat Apr 16 14:51:09 BST 2011
Can't your initialization code respond to a message and immediately de-register itself as a consumer?
Given a routing key binding of "my.work.topic.*" for workers, a single initializer could be bound using "#" and de-register itself as soon as its init code completes:
synchronized(this) {
init();
cancel();
}
jb
On Apr 16, 2011, at 7:03 AM, Allan Kamau wrote:
> I have potentially many clients what may attempt create a given type
> of job at the same time and more than once. And I would like the given
> job (maybe identified by a job_name) for the given queue to be created
> only once. And for the given job to be or have been executed
> successfully only once in the future, present or in the past.
>
> This is the long format of the situation.
> I have several clients for whom a particular initialization step needs
> to happen only once. Each such client represents an instance of an
> application I am executing in parallel, for a particular node I would
> like some data initialization procedure to be run only once triggered
> by a single instance of these processes even though there will be many
> such instances (processes) started at the same time. After the task
> has been created a note that such a task as already been created
> (already run or not) should be evident for all instances of this
> application present and even for the instances to be created in the
> future accessing a given queue_name.
>
> Below is what I have tried without much success.
>
> public void runHelper()throws IOException,java.lang.InterruptedException
> {
> long start_time=System.currentTimeMillis();
> long duration=0;
> ConnectionFactory factory=new ConnectionFactory();
> factory.setHost("localhost");
> Connection connection=factory.newConnection();
> Channel channel=connection.createChannel();
>
> channel.queueDeclare(queue_name,false,false,false,null);
> String message=null;
>
> QueueingConsumer consumer=new QueueingConsumer(channel);
> boolean autoAck=false;
> channel.basicConsume(queue_name,autoAck,consumer);
>
> int messages_cnt=0;
> for(int i=1;i<=number_of_messages;i++)
> {
> QueueingConsumer.Delivery
> delivery=consumer.nextDelivery(1000);//consumer.nextDelivery();
> if(delivery==null)
> {
> //this means no message has been send received for the given
> channel and queue.
> //so let's send one.
> message="dothis, from \""+_id__producer+"\", id:"+i;
> channel.basicPublish("",queue_name,null,message.getBytes());
> number_of_jobs_posted++;
>
> }
> else
> {
> //consume the message
> messages_cnt+=1;
> String message2=new String(delivery.getBody());
> if(message2.startsWith("dothis"))
> {
> System.out.println(new java.util.Date()+",
> _id__producer:\""+_id__producer+"\", got the job to be executed only
> once:\""+message2+"\"");
> message2="donethat";
> channel.basicPublish("",queue_name,null,message2.getBytes());
> channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
> number_of_jobs_received++;
> }
> else
> {
> //do not acknowledge the receiption of the message.
> }
> System.out.println(new java.util.Date()+" -
> _id__producer:\""+_id__producer+"\", messages_cnt:"+messages_cnt+",
> [x]Received:'"+message2+"'. "+new java.util.Date());
> }
> }
> duration=System.currentTimeMillis()-start_time;
> System.out.println("[x] Thread:\""+_id__producer+"\" completed
> sending:"+number_of_messages+", messages in:"+duration+"ms.");
>
> channel.close();
> connection.close();
>
> }
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
Thanks!
Jon Brisbin
http://jbrisbin.com
Twitter: @j_brisbin
More information about the rabbitmq-discuss
mailing list