[rabbitmq-discuss] Safely replaying archived messages from an external store

Charles Duffy charles at dyfis.net
Tue Oct 16 17:39:04 BST 2012


Howdy --

I have a process where, on startup, I want to ensure that an internal 
state machine (using data fed off an AMQP fanout exchange) is 
synchronized with the state it would be in had it been continuously 
listening to the feed for the last several hours. (State is guaranteed 
to depend only on messages received within a defined window.)

This means archiving messages in an external store (such as a database) 
in such a way that they can be replayed in order, with no losses. The 
timestamp in the AMQP header has no better than one-second resolution, 
and I don't trust client-generated timestamps to be synchronized. (Yes, 
we use NTP, but the exchange is fed by datacenters that span continents, 
and I'm wary of jitter causing race conditions where multiple instances 
are out of sync.) The algorithm that comes to mind is as follows:

Archiver process:
- Read first message. Note header timestamp. Discard.
- Read and discard further messages until header timestamp changes 
(second boundary crossed).
- Record each message keyed by a (second_timestamp, idx) tuple, 
resetting idx to 0 at each second boundary.
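
A minimal sketch of the archiver's keying step, to make the (second_timestamp, idx) scheme concrete. It assumes every message carries a header timestamp and that the feed is available as an iterable of (timestamp, body) pairs; the name key_messages is hypothetical, and writing the keyed records to the external store is left out.

```python
from typing import Iterable, Iterator, Tuple

Key = Tuple[int, int]  # (second_timestamp, idx)


def key_messages(feed: Iterable[Tuple[int, bytes]]) -> Iterator[Tuple[Key, bytes]]:
    """Yield ((second_timestamp, idx), body) for each message to archive.

    Assumption: `feed` yields (header_timestamp_in_seconds, body) pairs in
    delivery order.
    """
    first_ts = None    # timestamp of the very first message seen
    current_ts = None  # second currently being indexed
    idx = 0
    for ts, body in feed:
        if first_ts is None:
            first_ts = ts  # note the timestamp, discard the message
            continue
        if current_ts is None and ts == first_ts:
            continue  # still inside the partial first second: discard
        if ts != current_ts:
            current_ts = ts  # second boundary crossed: restart the index
            idx = 0
        yield (current_ts, idx), body
        idx += 1
```

Everything received during the partial first second is dropped, so idx 0 always refers to the first message of a full second.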

State machine process:
- Read first message. Note header timestamp. Discard.
- Read and discard further messages until header timestamp changes 
(second boundary crossed). First message on a new second boundary will 
henceforth be referred to as "first message".
- Query archive database for messages between beginning of window and 
time of first message, inclusive of the latter; replay into engine.
- Until the first message, or a message newer than it [as measured by 
the (second_timestamp, idx) tuple], has been retrieved from the database:
-- Query archive database for the oldest message newer than the most 
recent one seen in the database, LIMIT 1; replay into engine.
- Close database connection; replay all received messages directly into 
engine.
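
The catch-up steps above could be sketched roughly as follows, using SQLite purely for illustration. The table name archive, its columns (ts, idx, body), and the functions catch_up and replay_live are all hypothetical. One assumption goes beyond the description: live messages whose key is not newer than the last key replayed from the archive are skipped, so that the first message is not fed to the engine twice.

```python
import sqlite3
import time


def catch_up(db, window_start, first_key, replay, poll_interval=0.05):
    """Replay archived rows up to first_key; return the last key replayed.

    Assumption: table `archive(ts, idx, body)` is written by the archiver.
    """
    last = None
    # Bulk query: start of window through the first message, inclusive.
    rows = db.execute(
        "SELECT ts, idx, body FROM archive "
        "WHERE ts >= ? AND (ts < ? OR (ts = ? AND idx <= ?)) "
        "ORDER BY ts, idx",
        (window_start, first_key[0], first_key[0], first_key[1]),
    )
    for ts, idx, body in rows:
        replay(body)
        last = (ts, idx)
    # The archiver may lag behind: fetch one row at a time until the first
    # message, or something newer than it, has come out of the database.
    while last is None or last < first_key:
        after = last if last is not None else (window_start, -1)
        row = db.execute(
            "SELECT ts, idx, body FROM archive "
            "WHERE ts > ? OR (ts = ? AND idx > ?) "
            "ORDER BY ts, idx LIMIT 1",
            (after[0], after[0], after[1]),
        ).fetchone()
        if row is None:
            time.sleep(poll_interval)  # nothing newer archived yet
            continue
        replay(row[2])
        last = (row[0], row[1])
    return last


def replay_live(buffered, last, replay):
    """Feed buffered live messages to the engine, skipping archived keys."""
    for key, body in buffered:
        if key > last:
            replay(body)
```

Here buffered stands for the (key, body) pairs the state machine process collected from the feed, starting with the first message, while it was querying the database.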

Does this have obvious holes? Is a better approach (again, not trusting 
the clocks of the clients which add content to the exchange to be 
accurate) available?

Thanks!


