Workers

The module taskqueue.worker contains the abstract class BaseWorker. All your worker plugins should be derived from it.

Put your custom functionality into the handle_task() method. To change how task results are reported or tracked, override report_results() in your worker subclass.

Taskqueue uses the pkg_resources library to discover registered plugins. To make your plugins visible to your taskqueue installation, register your worker factories as entry points under the group worker.plugins:

setup(
    entry_points={
        'worker.plugins': [
            'customworker = yourpackage.yourmodule:YourWorker.factory'
        ]
    }
)
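
To check that a registration like the one above is visible, the entry points in the group can be listed by hand. The following is a minimal sketch, not taskqueue's own discovery code: it swaps in the standard-library importlib.metadata for pkg_resources, and the helper name discover_worker_factories is hypothetical.

```python
from importlib.metadata import entry_points


def discover_worker_factories(group="worker.plugins"):
    """Return a {name: factory} mapping for every entry point in `group`.

    Hypothetical helper for illustration; taskqueue itself is documented
    as using pkg_resources for this step.
    """
    eps = entry_points()
    if hasattr(eps, "select"):
        # Python 3.10+: entry points are filtered with select().
        selected = eps.select(group=group)
    else:
        # Python 3.8/3.9: entry_points() returns a dict keyed by group name.
        selected = eps.get(group, [])
    # load() imports the module and resolves the attribute path,
    # e.g. yourpackage.yourmodule:YourWorker.factory
    return {ep.name: ep.load() for ep in selected}
```

Calling discover_worker_factories() after installing your package should return a mapping that contains the key customworker; an empty mapping suggests the package was not installed or the group name is misspelled.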
class taskqueue.worker.BaseWorker

Base class for workers.

ACCEPT = ['*/*']

Specifies the list of workitem types accepted by the worker. By default, workers accept workitems of any type.

cleanup(signum, frame)

Clean up the worker process.

classmethod factory()

Produce a new worker callable.

handle_delivery(channel, method, header, body)

Handle an AMQP message.

Parameters:
  • channel (pika.channel.Channel) – AMQP channel
  • method (pika.frame.Method) – message’s method
  • header (pika.frame.Header) – message header
  • body (string) – message body
handle_task(workitem)

Handle a task.

This method is meant to be overridden in BaseWorker subclasses.

Parameters: workitem (Workitem) – workflow work item
Returns: new state of work item
Return type: Workitem
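
The override contract described above (take a workitem, return its new state) can be sketched as follows. This is an illustration under assumptions: BaseWorker here is a stand-in defined locally so the snippet runs without taskqueue installed, a plain dict stands in for the Workitem type, and UppercaseWorker is a hypothetical subclass. In real code, BaseWorker would be imported from taskqueue.worker.

```python
class BaseWorker:
    """Stand-in for taskqueue.worker.BaseWorker (assumption: only the
    documented ACCEPT attribute and handle_task() hook are modelled)."""

    ACCEPT = ['*/*']

    def handle_task(self, workitem):
        raise NotImplementedError("subclasses must override handle_task()")


class UppercaseWorker(BaseWorker):
    """Hypothetical worker that upper-cases a 'text' field."""

    def handle_task(self, workitem):
        # Work on a copy so the incoming workitem is left untouched,
        # then return the new state as the documentation requires.
        result = dict(workitem)
        result["text"] = workitem.get("text", "").upper()
        return result


original = {"text": "hello"}
worker = UppercaseWorker()
new_state = worker.handle_task(original)
```

Returning a fresh object rather than mutating the argument is a design choice of this sketch, not a requirement stated by the documentation.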
is_acceptable(workitem)

Check whether the received workitem can be handled by the worker.

Parameters: workitem (Workitem) – workitem to check
Return type: boolean
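
The default ACCEPT value of ['*/*'] suggests wildcard patterns over type strings of the form major/minor. The documentation does not specify the matching rule, so the following is only a sketch of one plausible interpretation using shell-style globbing; the function name type_is_accepted and the example type strings are hypothetical.

```python
from fnmatch import fnmatchcase


def type_is_accepted(workitem_type, accept_patterns):
    """Return True if `workitem_type` matches any pattern in the list.

    Assumption: patterns are shell-style globs such as '*/*' or
    'application/*'; the real is_acceptable() may use a different rule.
    """
    return any(fnmatchcase(workitem_type, pattern)
               for pattern in accept_patterns)


accepts_everything = type_is_accepted("application/json", ["*/*"])
accepts_mismatch = type_is_accepted("text/plain", ["application/*"])
```

Under this interpretation, a worker narrows what it handles by replacing the default ACCEPT list with more specific patterns in its subclass.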
report_results(channel, workitem)

Report task results back to AMQP.

Feel free to override this method.

Parameters:
  • channel (pika.channel.Channel) – AMQP channel
  • workitem (Workitem) – workflow work item
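
An override of report_results() that adds tracking and then delegates to the base implementation might look like the sketch below. It rests on assumptions: BaseWorker is a local stand-in whose report_results() only records the workitem (the real method reports to AMQP), None stands in for the pika channel, and LoggingWorker and the logger name are hypothetical.

```python
import logging

logger = logging.getLogger("customworker")


class BaseWorker:
    """Stand-in for taskqueue.worker.BaseWorker; the real reporting
    logic is not shown in this documentation."""

    def report_results(self, channel, workitem):
        # Placeholder for the real behaviour (reporting back to AMQP).
        self.reported = workitem


class LoggingWorker(BaseWorker):
    """Hypothetical worker that logs each result before reporting it."""

    def report_results(self, channel, workitem):
        logger.info("reporting results for %r", workitem)
        # Delegate to the base class so the default reporting still runs.
        super().report_results(channel, workitem)


worker = LoggingWorker()
worker.report_results(channel=None, workitem={"status": "done"})
```

Calling super() keeps the default reporting intact; omit it only if the subclass fully replaces how results are delivered.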