Fork me on GitHub

Source code for taskqueue.worker

"""
The module `taskqueue.worker` contains the abstract class :class:`BaseWorker`.
All your worker plugins should be derived from it.

Your custom functionality should be put into the method `handle_task()`.
And if you want to modify the way how task results are reported or tracked
then override the method `report_results()` of your worker subclass.

Taskqueue uses the `pkg_resources` library to discover registered
plugins. So in order to make your plugins visible to your taskqueue
installation you need to register your worker factories as entry points under
the group `worker.plugins`::

    setup(
        entry_points={
            'worker.plugins': [
                'customworker = yourpackage.yourmodule:YourWorker.factory'
            ]
        }
    )

"""

import logging
import pika
import signal
import os
import traceback

from pwd import getpwnam

from taskqueue.confparser import OPT_RESULTS_ROUTING_KEY
from taskqueue.workitem import get_workitem, WorkitemError, DEFAULT_CONTENT_TYPE

LOG = logging.getLogger(__name__)

CFG_DEFAULT_RES_ROUTING = "results"

[docs]class BaseWorker(object): """Base class for workers.""" #: Specifies list of workitem types accepted by the worker. By default #: workers accept workitems of any type. ACCEPT = ["*/*"] @classmethod
[docs] def factory(cls): """Produce new worker callable.""" return cls()
def __init__(self): """Constructor.""" self.channel = None self.connection = None self.results_routing_key = CFG_DEFAULT_RES_ROUTING self.settings = {} def __call__(self, props, conn_params, queue): """Worker process entry point.""" self.settings.update(props) if "user" in props.keys(): LOG.debug("Try to switch to user '%s'" % props['user']) if os.geteuid() == 0: try: newuid = getpwnam(props["user"])[2] os.seteuid(newuid) LOG.debug("Swithced to uid %d" % newuid) except KeyError: LOG.error("No such user '%s'" % props['user']) else: LOG.warning("Not enough permissions to switch user") self.connection = pika.BlockingConnection(conn_params) self.channel = self.connection.channel() self.channel.queue_declare(queue=queue, durable=True, exclusive=False, auto_delete=False) self.channel.basic_qos(prefetch_count=1) self.channel.basic_consume(self.handle_delivery, queue=queue) signal.signal(signal.SIGTERM, self.cleanup) LOG.debug("created new process with props %r" % props) if OPT_RESULTS_ROUTING_KEY in props.keys(): self.results_routing_key = props[OPT_RESULTS_ROUTING_KEY] self.channel.start_consuming()
[docs] def is_acceptable(self, workitem): """Check if received workitem can be handled by worker. :param workitem: workitem to check :type workitem: Workitem :rtype: boolean """ wtype, subwtype = workitem.mime_type.split('/') for mtype in self.ACCEPT: ctype, subctype = mtype.split('/') if (ctype == '*' or ctype == wtype) and \ (subctype == '*' or subctype == subwtype): LOG.debug("Accept: %s" % workitem.mime_type) return True LOG.error("Workitem '%r got rejected by %s.%s as incompatible" % (workitem, self.__module__, self.__class__.__name__)) return False
[docs] def handle_task(self, workitem): """Handle task. This method is supposed to be overriden in BaseWorker subclasses. :param workitem: workflow work item :type workitem: Workitem :returns: new state of work item :rtype: Workitem """ raise NotImplementedError
[docs] def handle_delivery(self, channel, method, header, body): """Handle AMQP message. :param channel: AMQP channel :type channel: pika.channel.Channel :param method: message's method :type method: pika.frame.Method :param header: message header :type header: pika.frame.Header :param body: message body :type body: string """ LOG.debug("Method: %r" % method) LOG.debug("Header: %r" % header) try: workitem = get_workitem(header, body, self.settings.get('workitem_type_map', None), self.settings.get('default_workitem_type', DEFAULT_CONTENT_TYPE)) except WorkitemError as err: LOG.error("Worker %s.%s can't handle delivery with header '%r' " "and body:\n%s" % (self.__module__, self.__class__.__name__, header, body)) channel.basic_ack(method.delivery_tag) return False wi_out = workitem if self.is_acceptable(workitem): try: wi_out = self.handle_task(workitem) except Exception as err: wi_out.set_error(str(err)) wi_out.set_trace(traceback.format_exc()) else: wi_out.set_error("Worker doesn't support this type of workitems") self.report_results(channel, wi_out) channel.basic_ack(method.delivery_tag) return True
[docs] def report_results(self, channel, workitem): """Report task results back to AMQP. Feel free to override this method. :param channel: AMQP channel :type channel: pika.channel.Channel :param workitem: workflow work item :type workitem: Workitem """ channel.basic_publish(exchange='', routing_key=self.results_routing_key, body=workitem.dumps(), properties=pika.BasicProperties( delivery_mode=2, content_type=workitem.mime_type ))
[docs] def cleanup(self, signum, frame): """Cleanup worker process.""" LOG.debug("target cleanup") self.channel.stop_consuming() self.connection.close()