#!/usr/bin/python
# -*- coding: utf8 -*-
'''notifyd for cyrus imapd.

To use this daemon with cyrus imapd as a notifyd replacement, you have to

 * start the daemon before starting cyrus
 * install the daemon on the backends, if you are running it in a murder
   setup
 * set the values 'notifysocket' and 'mailnotifier' in /etc/imapd.conf
   e.g.
       notifysocket: /var/lib/imap/socket/notify
       mailnotifier: MAIL
 * start the daemon with the same user as imapd
'''

import logging
import Queue
import signal
import stomp
import socket
from threading import Thread
import time
from xml.sax.saxutils import escape as xml_escape


class QueueSender(Thread):
    r"""Sends messages from a python Queue to a stomp queue.

    Messages are buffered in a bounded internal queue (300 entries) and
    forwarded to the stomp destination by the thread's run() loop.

    >>> sender = QueueSender([('127.0.0.2', 61613), ('localhost', 61613)], '/topic/test')
    >>> sender.start()
    >>> for i in range(1, 400):
    ...     if not sender.queue_message('this is a test %d' % i):
    ...         print "could not queue message %d" % i
    >>> sender.force_shutdown()
    >>> sender.join()
    """

    def __init__(self, config, destination, user=None, password=None):
        r"""Constructor for a QueueSender

        \param config configuration for the stomp-server
            [('localhost', 61613), ('other.server.de', 61613)]
        \param destination name of the queue or topic
            '/topic/test'
        \param user optional user name handed to the stomp connection
        \param password optional passcode handed to the stomp connection
        """
        Thread.__init__(self)
        self.conn = None
        self.config = config
        self.user = user
        self.password = password
        self.destination = destination
        self.queue = Queue.Queue(maxsize=300)

    def __get_connection(self):
        """Tries to get a connection to the configured stomp server.

        An already established connection is reused.

        returns: configured connection, or None if the stomp server could
            not be reached
        """
        if self.conn:
            return self.conn
        # Build the connection in a local first, so that self.conn never
        # refers to a connection that has not been started and connected.
        conn = stomp.Connection(self.config, user=self.user,
                                passcode=self.password)
        try:
            conn.start()
        except stomp.exception.ReconnectFailedException:
            logging.warning(
                "Probleme beim Verbinden mit dem Stomp Server. Probieren es später noch einmal...")
            return None
        conn.connect(wait=True)
        self.conn = conn
        return conn

    def __invalidate_connection(self):
        """Forgets the current connection, so the next send reconnects."""
        self.conn = None

    def queue_message(self, message, headers=None, timeout=3):
        r"""Queues a message to the internal queue; it will be sent to the
        stomp server as soon as possible.

        \param message to be queued
        \param headers for the message, default empty dict
        \param timeout for the internal queue, default 3 seconds

        returns: True if message could be queued internally, False else
        """
        # A fresh dict per call: a shared mutable default would leak
        # headers between unrelated messages.
        if headers is None:
            headers = {}
        try:
            self.queue.put(MessageElement(message, headers), True, timeout)
        except Queue.Full:
            return False
        return True

    def force_shutdown(self):
        """Forces a shutdown of the sender.

        The kill marker is appended to the internal queue, so messages
        queued before it are still processed first.
        """
        self.queue.put(KillElement())

    def __send(self, message):
        r"""Tries to send a message to the configured stomp server

        \param message MessageElement to be sent to the stomp destination

        returns: True if message could be sent, False else
        """
        conn = self.__get_connection()
        if not conn:
            return False
        try:
            conn.send(message.message, message.headers,
                      destination=self.destination)
        except stomp.exception.NotConnectedException:
            logging.warning("Fehler beim Senden der Nachricht: %s", message)
            # Drop the broken connection; the next send reconnects.
            self.__invalidate_connection()
            return False
        return True

    def run(self):
        """Thread main loop: forwards queued messages until a KillElement
        is received."""
        while True:
            try:
                # Block for at most one second. The original call passed
                # (self, True), which only worked because True == 1.
                message = self.queue.get(True, 1)
            except Queue.Empty:
                logging.debug("Queue Empty. Sleep 1 second")
                time.sleep(1)
                continue
            if isinstance(message, KillElement):
                logging.info("got KillMessage...")
                if self.conn:
                    self.conn.disconnect()
                return
            if isinstance(message, MessageElement):
                if self.__send(message):
                    logging.debug("message sent: %s", message)
                else:
                    # Was self.info(...), which does not exist on Thread and
                    # killed the sender thread on the first failed send.
                    logging.info("message NOT sent (retry?): %s", message)
                    # Back off before touching the stomp server again.
                    time.sleep(10)


class QueueElement(object):
    """Base class to represent a message used for internal python queue"""
    pass


class KillElement(QueueElement):
    """Special Message to end the clients listening to queue"""
    pass


class MessageElement(QueueElement):
    """Message element, which contains a message and its headers"""

    def __init__(self, message, headers=None):
        self.message = message
        # Avoid sharing one default dict between all instances.
        self.headers = {} if headers is None else headers

    def __repr__(self):
        # Makes the log lines written by QueueSender readable.
        return "%s(message=%r, headers=%r)" % (
            type(self).__name__, self.message, self.headers)


class Notifyd(object):
    '''Daemon to listen on a unix file socket for messages from cyrus-imapd.

    It will reemit those messages via stomp.

    >>> notifyd = Notifyd(notifyd_socket, queue_sender, ('always_bcc', ))
    >>> notifyd.run_loop()
    '''

    # Names assigned, in order, to the NUL separated fields of a datagram.
    FIELDS = ('method', 'message_class', 'priority', 'user', 'mailbox',
              'nopt', 'message', 'opts')

    def __init__(self, socket_file_name, destination, ignore_users=None):
        r'''Constructor for notifyd

        \param socket_file_name Name of the unix file socket
        \param destination QueueSender, which reemits the messages
        \param ignore_users List of users, for which no events should be
            reemitted
        '''
        self.socket_file_name = socket_file_name
        self.destination = destination
        self.ignore_users = set(ignore_users or ())

    @staticmethod
    def _parse_datagram(datagram):
        r'''Splits a raw datagram into a dict of xml-escaped fields.

        \param datagram raw string of NUL separated fields

        returns: dict keyed by Notifyd.FIELDS, or None if the datagram has
            fewer fields than expected
        '''
        parts = datagram.split('\0')
        if len(parts) < len(Notifyd.FIELDS):
            return None
        return dict((key, xml_escape(value))
                    for key, value in zip(Notifyd.FIELDS, parts))

    def run_loop(self):
        '''Main loop in which we listen for new messages on the socket and
        route them to the destination.

        Reads from the socket given to the constructor and forwards every
        message whose user is not in ignore_users to the QueueSender given
        to the constructor. Does not return unless an exception is raised.
        '''
        notify_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        try:
            notify_socket.bind(self.socket_file_name)
            while True:
                data_dict = self._parse_datagram(notify_socket.recv(8192))
                if data_dict is None:
                    # A truncated datagram used to raise IndexError and take
                    # the whole daemon down; skip it instead.
                    logging.warning(
                        "Ignoring malformed notification (too few fields)")
                    continue
                user = data_dict['user']
                if user in self.ignore_users:
                    logging.debug("Ignorier message: %s an %s",
                                  data_dict, user)
                    continue
                logging.debug("Send message: %s an %s", data_dict, user)
                queued = self.destination.queue_message(data_dict, headers={
                    'user': user,
                    'mailbox': data_dict['mailbox'],
                })
                if not queued:
                    # queue_message gives up after its timeout when the
                    # internal queue is full; do not lose that silently.
                    logging.warning(
                        "Could not queue message for %s, message dropped",
                        user)
        finally:
            notify_socket.close()


class SignalHandler(object):
    '''May be used as a signal handler, to kill the queue'''

    def __init__(self, queue):
        r'''Constructor for SignalHandler

        \param queue that should be killed (needs a force_shutdown method)
        '''
        self.queue = queue

    def handle_signal(self, signo, frame):
        '''Shuts the queue down when a termination signal is received.'''
        # NOTE: SIGKILL can never be delivered to a handler; it is only
        # listed here for completeness.
        if signo in (signal.SIGTERM, signal.SIGKILL):
            self.queue.force_shutdown()


if __name__ == '__main__':
    import ConfigParser
    import os
    import os.path
    import logging.config
    import logging.handlers

    config = ConfigParser.ConfigParser()
    config.read('/etc/notifyd.ini')

    try:
        logging.config.fileConfig(config.get('logging', 'file'))
        logger = logging.getLogger(config.get('logging', 'handler'))
    except Exception as e:
        # No usable logging configuration: fall back to syslog at INFO.
        print("Using default config for logging. Reason: %s: %s"
              % (type(e).__name__, e))
        logging.basicConfig()
        logger = logging.getLogger()
        logger.setLevel(logging.INFO)
        hdlr = logging.handlers.SysLogHandler(
            facility=logging.handlers.SysLogHandler.LOG_DAEMON)
        formatter = logging.Formatter(
            '%(filename)s: %(levelname)s: %(message)s')
        hdlr.setFormatter(formatter)
        logger.addHandler(hdlr)

    stomp_host = config.get('stomp', 'host')
    stomp_port = config.getint('stomp', 'port')
    stomp_user = config.get('stomp', 'user')
    stomp_password = config.get('stomp', 'password')
    stomp_destination = config.get('stomp', 'destination')
    notifyd_socket = config.get('notifyd', 'socket')
    if config.has_option('notifyd', 'ignore_users'):
        ignore_users = [
            x.strip()
            for x in config.get('notifyd', 'ignore_users').split(',')]
    else:
        ignore_users = []

    # A stale socket file from a previous run would make bind() fail.
    if os.path.exists(notifyd_socket):
        os.unlink(notifyd_socket)

    sender = QueueSender([(stomp_host, stomp_port)], stomp_destination,
                         stomp_user, stomp_password)
    sender.start()

    sig_handler = SignalHandler(sender)
    for sig in (signal.SIGTERM, ):
        signal.signal(sig, sig_handler.handle_signal)

    try:
        notifyd = Notifyd(notifyd_socket, sender, ignore_users)
        notifyd.run_loop()
    except Exception as e:
        logging.error("Error in run_loop: %s" % e)
    finally:
        # Always stop the (non-daemon) sender thread, otherwise the process
        # would never exit.
        sender.force_shutdown()
        sender.join()