[rabbitmq-discuss] [2.4.0] Durable queue dropped when channel issues basicAck
jadz0r
kippage at gmail.com
Sat Apr 2 04:34:15 BST 2011
Hi All,
I've run across a problem with my durable message queue being
dropped when my channel issues a basicAck. Messages are being sent
with delivery_mode 2 (persistent).
If I publish 2 messages into the queue, the first message is processed
by my consumer, which then calls basicAck. At that point an error is
generated in the logs and my queue disappears (checked by running
sudo rabbitmqctl list_queues).
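The same check can be done from code by parsing the list_queues output. What follows is only a minimal Java sketch, assuming the default two-column output (queue name, a tab, then the message count). The class name QueuePresenceCheck and the sample strings are made up for illustration and are not part of RabbitMQ or its Java client.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of RabbitMQ.
final class QueuePresenceCheck {

    // Parses text shaped like "name<TAB>messages", one queue per line.
    // Lines without a tab (banner or footer lines) are skipped.
    static Map<String, Long> parseListQueues(String output) {
        Map<String, Long> queues = new LinkedHashMap<>();
        for (String line : output.split("\\r?\\n")) {
            int tab = line.indexOf('\t');
            if (tab <= 0) {
                continue;
            }
            String name = line.substring(0, tab);
            String count = line.substring(tab + 1).trim();
            try {
                queues.put(name, Long.parseLong(count));
            } catch (NumberFormatException e) {
                // Not a "name<TAB>count" row, so ignore it.
            }
        }
        return queues;
    }

    // True if a queue with exactly this name appears in the output.
    static boolean hasQueue(String output, String queueName) {
        return parseListQueues(output).containsKey(queueName);
    }

    public static void main(String[] args) {
        // Assumed sample output, written by hand rather than captured.
        String before = "Listing queues ...\ndb_company\t2\n...done.\n";
        String after = "Listing queues ...\n...done.\n";
        System.out.println(hasQueue(before, "db_company"));
        System.out.println(hasQueue(after, "db_company"));
    }
}
```

In practice the text would come from running the rabbitmqctl command and capturing its standard output; the sketch leaves that part out so it stays independent of any particular installation.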
Interestingly enough my consumer is able to process the next message
in the queue, but subsequent messages that are newly published are not
picked up by my consumer.
My understanding is that this would be because the consumer isn't
aware that a new queue has been created.
Any help would be greatly appreciated.
Thanks,
Jared
Publisher code:
$ch = $conn->channel();
$ch->access_request($VHOST, false, false, true, true);
$ch->exchange_declare($EXCHANGE."_".$queue, 'direct', false, true, false);
$ch->queue_declare($queue, false, true, false, false);
$ch->queue_bind($queue, $EXCHANGE."_".$queue);
Here is the basic code for consuming a message:
public void start() throws IOException
{
    // Declare the queue as durable, non-exclusive, non-auto-delete.
    _channel.queueDeclare(getQueueName(), true, false, false, null);
    // autoAck is false, so every delivery must be acknowledged explicitly.
    _channel.basicConsume(getQueueName(), false, _consumer);
    preConsume(); // the class implementing the consumer initialises its own internals
    while(true) {
        QueueingConsumer.Delivery delivery;
        try {
            delivery = _consumer.nextDelivery();
            String message = new String(delivery.getBody());
            consume(message); // message is processed by the implementing class
        }
        catch(InterruptedException exception) {
            continue;
        }
        // Queue is dropped here once the ack is sent
        _channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}
Error from sasl log:
=CRASH REPORT==== 2-Apr-2011::07:54:59 ===
crasher:
pid: <0.358.0>
registered_name: []
exception exit: {undef,[{gb_trees,map,
[#Fun<rabbit_amqqueue_process.12.104672275>,
{0,nil}]},
{rabbit_amqqueue_process,confirm_messages,2},
{rabbit_amqqueue_process,next_state,1},
{rabbit_amqqueue_process,noreply,1},
{gen_server2,handle_msg,2},
{proc_lib,wake_up,3}]}
in function gen_server2:terminate/3
initial call: gen_server2:wake_hib({gs2_state,<0.178.0>,<0.358.0>,
{q,
{amqqueue,
{resource,<<"/">>,queue,
<<"db_company">>},
true,false,none,
[],<0.358.0>},
none,true,rabbit_variable_queue,
{vqstate,
{[],[]},
{0,{[],[]}},
{delta,undefined,
0,undefined},
{0,{[],[]}},
{[],[]},
2,
{dict,2,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],
[],[],
[],[],[],[],[]},
{{[[0|
{true,
<<121,6,154,117,162,112,237,
254,21,76,48,123,154,135,53,
114>>,
{message_properties,
undefined,false}}]],
[],[],[],[],[],[],[],[],
[],[],
[[1|
{true,
<<175,159,166,165,243,114,240,
245,199,17,72,46,220,53,186,
196>>,
{message_properties,
undefined,false}}]],
[],[],[],[]}}},
undefined,
{0,nil},
{qistate,
"/var/lib/rabbitmq/mnesia/
rabbit at involv-dev/queues/EVF8J2RHRQO46CTTX0DVJKTK9",
{{dict,0,16,16,8,80,48,
{[],[],[],[],[],[],[],[],
[],[],
[],[],[],[],[],[]},
{{[],[],[],[],[],[],[],[],
[],[],
[],[],[],[],[],[]}}},
[{segment,0,
"/var/lib/rabbitmq/
mnesia/rabbit at involv-dev/queues/EVF8J2RHRQO46CTTX0DVJKTK9/0.idx",
{array,16384,0,undefined,
100000},
2}]},
#Ref<0.0.0.721>,0,262144,
#Fun<rabbit_variable_queue.
2.121033067>,
[]},
{{client_msstate,
msg_store_persistent,
<<31,33,216,196,241,242,147,144,
217,18,255,160,157,100,196,140>>,
{dict,0,16,16,8,80,48,
{[],[],[],[],[],[],[],[],
[],[],
[],[],[],[],[],[]},
{{[],[],[],[],[],[],[],[],
[],[],
[],[],[],[],[],[]}}},
{state,83,
"/var/lib/rabbitmq/mnesia/
rabbit at involv-dev/msg_store_persistent"},
rabbit_msg_store_ets_index,
"/var/lib/rabbitmq/mnesia/
rabbit at involv-dev/msg_store_persistent",
<0.177.0>,85,82,84,86},
{client_msstate,
msg_store_transient,
<<16,117,138,232,194,41,140,239,62,
188,209,66,185,246,189,69>>,
{dict,0,16,16,8,80,48,
{[],[],[],[],[],[],[],[],
[],[],
[],[],[],[],[],[]},
{{[],[],[],[],[],[],[],[],
[],[],
[],[],[],[],[],[]}}},
{state,78,
"/var/lib/rabbitmq/mnesia/
rabbit at involv-dev/msg_store_transient"},
rabbit_msg_store_ets_index,
"/var/lib/rabbitmq/mnesia/
rabbit at involv-dev/msg_store_transient",
<0.169.0>,80,77,79,81}},
{sync,[],[],[],[]},
true,0,
#Fun<rabbit_amqqueue_process.
4.56639861>,
#Fun<rabbit_amqqueue_process.
5.103465571>,
0,2,infinity,0,0,0,0,0,0,
{rates,
{{1301,691228,21015},0},
{{1301,691228,21015},0},
0.1382354671152684,0.0,
{1301,691230,692626}},
{0,nil},
{0,nil},
{0,nil},
{0,nil},
0,0,
{rates,
{{1301,691228,21015},0},
{{1301,691228,21015},0},
0.0,0.1382354671152684,
{1301,691230,692626}}},
{[{<0.383.0>,
{consumer,
<<"amq.ctag-
ve0ALPxaqYEjstvbwd2p1g==">>,
true}}],
[]},
{[],[]},
undefined,undefined,undefined,
undefined,
{state,none,undefined},
{dict,0,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],
[],[],
[],[],[],[],[]},
{{[],[],[],[],[],[],[],[],[],
[],[],
[],[],[],[],[]}}},
undefined,undefined},
rabbit_amqqueue_process,hibernate,
{{1301,691230,692715},
{backoff,2665,1000,10000,
{12376,17423,26819}}},
{queue,[],[]},
[],#Fun<gen_server2.13.80334034>,
#Fun<gen_server2.12.71926735>,
#Fun<gen_server2.12.71926735>})
ancestors: [rabbit_amqqueue_sup,rabbit_sup,<0.108.0>]
messages: [{'$gen_cast',
{run_backing_queue,
#Fun<rabbit_variable_queue.38.117862577>}}]
links: [<0.178.0>]
dictionary: [{fhc_age_tree,{0,nil}},
{guid,{{39,<0.358.0>},1}},
{{ch,<0.383.0>},
{cr,1,<0.383.0>,undefined,#Ref<0.0.0.767>,
{sets,1,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],
[],[],[]},
{{[],[],[],[],[],[],[],[],[],[],[],
[1],
[],[],[],[]}}},
false,none,0}}]
trap_exit: true
status: running
heap_size: 4181
stack_size: 23
reductions: 10009
neighbours:
=SUPERVISOR REPORT==== 2-Apr-2011::07:54:59 ===
Supervisor: {local,rabbit_amqqueue_sup}
Context: child_terminated
Reason: {undef,[{gb_trees,map,
[#Fun<rabbit_amqqueue_process.12.104672275>,
{0,nil}]},
{rabbit_amqqueue_process,confirm_messages,2},
{rabbit_amqqueue_process,next_state,1},
{rabbit_amqqueue_process,noreply,1},
{gen_server2,handle_msg,2},
{proc_lib,wake_up,3}]}
Offender: [{pid,<0.358.0>},
{name,rabbit_amqqueue},
{mfa,
{rabbit_amqqueue_process,start_link,
[{amqqueue,
{resource,<<"/">>,queue,<<"db_company">>},
true,false,none,[],none}]}},
{restart_type,temporary},
{shutdown,4294967295},
{child_type,worker}]
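For reading the reports above: in Erlang, an undef exit means the call at the top of the stack trace could not be resolved, either because the module could not be loaded or because it does not export a function with that name and arity. Here the top entry is gb_trees:map, called with two arguments from rabbit_amqqueue_process:confirm_messages/2. As a rough aid for scanning long sasl logs, the Java sketch below pulls the module and function out of the head of such a reason term. The class name UndefReason and the regular expression are my own assumptions, and the input string is an abbreviated copy of the reason shown above.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of RabbitMQ or Erlang tooling.
final class UndefReason {

    // Matches the head of a reason term such as: {undef,[{gb_trees,map,
    // Whitespace (including line wraps) between tokens is tolerated.
    private static final Pattern UNDEF_HEAD = Pattern.compile(
        "\\{\\s*undef\\s*,\\s*\\[\\s*\\{\\s*([a-z][A-Za-z0-9_@]*)\\s*,\\s*([a-z][A-Za-z0-9_@]*)\\s*,");

    // Returns "module:function" for the first undef entry, if there is one.
    static Optional<String> missingFunction(String reason) {
        Matcher m = UNDEF_HEAD.matcher(reason);
        if (m.find()) {
            return Optional.of(m.group(1) + ":" + m.group(2));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Abbreviated copy of the reason term from the crash report.
        String reason = "{undef,[{gb_trees,map,"
            + "[#Fun<rabbit_amqqueue_process.12.104672275>,{0,nil}]},"
            + "{rabbit_amqqueue_process,confirm_messages,2}]}";
        System.out.println(missingFunction(reason).orElse("no undef found"));
    }
}
```

The sketch only handles unquoted atoms and does not work out the arity, which has to be read from the argument list by hand (two arguments in this trace).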
Error from log:
=INFO REPORT==== 2-Apr-2011::07:51:40 ===
Limiting to approx 924 file handles (829 sockets)
=INFO REPORT==== 2-Apr-2011::07:51:40 ===
Memory limit set to 806MB.
=INFO REPORT==== 2-Apr-2011::07:51:40 ===
msg_store_transient: using rabbit_msg_store_ets_index to provide index
=INFO REPORT==== 2-Apr-2011::07:51:40 ===
msg_store_persistent: using rabbit_msg_store_ets_index to provide index
=INFO REPORT==== 2-Apr-2011::07:51:40 ===
started TCP Listener on 0.0.0.0:5672
=ERROR REPORT==== 2-Apr-2011::07:51:40 ===
** gen_event handler rabbit_error_logger crashed.
** Was installed in error_logger
** Last event was: {info_msg,<0.107.0>,
{<0.182.0>,"started ~s on ~s:~p~n",
["TCP Listener","0.0.0.0",5672]}}
** When handler state ==
{resource,<<"/">>,exchange,<<"amq.rabbitmq.log">>}
** Reason == {'function not exported',
[{mnesia,read,
[rabbit_topic_trie_edge,
{trie_edge,
{resource,<<"/">>,exchange,<<"amq.rabbitmq.log">>},
root,"info"}]},
{rabbit_exchange_type_topic,trie_child,3},
{rabbit_exchange_type_topic,trie_match_part,6},
{lists,foldl,3},
{mnesia_tm,non_transaction,5},
{rabbit_exchange_type_topic,'-route/2-lc$^0/1-0-',
2},
{rabbit_exchange_type_topic,route,2},
{rabbit_exchange,route,2}]}
=INFO REPORT==== 2-Apr-2011::07:53:33 ===
accepted TCP connection on 0.0.0.0:5672 from 127.0.0.1:34112
=INFO REPORT==== 2-Apr-2011::07:53:33 ===
starting TCP connection <0.353.0> from 127.0.0.1:34112
=INFO REPORT==== 2-Apr-2011::07:53:33 ===
closing TCP connection <0.353.0> from 127.0.0.1:34112
=INFO REPORT==== 2-Apr-2011::07:53:46 ===
accepted TCP connection on 0.0.0.0:5672 from 192.168.1.17:56019
=INFO REPORT==== 2-Apr-2011::07:53:46 ===
starting TCP connection <0.380.0> from 192.168.1.17:56019
=ERROR REPORT==== 2-Apr-2011::07:54:59 ===
** Generic server <0.358.0> terminating
** Last message in was {'$gen_cast',
{run_backing_queue,
#Fun<rabbit_variable_queue.38.117862577>}}
** When Server state == {q,
{amqqueue,
{resource,<<"/">>,queue,<<"db_company">>},
true,false,none,[],<0.358.0>},
none,true,rabbit_variable_queue,
{vqstate,
{[],[]},
{0,{[],[]}},
{delta,undefined,0,undefined},
{0,{[],[]}},
{[],[]},
2,
{dict,1,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],
[],[]},
{{[],[],[],[],[],[],[],[],[],[],[],
[[1|
{true,
<<175,159,166,165,243,114,240,245,199,17,72,46,
220,53,186,196>>,
{message_properties,undefined,false}}]],
[],[],[],[]}}},
undefined,
{0,nil},
{qistate,
"/var/lib/rabbitmq/mnesia/rabbit at involv-dev/
queues/EVF8J2RHRQO46CTTX0DVJKTK9",
{{dict,0,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],
[],[],[]},
{{[],[],[],[],[],[],[],[],[],[],[],[],[],
[],[],
[]}}},
[{segment,0,
"/var/lib/rabbitmq/mnesia/rabbit at involv-
dev/queues/EVF8J2RHRQO46CTTX0DVJKTK9/0.idx",
{array,16384,0,undefined,
{{{{{{no_pub,no_del,ack},
undefined,undefined,undefined,undefined,
undefined,undefined,undefined,undefined,
undefined},
10,10,10,10,10,10,10,10,10,10},
100,100,100,100,100,100,100,100,100,100},
1000,1000,1000,1000,1000,1000,1000,1000,1000,
1000},
10000,10000,10000,10000,10000,10000,10000,
10000,10000,10000}},
1}]},
#Ref<0.0.0.721>,1,262144,
#Fun<rabbit_variable_queue.2.121033067>,
[]},
{{client_msstate,msg_store_persistent,
<<31,33,216,196,241,242,147,144,217,18,255,160,157,
100,196,140>>,
{dict,0,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],
[],[],[]},
{{[],[],[],[],[],[],[],[],[],[],[],[],[],
[],[],
[]}}},
{state,83,
"/var/lib/rabbitmq/mnesia/rabbit at involv-
dev/msg_store_persistent"},
rabbit_msg_store_ets_index,
"/var/lib/rabbitmq/mnesia/rabbit at involv-
dev/msg_store_persistent",
<0.177.0>,85,82,84,86},
{client_msstate,msg_store_transient,
<<16,117,138,232,194,41,140,239,62,188,209,66,185,
246,189,69>>,
{dict,0,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],
[],[],[]},
{{[],[],[],[],[],[],[],[],[],[],[],[],[],
[],[],
[]}}},
{state,78,
"/var/lib/rabbitmq/mnesia/rabbit at involv-
dev/msg_store_transient"},
rabbit_msg_store_ets_index,
"/var/lib/rabbitmq/mnesia/rabbit at involv-
dev/msg_store_transient",
<0.169.0>,80,77,79,81}},
{sync,[],[],[],[]},
true,0,#Fun<rabbit_amqqueue_process.
4.56639861>,
#Fun<rabbit_amqqueue_process.5.103465571>,
0,1,
infinity,0,0,0,0,0,0,
{rates,
{{1301,691228,21015},0},
{{1301,691228,21015},0},
0.1382354671152684,0.0,
{1301,691230,692626}},
{0,nil},
{0,nil},
{0,nil},
{0,nil},
1,0,
{rates,
{{1301,691228,21015},0},
{{1301,691228,21015},0},
0.0,0.1382354671152684,
{1301,691230,692626}}},
{[{<0.383.0>,
{consumer,
<<"amq.ctag-
ve0ALPxaqYEjstvbwd2p1g==">>,true}}],
[]},
{[],[]},
undefined,undefined,
{1301691304961094,#Ref<0.0.0.828>},
undefined,
{state,none,undefined},
{dict,0,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],
[],[]},
{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],
[],[]}}},
undefined,undefined}
** Reason for termination ==
** {'function not exported',
[{gb_trees,map,
[#Fun<rabbit_amqqueue_process.12.104672275>,{0,nil}]},
{rabbit_amqqueue_process,confirm_messages,2},
{rabbit_amqqueue_process,next_state,1},
{rabbit_amqqueue_process,noreply,1},
{gen_server2,handle_msg,2},
{proc_lib,wake_up,3}]}
=WARNING REPORT==== 2-Apr-2011::07:57:22 ===
exception on TCP connection <0.380.0> from 192.168.1.17:56019
connection_closed_abruptly
=INFO REPORT==== 2-Apr-2011::07:57:22 ===
closing TCP connection <0.380.0> from 192.168.1.17:56019
=INFO REPORT==== 2-Apr-2011::07:57:44 ===
accepted TCP connection on 0.0.0.0:5672 from 192.168.1.17:56066
=INFO REPORT==== 2-Apr-2011::07:57:44 ===
starting TCP connection <0.738.0> from 192.168.1.17:56066
=INFO REPORT==== 2-Apr-2011::07:57:47 ===
accepted TCP connection on 0.0.0.0:5672 from 127.0.0.1:49494
=INFO REPORT==== 2-Apr-2011::07:57:47 ===
starting TCP connection <0.750.0> from 127.0.0.1:49494
=INFO REPORT==== 2-Apr-2011::07:57:47 ===
closing TCP connection <0.750.0> from 127.0.0.1:49494