#!/usr/bin/python
"""
disttailf.py - distributed "tail -f"

Aggregates "tail -f" output from multiple machines and multiple files into
a single RabbitMQ pubsub queue (kind of splunk's log consolidation function)

Usage:
 Producer:
   disttailf.py [-s broker_host] [-p broker_port] [-x spec_xml] file ...
 Consumer:
   disttailf.py [-s broker_host] [-p broker_port] [-x spec_xml] -c
"""

import sys, os, time
import socket
import Queue

# The qpid python client is loaded from a local SVN checkout, so the path
# has to be extended before the qpid imports below.
QPID_SVN_PATH = '/usr/local/qpid-svn/qpid'
sys.path.append(QPID_SVN_PATH + '/python')
import qpid
from qpid.client import Client
from qpid.content import Content


def consumer(client, ch):
    """Print every message published under 'disttailf.#' on amq.topic.

    Declares a broker-named queue, binds it to the 'amq.topic' exchange with
    the routing key 'disttailf.#' and then loops forever, printing a header
    line (message headers plus the last routing-key component) followed by
    the message body for each message received.

    client -- connected qpid Client; its `closed` flag is polled every pass
    ch     -- open channel on that client

    Raises whatever `qpid.peer.closed(...)` produces once the client reports
    that the connection is closed. Never returns normally.
    """
    myqueue = ch.queue_declare()
    ch.queue_bind(queue=myqueue.queue, exchange='amq.topic',
                  routing_key='disttailf.#')
    cons = ch.basic_consume(queue=myqueue.queue, no_ack=True)
    python_queue = client.queue(cons.consumer_tag)
    while True:
        if client.closed:
            # NOTE(review): confirm that `qpid.peer.closed` is the exception
            # name exported by the installed qpid version.
            raise qpid.peer.closed("Rabbitmq broker disconected")
        try:
            # Short timeout so that the `client.closed` check above runs
            # regularly even when no messages arrive.
            msg = python_queue.get(timeout=1)
        except Queue.Empty:
            time.sleep(0.5)
            continue
        # Single-argument print(...) produces the same text as the former
        # multi-item print statement and parses under Python 2 and 3 alike.
        print("==  %s %s" % (msg.content['headers'],
                             msg.routing_key.split('.')[-1]))
        print(msg.content.body)


def producer(client, ch, filenames):
    """Publish every new line of `filenames` to the amq.topic exchange.

    The routing key is 'disttailf.<short hostname>'. Each message carries the
    line as its body and a 'headers' dict with the send time (integer epoch
    seconds) and the originating file name. Every published line is also
    echoed to stdout.

    client    -- connected qpid Client (not used directly here)
    ch        -- open channel used for declaring, binding and publishing
    filenames -- list of paths to follow

    Runs until tail_f() stops yielding, i.e. normally forever.
    """
    rkey = "disttailf." + socket.gethostname().split('.')[0]
    # NOTE(review): this queue is bound to the producer's own routing key but
    # nothing in this file consumes from it - confirm it is still needed.
    producer_queue = ch.queue_declare()
    ch.queue_bind(queue=producer_queue.queue, exchange="amq.topic",
                  routing_key=rkey)
    for fname, line in tail_f(filenames):
        c = Content(line)
        c['headers'] = {'sent': int(time.time()), 'filename': fname}
        ch.basic_publish(routing_key=rkey, content=c, exchange='amq.topic')
        print('%s: %s' % (fname, line))


def _removed_or_truncated(f):
    """Return True if the file behind open file object `f` is gone or shrank.

    A file counts as truncated when its current size on disk is smaller than
    the read position of `f`. Any OSError/IOError raised while checking is
    treated the same way as a removed file.
    """
    try:
        return os.stat(f.name).st_size < f.tell()
    except (OSError, IOError):
        return True


def tail_f(filenames):
    """Generator yielding (filename, line) for lines found in `filenames`.

    Files are polled every 0.5 seconds. Lines are yielded stripped of
    surrounding whitespace; blank lines are skipped. A file that was removed
    or truncated is reported on stderr, closed and marked as unavailable;
    unavailable files (including ones that could not be opened initially)
    are retried once the poll counter exceeds 120 passes.

    filenames -- iterable of paths to follow

    The generator never terminates on its own.
    """
    filedict = dict((name, open_or_none(name)) for name in filenames)
    reopen_counter = 0
    while True:
        if reopen_counter > 120:
            reopen_counter = 0
            filedict.update(dict((name, open_or_none(name))
                                 for name in filenames
                                 if filedict[name] is None))
        # Iterate over a snapshot because entries are reassigned in the loop.
        for f in list(filedict.values()):
            if f is None:
                continue
            if _removed_or_truncated(f):
                sys.stderr.write("%s : removed or truncated\n" % f.name)
                f.close()
                filedict[f.name] = None
                continue
            while True:
                raw = f.readline()
                if not raw:
                    # Only an empty read means end of file. Testing the
                    # stripped text here would make every blank line end the
                    # drain and cost one extra polling interval.
                    break
                line = raw.strip()
                if line:
                    yield f.name, line
        reopen_counter += 1
        time.sleep(0.5)


def open_or_none(filename):
    """Open `filename` for reading; return None if it cannot be opened."""
    try:
        return open(filename)
    except (IOError, OSError):
        return None


##########

if __name__ == '__main__':
    import getopt
    try:
        o, filenames = getopt.getopt(sys.argv[1:], 's:p:x:c')
    except getopt.GetoptError:
        # Show the usage text, then let the original error propagate.
        print(__doc__)
        raise

    # defaults
    server = '127.0.0.1'
    port = 5672
    specxml = './amqp0-8.xml'
    acts_as_consumer = False
    for k, v in o:
        if k == '-s':
            server = v
        elif k == '-p':
            port = int(v)
        elif k == '-x':
            specxml = v
        elif k == '-c':
            acts_as_consumer = True

    # set up connection to rabbitmq broker
    client = Client(server, port, spec=qpid.spec.load(specxml))
    client.start({'LOGIN': 'guest', 'PASSWORD': 'guest'})
    ch = client.channel(1)
    ch.channel_open()

    if acts_as_consumer:
        consumer(client, ch)
    else:
        if not filenames:
            print(__doc__)
            raise Exception("List of file names is empty - nothing to do")
        producer(client, ch, filenames)

# vim: expandtab