Wichert Akkerman

Software design? Strange hacks? All of the above please!

When writing a web application or REST backend I occasionally run into a
common problem: sometimes I need to do things that will take a while: sending a
push notification to mobile apps, generating emails or doing an expensive
calculation. Doing this immediately would result in very long response times,
which is not acceptable. That means I need a way to offload those to something
else. This is where task queues come in: they allow you to submit a task for
out-of-band processing. There are multiple tools that do this, but all of them
suffer from one or more design flaws that make them harder to use in non-trivial
systems. This document describes a few requirements and looks at how existing
frameworks try to address them.

The examples use Pyramid, but the problems are generic and apply to all
frameworks.

Global configuration

A common design mistake is to require configuration to be defined globally before
you can define a task. Here is an example from Celery:

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

add.delay(4, 4)

or the rq version (note that this style is optional with rq):

from rq.decorators import job

@job('low', connection=my_redis_conn, timeout=5)
def add(x, y):
    return x + y

This pattern makes it almost impossible to make your application configurable:
it requires your connection details to be set on a global before you can import
any code that defines a task, which quickly leads to complex import dependencies
and can be impossible with some frameworks. A better approach is to decouple
configuration from the task definition. For example (from rq again):

def add(x, y):
    return x + y

redis_conn = Redis()
q = Queue(connection=redis_conn)  # no args implies the default queue
job = q.enqueue(add, 4, 4)
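With configuration decoupled from task definitions, a Pyramid application can build the queue from deployment settings once at startup and store it on the registry, which is what the `request.registry['task-queue']` lookups later in this document assume. A minimal sketch (the `TaskQueue` class and the `taskqueue.broker_url` setting are hypothetical):

```python
class TaskQueue:
    """Hypothetical queue client, configured from settings at startup."""

    def __init__(self, broker_url):
        self.broker_url = broker_url


def make_queue(settings):
    # Called once during application startup (for example from a Pyramid
    # includeme); modules that define tasks never see the broker details.
    return TaskQueue(settings['taskqueue.broker_url'])


registry = {}  # stand-in for config.registry
registry['task-queue'] = make_queue(
    {'taskqueue.broker_url': 'redis://localhost:6379/0'})
```

Because no module-level global is involved, the task modules can be imported in any order, before or after the queue exists.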

Transparency

When you write an expensive function that you want to delay, you also want
to be able to call it immediately in some situations, for example when you
call it from another function that is itself delayed. Building on our example
we might want to call add directly from a new add_and_multiply function.

def add(x, y):
    return x + y


def add_and_multiply(x, y):
    return (add(x, y), x * y)  # Do not delay this call to add
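One way to keep a task transparently callable is to have the task wrapper delegate plain calls to the original function and put the queueing behaviour on a separate method. A self-contained sketch (the `Task` class and `delay` method here are made up, not any framework's actual API):

```python
import functools


class Task:
    """Wrap a function so it can be called directly or queued for later."""

    def __init__(self, func, queue):
        functools.update_wrapper(self, func)
        self.func = func
        self.queue = queue

    def __call__(self, *args, **kw):
        # Calling the task runs the original function in-process.
        return self.func(*args, **kw)

    def delay(self, *args, **kw):
        # Only an explicit .delay() goes through the queue.
        self.queue.append((self.func, args, kw))


def task(queue):
    def decorator(func):
        return Task(func, queue)
    return decorator


queue = []


@task(queue)
def add(x, y):
    return x + y


def add_and_multiply(x, y):
    return (add(x, y), x * y)  # direct call, nothing is queued
```

Calling `add(4, 4)` behaves exactly like the undecorated function, so `add_and_multiply` never touches the queue.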

There should also not be any requirements on function naming. Celery has
a problem here: it does not allow you to define two tasks with the same
name in different modules. If you do this:

# view1.py
@task()
def send_mail():
    pass

# view2.py
@task()
def send_mail():
    pass

If you call view2.send_mail Celery will happily run view1.send_mail
instead without telling you.
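The failure mode is easy to see in a self-contained sketch of a name-keyed task registry, which is the behaviour described above; the `task` decorator here is a stand-in, not Celery's actual implementation:

```python
registry = {}


def task(name=None):
    """Register a task under a name, defaulting to the bare function name."""
    def decorator(func):
        registry[name or func.__name__] = func
        return func
    return decorator


@task()
def send_mail():            # imagine this lives in view1.py
    return 'view1'


@task()
def send_mail():            # ...and this one in view2.py
    return 'view2'


# Only the most recently registered task survives under the bare name:
assert registry['send_mail']() == 'view2'


# An explicit module-qualified name sidesteps the collision:
@task(name='view1.send_mail')
def send_mail():
    return 'view1'
```

Deriving the registration key from the full module path instead of the bare function name would make the collision impossible rather than merely avoidable.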

Task context

Tasks commonly expect to run within a configured environment, similar to how
tests often require a fixture: a working database connection, a transaction
manager, application configuration available, etc. There are three types
of context to distinguish:

  • Global state you can load once, for example application configuration.
  • Per-process state that has to be initialised for every new process. Things
    like database connections fall in this category. For an application using
    Pyramid you would call pyramid.paster.bootstrap to set this up.
  • Per-task state. Common examples of this are running each task in a separate
    transaction and managing thread-local variables.

For task context you may need more control to be able to cleanly handle
exceptions or job teardown. As an example, to run each task in a transaction you
might use something like this:

import transaction

def transaction_state(func):
    # The transaction manager commits on normal exit and aborts
    # if func() raises an exception.
    with transaction.manager:
        return func()

worker.add_task_state_handler(transaction_state)

A concept like Pyramid’s tweens may be useful here.

Celery seems to be the only framework which tries to address this with its
signals mechanism.
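Chaining such state handlers tween-style could look like the sketch below; the `Worker` class and `add_task_state_handler` are hypothetical, mirroring the snippet above:

```python
import functools


class Worker:
    """Run tasks inside a chain of state handlers, outermost first."""

    def __init__(self):
        self.handlers = []

    def add_task_state_handler(self, handler):
        self.handlers.append(handler)

    def run(self, func):
        call = func
        for handler in reversed(self.handlers):
            # Each handler receives the next callable in the chain,
            # like a Pyramid tween receives the next handler.
            call = functools.partial(handler, call)
        return call()


def logging_state(func):
    # A handler can do setup and teardown around the task, and it sees
    # any exception the task raises.
    print('task starting')
    try:
        return func()
    finally:
        print('task finished')


worker = Worker()
worker.add_task_state_handler(logging_state)
```

Per-task state such as a fresh transaction fits naturally in a handler like `logging_state`, while per-process state can be set up once before the worker loop starts.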

Parameter handling

When calling an expensive function that must be run out-of-process I do not
want to have to worry about which parameters I can safely pass to it. For
example, if I am using SQLAlchemy I want to be able to just pass an ORM
instance to a function. The system should detect this automatically, flush any
pending state to the SQL server (or abort if the object is dirty), and merge
the instance into the current session before running my function in a worker
process. This is some pseudo-code for a Pyramid view to register new users:

def send_welcome_mail(user):
    """Generate and send a welcome email to a new user.
    """


@view_config('register', renderer='welcome.pt')
def register_user(request):
    user = User(email=request.params['email'])
    DBSession.add(user)
    request.registry['task-queue'].submit(send_welcome_mail, user)
    return {'user': user}

This is also useful for passing requests. A standard request holds many things,
such as open file handles and a copy of the request body, that you cannot safely
transfer to a task queue, but you do want to transfer the data that is needed
to generate URLs with the right scheme, hostname and port. Making this transparent
requires stripping a request down before queueing the task, and recreating
it before running the task.
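Stripping a request down might look like the sketch below. The attribute names follow Pyramid's request API, but the `SerializableRequest` class itself is hypothetical:

```python
class SerializableRequest:
    """Carry just enough request state to rebuild URLs in a worker."""

    def __init__(self, request):
        # Copy only plain data; file handles, the request body and the
        # connection itself stay behind in the web process.
        self.scheme = request.scheme
        self.host = request.host
        self.script_name = request.script_name

    def application_url(self):
        # Enough to let route_url-style helpers build absolute URLs.
        return '%s://%s%s' % (self.scheme, self.host, self.script_name)
```

The queue would substitute this object for the real request when serialising the task, and build a fresh request from it in the worker.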

def send_push_message(request, device_token):
    site_url = request.route_url('news')
    message = Message([device_token],
        alert='Our website has been updated. Please visit %s' % site_url)
    apn_server.submit(message)


@view_config('publish', renderer='publish.pt')
def publish_page(request):
    ...
    request.registry['task-queue'].submit(send_push_message, user.token)
    return {}

Malthe Borch pointed out that using a __reduce__ method can help here. He uses
this snippet to handle pickling of SQLAlchemy instances. None of the existing
frameworks try to address this.

Transaction integration

When writing a function to register new users I submit a task for later processing
to generate a welcome email for the new user. But what if I hit a critical error
after submitting that task, one that causes the new user not to be created in my
database? In that situation the task needs to be aborted before it has a chance
to run. This requires integration of the task queue with the transaction manager.

The example below has a critical error which causes the transaction to be aborted.
The user will be shown an error, so it would be very confusing if the welcome
email was still sent.

@view_config('register', renderer='welcome.pt')
def register_user(request):
    user = User(email=request.params['email'])
    DBSession.add(user)
    request.registry['task-queue'].submit(send_welcome_mail, user)
    return {'user': usr}  # Code error: usr is undefined

Sometimes you may need to return a task id to the user so they can come back later
to fetch a result. This requires two-phase commit support from the task queue so
it can allocate a task id before submitting the task.

@view_config('rest-call', renderer='json')
def rest_call(request):
    # Schedule an expensive function
    task = request.registry['task-queue'].submit(expensive_function)
    # Tell client to check the task status in 10 seconds.
    request.response.status_int = 202
    request.response.headers['Location'] = request.route_url('task-status', task_id=task.id)
    return {'task': task.id, 'delay': 10}
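A minimal sketch of the behaviour such integration would provide: the id exists as soon as the task is submitted, but nothing reaches the broker until the transaction commits. Real integration with the transaction package would implement its data manager interface; the class names here are made up:

```python
import uuid


class QueuedTask:
    def __init__(self, func):
        self.id = uuid.uuid4().hex  # id is allocated immediately
        self.func = func


class TransactionalQueue:
    """Buffer tasks during the transaction; publish only on commit."""

    def __init__(self):
        self.pending = []
        self.published = []

    def submit(self, func):
        task = QueuedTask(func)
        self.pending.append(task)  # nothing reaches the broker yet
        return task                # but the caller already has task.id

    def commit(self):
        # A real data manager would do this in its second commit phase.
        self.published.extend(self.pending)
        self.pending = []

    def abort(self):
        # On transaction abort the buffered tasks are simply dropped.
        self.pending = []
```

If the view above raises after submit(), abort() drops the welcome-mail task; on a clean response, commit() hands the buffered tasks to the broker.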

None of the existing frameworks try to address this.