[rabbitmq-discuss] librabbitmq faults

Matti Linnanvuori matti.linnanvuori at portalify.com
Wed Apr 4 13:16:45 BST 2012


Hi!

I investigated the problem with the RabbitMQ client being stuck in recv system call.

I saw that netstat -ap shows that TCP connection to RabbitMQ server is established.
tcp        0      0 localhost:37915         localhost:amqp          ESTABLISHED 5841/perl           

However, messages keep piling in the queue that the Perl process is supposed to receive from:

rabbitmqctl list_queues
Listing queues ...
...
queue.pmc.router-in	71

The problem occurs only some time after RabbitMQ application is stopped, reset and started. 

This seems to me to be an error in RabbitMQ server. I am using RabbitMQ server version 2.7.1. Version 2.8.1 behaves the same.

I copy below the script that produces the error.

#!/usr/bin/perl -w

=pod

=head1 sds2sds_sf_fail.t

 Test 1: restart Routing and stop application, reset and start application in RabbitMQ server.
 Publish a Dispatch JSON message with ack_level and valid_until from SDS to SDS
 in exchange.pmc.router-in.
 Try to receive a message from exchange.pmc.router-in in 8 seconds.
 Test 1 expected result: a message is received from exchange.pmc.router-in in 8 seconds.
 Test 2: decode the message as JSON.
 Test 2 expected result: the message is decoded as JSON.
 Test 3: check that the sender field is the same as in the sent message.
 Test 3 expected result: the sender field is the same as in the sent message.
 Test 4: check the body of the received message and the sent one.
 Test 4 expected result: the body fields are the same.
 Test 5: check the timestamp field.
 Test 5 expected result: the timestamp field is the same as in the published message.
 Test 6: try to receive a message from exchange.pmc.sf-in in 8 seconds.
 Test 6 expected result: a message is received from exchange.pmc.sf-in in 8 seconds.
 Test 7: decode the message as JSON.
 Test 7 expected result: the message is decoded as JSON.
 Test 8: check that the sender field is "exchange.pmc.router-in".
 Test 8 expected result: the sender field is "exchange.pmc.router-in".
 Test 9: check the body of the received message and the sent one.
 Test 9 expected result: the body fields are the same.
 Test 10: check that the timestamp in an integer.
 Test 10 expected result: the timestamp in an integer.
 Test 11: check that the timestamp is greater than or equal to that of before publishing.
 Test 11 expected result: the timestamp is greater than or equal to that of before publishing.
 Test 12: check that the timestamp is less than or equal to that of now.
 Test 12 expected result: the timestamp is less than or equal to that of now.
 Test 13: try to receive a message from exchange.pmc.router-in in 8 seconds.
 Test 13 expected result: a message is received from exchange.pmc.router-in in 8 seconds.
 Test 14: decode the message as JSON.
 Test 14 expected result: the message is decoded as JSON.
 Test 15: check that the sender field is "exchange.pmc.sf-in".
 Test 15 expected result: the sender field is "exchange.pmc.sf-in".
 Test 16: check the body of the received message and the sent one.
 Test 16 expected result: the body fields are the same.
 Test 17: check that the timestamp in an integer.
 Test 17 expected result: the timestamp in an integer.
 Test 18: check that the timestamp is greater than or equal to that of before publishing.
 Test 18 expected result: the timestamp is greater than or equal to that of before publishing.
 Test 19: check that the timestamp is less than or equal to that of now.
 Test 19 expected result: the timestamp is less than or equal to that of now.
 Test 20: try to receive a message from exchange.pmc.router-in in 8 seconds.
 Test 20 expected result: a message is received from exchange.pmc.router-in in 8 seconds.
 Test 21: decode the message as JSON.
 Test 21 expected result: the message is decoded as JSON.
 Test 22: check that the sender field is "exchange.pmc.sf-in".
 Test 22 expected result: the sender field is "exchange.pmc.sf-in".
 Test 23: check that the timestamp in an integer.
 Test 23 expected result: the timestamp in an integer.
 Test 24: check that the timestamp is greater than or equal to that of before publishing.
 Test 24 expected result: the timestamp is greater than or equal to that of before publishing.
 Test 25: check that the timestamp is less than or equal to that of now.
 Test 25 expected result: the timestamp is less than or equal to that of now.
 Test 26: check that the id fields of the received and the published messages are the same.
 Test 26 expected result: the id fields of the received and the published messages are the same.
 Test 27: check that the ack_type field has value "processed".
 Test 27 expected result: the ack_type field has value "processed".
 Test 28: check that the from field of the received message is the same as
 the to field of the published message.
 Test 28 expected result: the from field of the received message is the same as
 the to field of the published message.
 Test 29: check that the type field has value "ack".
 Test 29 expected result: the type field has value "ack".
 Test 30: check that the version field has value 1.
 Test 30 expected result: the version field has value 1.
 Test 31: try to receive a message from exchange.pmc.cassidian-in in 8 seconds.
 Test 31 expected result: a message is received from exchange.pmc.cassidian-in in 8 seconds.
 Test 32: decode the message as JSON.
 Test 32 expected result: the message is decoded as JSON.
 Test 33: check that the sender field is "exchange.pmc.router-in".
 Test 33 expected result: the sender field is "exchange.pmc.router-in".
 Test 34: check the body of the received message and the sent one.
 Test 34 expected result: the body fields are the same.
 Test 35: check that the timestamp in an integer.
 Test 35 expected result: the timestamp in an integer.
 Test 36: check that the timestamp is greater than or equal to that of before publishing.
 Test 36 expected result: the timestamp is greater than or equal to that of before publishing.
 Test 37: check that the timestamp is less than or equal to that of now.
 Test 37 expected result: the timestamp is less than or equal to that of now.
 Test 38: try to receive a message from exchange.pmc.cassidian-in in 8 seconds.
 Test 38 expected result: a message is received from exchange.pmc.cassidian-in in 8 seconds.
 Test 39: decode the message as JSON.
 Test 39 expected result: the message is decoded as JSON.
 Test 40: check that the sender field is "exchange.pmc.router-in".
 Test 40 expected result: the sender field is "exchange.pmc.router-in".
 Test 41: check that the timestamp in an integer.
 Test 41 expected result: the timestamp in an integer.
 Test 42: check that the timestamp is greater than or equal to that of before publishing.
 Test 42 expected result: the timestamp is greater than or equal to that of before publishing.
 Test 43: check that the timestamp is less than or equal to that of now.
 Test 43 expected result: the timestamp is less than or equal to that of now.
 Test 44: check that the id fields of the received and the published messages are the same.
 Test 44 expected result: the id fields of the received and the published messages are the same.
 Test 45: check that the ack_type field has value "processed".
 Test 45 expected result: the ack_type field has value "processed".
 Test 46: check that the from field of the received message is the same as
 the to field of the published message.
 Test 46 expected result: the from field of the received message is the same as
 the to field of the published message.
 Test 47: check that the type field has value "ack".
 Test 47 expected result: the type field has value "ack".
 Test 48: check that the version field has value 1.
 Test 48 expected result: the version field has value 1.
 Test 49: publish an Ack sent message from the SSI in exchange.pmc.router-in.
 try to receive a message from exchange.pmc.router-in in 8 seconds.
 Test 49 expected result: a message is received from exchange.pmc.router-in in 8 seconds.
 Test 50: decode the message as JSON.
 Test 50 expected result: the message is decoded as JSON.
 Test 51: check that the timestamp of the received message is the same as that of published one.
 Test 51 expected result:
 the timestamp of the received message is the same as that of published one.
 Test 52: check the sender field of the received message.
 Test 52 expected result:
 the sender field of the received message has value "exchange.pmc.cassidian-in".
 Test 53: check the body fields of the received and the published messages.
 Test 53 expected result:
 the body fields of the received and the published messages are the same.
 Test 54: try to receive a message from exchange.pmc.sf-in in 8 seconds.
 Test 54 expected result: a message is received from exchange.pmc.sf-in in 8 seconds.
 Test 55: decode the message as JSON.
 Test 55 expected result: the message is decoded as JSON.
 Test 56: check the sender field of the received message.
 Test 56 expected result:
 the sender field of the received message has value "exchange.pmc.router-in".
 Test 57: check the body fields of the received and the published messages.
 Test 57 expected result:
 the body fields of the received and the published messages are the same.
 Test 58: check that the timestamp in an integer.
 Test 58 expected result: the timestamp in an integer.
 Test 59: check that the timestamp is greater than or equal to that of before publishing.
 Test 59 expected result: the timestamp is greater than or equal to that of before publishing.
 Test 60: check that the timestamp is less than or equal to that of now.
 Test 60 expected result: the timestamp is less than or equal to that of now.
 Test 61: try to receive a message from exchange.pmc.router-in in 8 seconds.
 Test 61 expected result: a message is received from exchange.pmc.router-in in 8 seconds.
 Test 62: decode the message as JSON.
 Test 62 expected result: the message is decoded as JSON.
 Test 63: check the sender field of the received message.
 Test 63 expected result:
 the sender field of the received message has value "exchange.pmc.sf-in".
 Test 64: check the body fields of the received and the published messages.
 Test 64 expected result:
 the body fields of the received and the published messages are the same.
 Test 65: check that the timestamp in an integer.
 Test 65 expected result: the timestamp in an integer.
 Test 66: check that the timestamp is greater than or equal to that of before publishing.
 Test 66 expected result: the timestamp is greater than or equal to that of before publishing.
 Test 67: check that the timestamp is less than or equal to that of now.
 Test 67 expected result: the timestamp is less than or equal to that of now.
 Test 68: try to receive a message from exchange.pmc.cassidian-in in 8 seconds.
 Test 68 expected result: a message is received from exchange.pmc.cassidian-in in 8 seconds.
 Test 69: decode the message as JSON.
 Test 69 expected result: the message is decoded as JSON.
 Test 70: check the sender field of the received message.
 Test 70 expected result:
 the sender field of the received message has value "exchange.pmc.router-in".
 Test 71: check the body fields of the received and the published messages.
 Test 71 expected result:
 the body fields of the received and the published messages are the same.
 Test 72: check that the timestamp in an integer.
 Test 72 expected result: the timestamp in an integer.
 Test 73: check that the timestamp is greater than or equal to that of before publishing.
 Test 73 expected result: the timestamp is greater than or equal to that of before publishing.
 Test 74: check that the timestamp is less than or equal to that of now.
 Test 74 expected result: the timestamp is less than or equal to that of now.

=cut

use strict;

use Net::RabbitMQ;
use Test::More tests => 74;
use JSON;
use Time::HiRes qw(gettimeofday);

use constant ROUTER_EXCHANGE    => 'exchange.pmc.router-in';
use constant ROUTER_CHANNEL     => 1;
use constant ROUTER_QUEUE       => 'queue.test.router-in';
use constant SF_EXCHANGE        => 'exchange.pmc.sf-in';
use constant SF_CHANNEL         => 2;
use constant SF_QUEUE           => 'queue.test.sf-in';
use constant CASSIDIAN_EXCHANGE => 'exchange.pmc.cassidian-in';
use constant CASSIDIAN_CHANNEL  => 3;
use constant CASSIDIAN_QUEUE    => 'queue.test.cassidian-in';

sub timestamp {
    return int( gettimeofday() * 1000 );
}

system(
'service pmc-smpp stop >&2; service pmc-cassidian stop >&2; service pmc-email stop >&2; service pmc-sf stop >&2; service pmc-routing stop >&2'
);

sleep 1;    # Wait for the message-sending to stop.

system(
    'rabbitmqctl stop_app; rabbitmqctl reset; rabbitmqctl start_app; mongo <<END
use database
db.dropDatabase();
END'
);

system('service pmc-sf start >&2; service pmc-routing start >&2');

sleep 8;    # Wait for Store & Forward and Routing to start.

my $json_text = '{
"timestamp" : 1330327514213,
"sender" : "' . CASSIDIAN_EXCHANGE . '",
"body" : {
"type" : "dispatch",
"version" : 1,
"id" : "8f970b1b-f8a3-4c78-aa24-3b8e2205648788",
"from" : {
"type" : "ssi",
"address" : "16777215"
},
"to" : {
"type" : "ssi",
"address" : "0"
},
"body" : "This is a test message.",
"ack_level" : "sent",
"valid_until" : 9223372036854775807,
"auxiliary" : {
"timestamp" : 1325376000000,
"sdstl" : {
"protocol_id" : 130,
"message_reference" : 42,
"short_report_allowed" : true,
"protocol_data" : {
"encoding" : 0,
"timestamp" : {
"month" : 1,
"day" : 1,
"hour" : 0,
"minute" : 0
}
}
}
}
}
}';

my $mq_router = Net::RabbitMQ->new();
$mq_router->connect( "localhost", { user => "guest", password => "guest" } );
$mq_router->channel_open(ROUTER_CHANNEL);
$mq_router->queue_declare(
    ROUTER_CHANNEL,
    ROUTER_QUEUE,
    {
        passive     => 0,
        durable     => 1,
        exclusive   => 0,
        auto_delete => 0
    }
);
$mq_router->queue_bind( ROUTER_CHANNEL, ROUTER_QUEUE, ROUTER_EXCHANGE, '' );
$mq_router->purge( ROUTER_CHANNEL, ROUTER_QUEUE );
$mq_router->consume( ROUTER_CHANNEL, ROUTER_QUEUE );

my $mq_sf = Net::RabbitMQ->new();
$mq_sf->connect( "localhost", { user => "guest", password => "guest" } );
$mq_sf->channel_open(SF_CHANNEL);
$mq_sf->exchange_declare(
    SF_CHANNEL,
    SF_EXCHANGE,
    {
        exchange_type => 'direct',
        passive       => 0,
        durable       => 1,
        auto_delete   => 0
    }
);
$mq_sf->queue_declare(
    SF_CHANNEL,
    SF_QUEUE,
    {
        passive     => 0,
        durable     => 1,
        exclusive   => 0,
        auto_delete => 0
    }
);
$mq_sf->queue_bind( SF_CHANNEL, SF_QUEUE, SF_EXCHANGE, '' );
$mq_sf->purge( SF_CHANNEL, SF_QUEUE );
$mq_sf->consume( SF_CHANNEL, SF_QUEUE );

my $mq_cassidian = Net::RabbitMQ->new();
$mq_cassidian->connect( "localhost", { user => "guest", password => "guest" } );
$mq_cassidian->channel_open(CASSIDIAN_CHANNEL);
$mq_cassidian->exchange_declare(
    CASSIDIAN_CHANNEL,
    CASSIDIAN_EXCHANGE,
    {
        exchange_type => 'direct',
        passive       => 0,
        durable       => 1,
        auto_delete   => 0
    }
);
$mq_cassidian->queue_declare(
    CASSIDIAN_CHANNEL,
    CASSIDIAN_QUEUE,
    {
        passive     => 0,
        durable     => 1,
        exclusive   => 0,
        auto_delete => 0
    }
);
$mq_cassidian->queue_bind( CASSIDIAN_CHANNEL, CASSIDIAN_QUEUE,
    CASSIDIAN_EXCHANGE, '' );
$mq_cassidian->purge( CASSIDIAN_CHANNEL, CASSIDIAN_QUEUE );
$mq_cassidian->consume( CASSIDIAN_CHANNEL, CASSIDIAN_QUEUE );

my $before = timestamp();

$mq_router->publish(
    ROUTER_CHANNEL,
    '',
    $json_text,
    {

        # Options
        exchange => ROUTER_EXCHANGE
    },
    {

        # Props
        content_encoding => "UTF-8",
        content_type     => "application/json"
    }
);

my $sent = decode_json($json_text);
my $message;
eval {
    local $SIG{ALRM} = sub { die "Timeout\n" };
    alarm 8;
    $message = $mq_router->recv();
    alarm 0;
};
is( $@, '', 'A message was received from queue' );
my $json;
eval { $json = decode_json( $message->{'body'} ); };
is( $@, '', 'A JSON message received' );
is( $json->{'sender'}, $sent->{'sender'}, 'sender ' . $sent->{'sender'} );
is_deeply( $json->{'body'}, $sent->{'body'}, 'body the same' );
is( $json->{'timestamp'}, $sent->{'timestamp'},
    "timestamp " . $sent->{'timestamp'} );

undef $message;
eval {
    local $SIG{ALRM} = sub { die "Timeout\n" };
    alarm 8;
    $message = $mq_sf->recv();
    alarm 0;
};
is( $@, '', 'A message was received' );
undef $json;
eval { $json = decode_json( $message->{'body'} ); };
is( $@, '', 'A JSON message received' );
is( $json->{'sender'}, ROUTER_EXCHANGE, 'sender ' . ROUTER_EXCHANGE );
is_deeply( $json->{'body'}, $sent->{'body'}, 'body the same' );
like( $json->{'timestamp'}, qr/^\d+$/, "timestamp an integer" );
ok( $json->{'timestamp'} >= $before,
    'timestamp greater than or equal to that of before publishing' )
  or diag("$json->{'timestamp'} < $before");
my $now = timestamp();
ok( $json->{'timestamp'} <= $now,
    'timestamp less than or equal to that of now' )
  or diag("$json->{'timestamp'} > $now");

undef $message;
eval {
    local $SIG{ALRM} = sub { die "Timeout\n" };
    alarm 8;
    $message = $mq_router->recv();
    alarm 0;
};
is( $@, '', 'A message was received' );
undef $json;
eval { $json = decode_json( $message->{'body'} ); };
is( $@,                '',          'A JSON message received' );
is( $json->{'sender'}, SF_EXCHANGE, 'sender ' . SF_EXCHANGE );
is_deeply( $json->{'body'}, $sent->{'body'}, 'body the same' );
like( $json->{'timestamp'}, qr/^\d+$/, "timestamp an integer" );
ok( $json->{'timestamp'} >= $before,
    'timestamp greater than or equal to that of before publishing' )
  or diag("$json->{'timestamp'} < $before");
$now = timestamp();
ok( $json->{'timestamp'} <= $now,
    'timestamp less than or equal to that of now' )
  or diag("$json->{'timestamp'} > $now");

undef $message;
eval {
    local $SIG{ALRM} = sub { die "Timeout\n" };
    alarm 8;
    $message = $mq_router->recv();
    alarm 0;
};
is( $@, '', 'A message was received' );
undef $json;
eval { $json = decode_json( $message->{'body'} ); };
is( $@,                '',          'A JSON message received' );
is( $json->{'sender'}, SF_EXCHANGE, 'sender ' . SF_EXCHANGE );
like( $json->{'timestamp'}, qr/^\d+$/, "timestamp an integer" );
ok( $json->{'timestamp'} >= $before,
    'timestamp greater than or equal to that of before publishing' )
  or diag("$json->{'timestamp'} < $before");
$now = timestamp();
ok( $json->{'timestamp'} <= $now,
    'timestamp less than or equal to that of now' )
  or diag("$json->{'timestamp'} > $now");
is( $json->{'body'}->{'id'}, $sent->{'body'}->{'id'}, 'id the same' );
is( $json->{'body'}->{'ack_type'}, 'processed', 'ack_type processed' );
is_deeply(
    $json->{'body'}->{'from'},
    $sent->{'body'}->{'to'},
    'from is the same as to of sent'
);
is( $json->{'body'}->{'type'},    'ack', 'type is ack' );
is( $json->{'body'}->{'version'}, 1,     'version is 1' );

undef $message;
eval {
    local $SIG{ALRM} = sub { die "Timeout\n" };
    alarm 8;
    $message = $mq_cassidian->recv();
    alarm 0;
};
is( $@, '', 'A message was received' );
undef $json;
eval { $json = decode_json( $message->{'body'} ); };
is( $@, '', 'A JSON message received' );
is( $json->{'sender'}, ROUTER_EXCHANGE, 'sender ' . ROUTER_EXCHANGE );
is_deeply( $json->{'body'}, $sent->{'body'}, 'body the same' );
like( $json->{'timestamp'}, qr/^\d+$/, "timestamp an integer" );
ok( $json->{'timestamp'} >= $before,
    'timestamp greater than or equal to that of before publishing' )
  or diag("$json->{'timestamp'} < $before");
$now = timestamp();
ok( $json->{'timestamp'} <= $now,
    'timestamp less than or equal to that of now' )
  or diag("$json->{'timestamp'} > $now");

note("Trying to receive an Ack from Cassidian exchange");
undef $message;
eval {
    local $SIG{ALRM} = sub { die "Timeout\n" };
    alarm 8;
    $message = $mq_cassidian->recv();
    alarm 0;
};
is( $@, '', 'A message was received' );
undef $json;
eval { $json = decode_json( $message->{'body'} ); };
is( $@, '', 'A JSON message received' );
is( $json->{'sender'}, ROUTER_EXCHANGE, 'sender ' . ROUTER_EXCHANGE );
like( $json->{'timestamp'}, qr/^\d+$/, "timestamp an integer" );
ok( $json->{'timestamp'} >= $before,
    'timestamp greater than or equal to that of before publishing' )
  or diag("$json->{'timestamp'} < $before");
$now = timestamp();
ok( $json->{'timestamp'} <= $now,
    'timestamp less than or equal to that of now' )
  or diag("$json->{'timestamp'} > $now");
is( $json->{'body'}->{'id'}, $sent->{'body'}->{'id'}, 'id the same' );
is( $json->{'body'}->{'ack_type'}, 'processed', 'ack_type processed' );
is_deeply(
    $json->{'body'}->{'from'},
    $sent->{'body'}->{'to'},
    'from is the same as to of sent'
);
is( $json->{'body'}->{'type'},    'ack', 'type is ack' );
is( $json->{'body'}->{'version'}, 1,     'version is 1' );

my $ack = '{
"timestamp" : 1328020064358,
"sender" : "' . CASSIDIAN_EXCHANGE . '",
"body" : {
"type" : "ack",
"version" : 1,
"id" : "8f970b1b-f8a3-4c78-aa24-3b8e2205648788",
"from" : {
"type" : "ssi",
"address" : "0"
},
"ack_type" : "sent",
"auxiliary" : {
"timestamp" : 1325376000000,
"original_message" : ' . encode_json( $sent->{'body'} ) . ',
"sdstl" : {
"protocol_id" : 130,
"message_reference" : 42,
"short_report_allowed" : true,
"protocol_data" : {
"encoding" : 0,
"timestamp" : {
"month" : 1,
"day" : 1,
"hour" : 0,
"minute" : 0
}
}
}
}
}
}';
$sent   = decode_json($ack);
$before = timestamp();

$mq_router->publish(
    ROUTER_CHANNEL,
    '', $ack,
    {

        # Options
        exchange => ROUTER_EXCHANGE
    },
    {

        # Props
        content_encoding => "UTF-8",
        content_type     => "application/json"
    }
);

undef $message;
eval {
    local $SIG{ALRM} = sub { die "Timeout\n" };
    alarm 8;
    $message = $mq_router->recv();
    alarm 0;
};
is( $@, '', 'A message was received' );
undef $json;
eval { $json = decode_json( $message->{'body'} ); };
is( $@, '', 'A JSON message received' );
is( $json->{'timestamp'}, $sent->{'timestamp'},
    'timestamp ' . $sent->{'timestamp'} )
  or diag( $message->{'body'} );
is( $json->{'sender'}, CASSIDIAN_EXCHANGE, 'sender ' . CASSIDIAN_EXCHANGE )
  or diag( $message->{'body'} );
is_deeply( $json->{'body'}, $sent->{'body'}, 'body the same' )
  or diag( $message->{'body'} );

undef $message;
eval {
    local $SIG{ALRM} = sub { die "Timeout\n" };
    alarm 8;
    $message = $mq_sf->recv();
    alarm 0;
};
is( $@, '', 'A message was received' );
undef $json;
eval { $json = decode_json( $message->{'body'} ); };
is( $@, '', 'A JSON message received' );
is( $json->{'sender'}, ROUTER_EXCHANGE, 'sender ' . ROUTER_EXCHANGE );
is_deeply( $json->{'body'}, $sent->{'body'}, 'body the same' );
like( $json->{'timestamp'}, qr/^\d+$/, "timestamp an integer" );
ok( $json->{'timestamp'} >= $before,
    'timestamp greater than or equal to that of before publishing' )
  or diag("$json->{'timestamp'} < $before");
$now = timestamp();
ok( $json->{'timestamp'} <= $now,
    'timestamp less than or equal to that of now' )
  or diag("$json->{'timestamp'} > $now");

undef $message;
eval {
    local $SIG{ALRM} = sub { die "Timeout\n" };
    alarm 8;
    $message = $mq_router->recv();
    alarm 0;
};
is( $@, '', 'A message was received' );
undef $json;
eval { $json = decode_json( $message->{'body'} ); };
is( $@,                '',          'A JSON message received' );
is( $json->{'sender'}, SF_EXCHANGE, 'sender ' . SF_EXCHANGE );
is_deeply( $json->{'body'}, $sent->{'body'}, 'body the same' );
like( $json->{'timestamp'}, qr/^\d+$/, "timestamp an integer" );
ok( $json->{'timestamp'} >= $before,
    'timestamp greater than or equal to that of before publishing' )
  or diag("$json->{'timestamp'} < $before");
$now = timestamp();
ok( $json->{'timestamp'} <= $now,
    'timestamp less than or equal to that of now' )
  or diag("$json->{'timestamp'} > $now");

undef $message;
eval {
    local $SIG{ALRM} = sub { die "Timeout\n" };
    alarm 8;
    $message = $mq_cassidian->recv();
    alarm 0;
};
is( $@, '', 'A message was received' );
undef $json;
eval { $json = decode_json( $message->{'body'} ); };
is( $@, '', 'A JSON message received' );
is( $json->{'sender'}, ROUTER_EXCHANGE, 'sender ' . ROUTER_EXCHANGE );
is_deeply( $json->{'body'}, $sent->{'body'}, 'body the same' );
like( $json->{'timestamp'}, qr/^\d+$/, "timestamp an integer" );
ok( $json->{'timestamp'} >= $before,
    'timestamp greater than or equal to that of before publishing' )
  or diag("$json->{'timestamp'} < $before");
$now = timestamp();
ok( $json->{'timestamp'} <= $now,
    'timestamp less than or equal to that of now' )
  or diag("$json->{'timestamp'} > $now");

END {
    $mq_sf->queue_unbind( SF_CHANNEL, SF_QUEUE, SF_EXCHANGE, '' );
    $mq_router->queue_unbind( ROUTER_CHANNEL, ROUTER_QUEUE, ROUTER_EXCHANGE,
        '' );
    $mq_cassidian->queue_unbind( CASSIDIAN_CHANNEL, CASSIDIAN_QUEUE,
        CASSIDIAN_EXCHANGE, '' );
    system(
'service pmc-smpp start >&2; service pmc-cassidian start >&2; service pmc-email start >&2'
    );
}

regards, Matti Linnanvuori



More information about the rabbitmq-discuss mailing list