#!/usr/bin/env python -OO
# -*- coding: utf-8 -*-
"""
  eventlogging-reporter
  ---------------------

  This script emits StatsD metric stats for raw and processed EventLogging
  streams configured on this server.

  The script discovers stream URIs by walking the
  /etc/eventlogging.d/processors directory.

  :copyright: (c) 2014 by Ori Livneh
  :license: GNU General Public Licence 2.0 or later

"""
import sys
# Python 2 only: force UTF-8 as the process-wide default string encoding.
reload(sys)
sys.setdefaultencoding('utf-8')

import argparse
import collections
import ctypes
import ctypes.util
import errno
import inspect
import logging
import math
import os
import re
import socket

import zmq

from eventlogging import setup_logging


setup_logging()

ap = argparse.ArgumentParser(description='eventlogging-reporter',
                             fromfile_prefix_chars='@')
ap.add_argument('--host', default='localhost', type=socket.gethostbyname)
ap.add_argument('--port', default=8125, type=int)
ap.add_argument('--prefix', default='eventlogging')
args = ap.parse_args()

# Destination of the metric datagrams and the UDP socket used to send them.
addr = (args.host, args.port)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

libc_so = ctypes.util.find_library('c')
libc = ctypes.CDLL(libc_so, use_errno=True)

# Clock id passed to timerfd_create(2); 1 is CLOCK_MONOTONIC on Linux.
CLOCK_MONOTONIC = 1


class timespec(ctypes.Structure):
    """ctypes mirror of C's ``struct timespec`` (seconds + nanoseconds)."""

    # tv_sec is declared as c_long rather than ``libc.time.restype``: the
    # latter is just ctypes' default return type (c_int), not time_t.
    _fields_ = (('tv_sec', ctypes.c_long),
                ('tv_nsec', ctypes.c_long))

    def set(self, time):
        """Store *time*, a number of seconds (may be fractional)."""
        fractpart, intpart = math.modf(time)
        # One second is 1e9 nanoseconds (the literal 10e9 would be 1e10).
        self.tv_nsec = int(fractpart * 1e9)
        self.tv_sec = int(intpart)


class itimerspec(ctypes.Structure):
    """ctypes mirror of C's ``struct itimerspec``.

    *interval* is the period of the timer in seconds; *value* is the delay
    before the first expiration and defaults to *interval*.
    """

    _fields_ = (('it_interval', timespec),
                ('it_value', timespec))

    def __init__(self, interval, value=None):
        super(itimerspec, self).__init__()
        value = interval if value is None else value
        self.it_interval.set(interval)
        # Honour the caller-supplied initial expiration.
        self.it_value.set(value)


def timerfd(interval, value=None):
    """Create a non-blocking periodic timer file descriptor.

    The timer fires every *interval* seconds; the first expiration happens
    after *value* seconds (defaults to *interval*).

    Returns the file descriptor as an int.
    Raises OSError if timerfd_create or timerfd_settime fails.
    """
    fd = libc.timerfd_create(CLOCK_MONOTONIC, os.O_NONBLOCK)
    if fd < 0:
        err = ctypes.get_errno()
        raise OSError(err, os.strerror(err))
    spec = itimerspec(interval, value)
    # timerfd_settime takes four arguments; pass NULL for old_value
    # explicitly instead of leaving the fourth argument undefined.
    res = libc.timerfd_settime(fd, 0, ctypes.pointer(spec), None)
    if res < 0:
        err = ctypes.get_errno()
        os.close(fd)  # do not leak the descriptor on failure
        raise OSError(err, os.strerror(err))
    return fd


def iter_files(path):
    """Recursively walk a file hierarchy, yielding the path of each file."""
    entries = os.walk(path)
    return (os.path.join(dirpath, f) for dirpath, _, fs in entries for f in fs)


def iter_pubs(config_dir):
    """Discover local EventLogging publishers.

    Every file under *config_dir* is scanned for ``tcp://host:port`` URIs.
    For a file with at least two matches, the first port is taken as the
    raw stream and the second as the valid stream; other files are ignored.

    Returns a dict mapping ``(per-file metric name, overall metric name)``
    tuples to the port (as a string) of the corresponding publisher.

    Assumes really a tcp stream
    """
    # TODO Kafka client side stream not reported
    publishers = {}
    logger = logging.getLogger('Log')
    for filename in iter_files(config_dir):
        # Metric-safe name derived from the file name.
        name = re.sub(r'[^A-Za-z0-9]+', '_', os.path.basename(filename))
        with open(filename) as f:
            matches = re.findall(r'tcp://[^:]+:(\d+)', f.read())
        if len(matches) > 1:
            logger.info("Reporting metrics for %s", filename)
            raw = matches[0]
            valid = matches[1]
            publishers[('%s.raw' % name, 'overall.raw')] = raw
            publishers[('%s.valid' % name, 'overall.valid')] = valid
    return publishers


def monitor_pubs(endpoints):
    """
    Count events streaming on a set of EventLogging publishers.

    *endpoints* is a dict that maps tuples of metric names to the TCP port
    of a publisher on this host (the shape returned by `iter_pubs`). Each
    message received from a publisher increments the counter of every name
    in its tuple. Every 5 seconds the non-zero counters are sent as
    ``<prefix>.<name>:<count>|c`` datagrams to the configured address and
    then reset.

    This function loops forever; it only exits by raising.

    """
    ctx = zmq.Context.instance()
    poller = zmq.Poller()
    counts = collections.defaultdict(int)
    sockets = {}
    # Resolve the local address once; it is the same for every endpoint.
    local_ip = socket.gethostbyname(socket.gethostname())

    for names, port in endpoints.items():
        logging.info('Registering %s (%s).', names, port)
        sub_socket = ctx.socket(zmq.SUB)
        sub_socket.hwm = 1000
        sub_socket.linger = 1000
        sub_socket.setsockopt(zmq.RCVBUF, 65536)
        sub_socket.connect('tcp://%s:%s' % (local_ip, port))
        # Empty prefix: subscribe to every message.
        sub_socket.setsockopt(zmq.SUBSCRIBE, b'')
        poller.register(sub_socket, zmq.POLLIN)
        sockets[sub_socket] = names

    timer = timerfd(5)
    poller.register(timer, zmq.POLLIN)

    while 1:
        try:
            for fd, _ in poller.poll():
                names = sockets.get(fd)
                if names:
                    # A publisher socket: drop the payload, count the event.
                    fd.recv(zmq.NOBLOCK)
                    for name in names:
                        counts[name] += 1
                else:
                    # The timer fired: drain its 8-byte expiration counter,
                    # then flush and reset the accumulated counts.
                    os.read(fd, 8)
                    for name, value in counts.items():
                        if value:
                            stat = '%s.%s:%s|c' % (args.prefix, name, value)
                            sock.sendto(stat.encode('utf-8'), addr)
                    counts.clear()
        except KeyboardInterrupt:
            # PyZMQ 13.0.x raises EINTR as KeyboardInterrupt (the original
            # comment noted an upstream fix, but its reference was lost).
            # Treat it as a retry only when it originated in PyZMQ's
            # return-code check; a genuine Ctrl-C is re-raised.
            if any('check_rc' in f[3] for f in inspect.trace()):
                continue
            raise
        except zmq.ZMQError as e:
            # Calls interrupted by EINTR should be re-tried.
            if e.errno == errno.EINTR:
                continue
            raise


if __name__ == '__main__':
    print(monitor_pubs(iter_pubs('/etc/eventlogging.d/processors')))