#!/usr/bin/env python -OO # -*- coding: utf-8 -*- """ eventlogging-forwarder ---------------------- Arbitrary input -> ZeroMQ socket forwarding. Reads line-oriented input from an input stream and writes it to a ZeroMQ TCP PUB socket bound to the same port number. usage: eventlogging-forwarder [-h] input output [output ...] positional arguments: input input uri Examples: udp://localhost:8599 kafka://?brokers=broker1:9092,broker2:9092&topic=foo&group_id=bar output URIs of output streams optional arguments: -h, --help show this help message and exit :copyright: (c) 2012 by Ori Livneh :license: GNU General Public Licence 2.0 or later """ from __future__ import unicode_literals import sys reload(sys) sys.setdefaultencoding('utf-8') import argparse import logging from eventlogging import setup_logging, get_reader, get_writer, uri_force_raw ap = argparse.ArgumentParser(description='UDP => Writers Device', fromfile_prefix_chars='@') # Forward raw events. This keeps the reader # attempting to parse the input as json. ap.add_argument('input', help='URI of raw input stream', type=uri_force_raw) ap.add_argument('output', nargs='+', help='URIs if outputs (multicast)', type=uri_force_raw) ap.add_argument('--count', action='store_true', help='Prepend an autoincrementing ID to each message') args = ap.parse_args() setup_logging() input_stream = get_reader(args.input) writers_list = [] for output_uri in args.output: writers_list.append(get_writer(output_uri)) logging.info('Forwarding %s => %s...', args.input, output_uri) if args.count: input_stream = ( '{0}\t{1}'.format(str(id), msg) for id, msg in enumerate(input_stream) ) for line in input_stream: for writer in writers_list: writer.send(line)