
Workitems

Workitems encapsulate the algorithms used to parse the bodies of AMQP messages received by dispatchers and workers.

Taskqueue comes with two predefined workitem classes: BasicWorkitem and RuoteWorkitem. The former exists mostly for debugging and demonstration purposes; the latter was developed to integrate Taskqueue with Ruote. You can also register your own workitem class, for example to extract pickled Python objects from the AMQP message body, similar to how Celery does it.

The requirements for a Workitem class are:

  1. the class should implement the following methods:

    • __init__(mime_type::string) to set the attribute Workitem.mime_type,
    • loads(blob::Blob) to parse AMQP message bodies,
    • dumps()::Blob to convert the workitem’s state back to an AMQP message body,
    • set_error(error::string) to set the error message,
    • set_trace(trace::string) to set the traceback,
    • property worker_type to let the dispatcher know where to dispatch the workitem to;

    For your convenience, there is an abstract class Workitem exposing all these methods. You just need to override its abstract methods in a class inheriting from Workitem.

  2. the class needs to be registered as a setuptools entry point under the group name workitems:

    setup(
        entry_points={
            'workitems': [
                'application/x-your-workitem = yourpackage.yourmodule:YourWorkitemClass'
            ]
        }
    )
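The interface listed above can be sketched as a pickle-based workitem. This is only an illustration: the class name PickleWorkitem, the MIME type string, and the "worker_type", "error" and "trace" keys are assumptions, not part of Taskqueue. In a real project the class would inherit from taskqueue.workitem.Workitem; a plain class is used here so the sketch runs without Taskqueue installed.

```python
import pickle


class PickleWorkitem:
    """Hypothetical workitem whose AMQP body is a pickled dict."""

    def __init__(self, mime_type):
        # The requirements ask __init__ to set the mime_type attribute.
        self.mime_type = mime_type
        self._data = {}

    def loads(self, blob):
        # Assumption: the body is a pickled dict with a "worker_type" key.
        # Unpickling is unsafe for untrusted input; use only with trusted publishers.
        self._data = pickle.loads(blob)

    def dumps(self):
        # Convert the current state back into an AMQP message body.
        return pickle.dumps(self._data)

    def set_error(self, error):
        # Assumed storage location for the error message.
        self._data["error"] = error

    def set_trace(self, trace):
        # Assumed storage location for the traceback.
        self._data["trace"] = trace

    @property
    def worker_type(self):
        # Tells the dispatcher where to route the workitem.
        return self._data["worker_type"]
```

Such a class would then be listed in the workitems entry point group under the MIME type it handles, following the setup() fragment above.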
    
class taskqueue.workitem.BasicWorkitem(mime_type)

Basic workitem.

The format of a message body understandable by this class is a simple string: <worker_type> <the rest of the body>.
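As a rough illustration of this format, the body can be split at the first space. The helper name split_basic_body and the UTF-8 decoding step are assumptions made for this sketch; BasicWorkitem itself may parse the body differently.

```python
def split_basic_body(body):
    """Split '<worker_type> <the rest of the body>' at the first space."""
    if isinstance(body, bytes):
        # Assumption: text/plain bodies are UTF-8 encoded.
        body = body.decode("utf-8")
    # partition() returns the text before the first space, the space itself,
    # and everything after it (an empty string if there is no space).
    worker_type, _, rest = body.partition(" ")
    return worker_type, rest
```

For example, a body of "simplebuilder build python-riak" would yield the worker type "simplebuilder" and the remainder "build python-riak".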

taskqueue.workitem.DEFAULT_CONTENT_TYPE = 'application/json'

Default content type

taskqueue.workitem.DEFAULT_CONTENT_TYPE_MAP = {'application/json': 'application/x-ruote-workitem', 'text/plain': 'application/x-basic-workitem'}

Default mappings of workitem types

class taskqueue.workitem.RuoteWorkitem(mime_type)

Ruote workitem.

This class is used to parse JSON-based Ruote workitems like:

{
    "re_dispatch_count": 0,
    "participant_name": "hardworker",
    "wf_revision": null,
    "fields": {
        "repo": "testrepo1",
        "pkgname": "python-riak",
        "pkgversion": "1.2.1",
        "branch": "master2",
        "workdir": "/home/rozhkov/tmp",
        "dispatched_at": "2012-03-04 14:00:22.861908 UTC",
        "params": {
            "participant_options": {
                "forget": false,
                "queue": "taskqueue"
            },
            "worker_type": "simplebuilder",
            "ref": "hardworker"
        },
        "user": "vasya"
    },
    "wf_name": null,
    "fei": {
        "wfid": "20120304-bejeruwodi",
        "engine_id": "engine",
        "expid": "0_1_3",
        "subid": "8079afecd0256e8280b355455ea3435f"
    }
}
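A minimal sketch of reading such a workitem with the standard json module follows. It assumes the worker type is taken from fields.params.worker_type, as the sample above suggests; the source does not confirm that RuoteWorkitem looks there, and the helper name ruote_worker_type is hypothetical.

```python
import json


def ruote_worker_type(body):
    """Return the worker type stored in a JSON-encoded Ruote workitem."""
    workitem = json.loads(body)
    # Assumption: the worker type lives under fields -> params -> worker_type.
    return workitem["fields"]["params"]["worker_type"]
```

Applied to the sample message above, this helper would return "simplebuilder".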

class taskqueue.workitem.Workitem(mime_type)

Base abstract class for workitems.

dumps()

Serialize the workitem.

This is an abstract method.

loads(blob)

Load the workitem from the given blob.

This is an abstract method.

set_error(error)

Set the worker’s error message.

This is an abstract method.

set_trace(trace)

Set the worker’s traceback.

This is an abstract method.

worker_type

Return the type of worker this workitem was sent to.

taskqueue.workitem.get_workitem(amqp_header, amqp_body, ctype_map=None, default_ctype='application/json')

Construct a workitem of a certain type.

Parameters:
  • amqp_header (pika.frame.Header) – AMQP message header
  • amqp_body (blob) – AMQP message body
  • ctype_map (dictionary|string) – workitem type mapping
  • default_ctype (string) – default workitem type
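The following sketch shows how a factory with these parameters might resolve a workitem class. It is not Taskqueue's implementation: make_workitem, EchoWorkitem and REGISTRY are hypothetical names, the header is assumed to expose properties.content_type, unmapped content types are assumed to pass through unchanged, and only the dictionary form of ctype_map is handled.

```python
from types import SimpleNamespace

DEFAULT_CONTENT_TYPE = "application/json"
DEFAULT_CONTENT_TYPE_MAP = {
    "application/json": "application/x-ruote-workitem",
    "text/plain": "application/x-basic-workitem",
}


class EchoWorkitem:
    """Hypothetical stand-in for a registered workitem class."""

    def __init__(self, mime_type):
        self.mime_type = mime_type
        self.body = None

    def loads(self, blob):
        self.body = blob


# Hypothetical registry; a real setup would presumably look classes up
# through the 'workitems' entry point group instead.
REGISTRY = {
    "application/x-ruote-workitem": EchoWorkitem,
    "application/x-basic-workitem": EchoWorkitem,
}


def make_workitem(amqp_header, amqp_body, ctype_map=None,
                  default_ctype=DEFAULT_CONTENT_TYPE):
    """Pick a workitem class from the message content type and load the body."""
    ctype_map = ctype_map or DEFAULT_CONTENT_TYPE_MAP
    # Assumption: the header carries the content type in properties.content_type.
    content_type = getattr(amqp_header.properties, "content_type", None)
    content_type = content_type or default_ctype
    # Assumption: content types missing from the map are used as-is.
    mime_type = ctype_map.get(content_type, content_type)
    workitem = REGISTRY[mime_type](mime_type)
    workitem.loads(amqp_body)
    return workitem


# Usage with a stand-in header object instead of a real pika frame.
header = SimpleNamespace(properties=SimpleNamespace(content_type="text/plain"))
item = make_workitem(header, b"simplebuilder hello")
```

A message without a content type falls back to default_ctype in this sketch, so it would be treated as a Ruote workitem under the default mapping.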