Posts Tagged ‘celery’

Drop-in celery AbortableTask replacement

October 24, 2011

If you need to report progress updates from your tasks (that is, you call update_state in the task), you cannot use the bundled AbortableTask from celery.contrib.abortable, because it relies on status updates too: your progress updates and the ABORTED status will overwrite each other, so you’ll get race conditions.

You can use revokes to abort tasks, but they don’t give you enough control, and there’s no guarantee that your tasks will stop gracefully (or stop at all). Revokes can raise SoftTimeLimitExceeded if enabled (via the TERM signal), but it can be tricky to perform cleanup: if you’re inside a call to a C extension, the exception gets delayed until the call returns. See the signal module docs for what happens when you raise an exception from a signal handler (that’s what celery does).
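For reference, that style of cleanup looks roughly like this (a minimal sketch against the celery 2.x API of the time; do_work and cleanup are hypothetical placeholders):

from celery.exceptions import SoftTimeLimitExceeded
from celery.task import task

@task
def long_task():
    try:
        do_work()  # hypothetical long-running work
    except SoftTimeLimitExceeded:
        cleanup()  # hypothetical; delayed if do_work is stuck in a C extension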

Given this, an alternative is to store the aborted task ids in a redis set. If you use the redis broker you can use this drop-in replacement:

from contextlib import contextmanager
import celery
from celery.task.base import Task
from celery.result import AsyncResult

from django.conf import settings

assert settings.BROKER_TRANSPORT == 'redis', "AbortableTask can only work with a 'redis' BROKER_TRANSPORT"
REDIS_KEY = getattr(settings, 'ABORTABLE_REDIS_KEY', 'task-aborts')

@contextmanager
def client_from_pool():
    # Borrow a broker connection from celery's pool and expose the
    # underlying redis client.
    connection = celery.current_app.pool.acquire()
    try:
        yield connection.default_channel.client
    finally:
        connection.release()

class AbortableAsyncResult(AsyncResult):
    # Keeps the abort flag in a redis set instead of the result backend,
    # so progress updates can't clobber it.

    def is_aborted(self):
        with client_from_pool() as client:
            return client.sismember(REDIS_KEY, self.task_id)

    def abort(self):
        with client_from_pool() as client:
            client.sadd(REDIS_KEY, self.task_id)

class AbortableTask(Task):

    @classmethod
    def AsyncResult(cls, task_id):
        return AbortableAsyncResult(task_id, backend=cls.backend,
                                             task_name=cls.name)

    def is_aborted(self, **kwargs):
        task_id = kwargs.get('task_id', self.request.id)
        with client_from_pool() as client:
            return client.sismember(REDIS_KEY, task_id)

    def cleanup(self, **kwargs):
        task_id = kwargs.get('task_id', self.request.id)
        with client_from_pool() as client:
            client.srem(REDIS_KEY, task_id)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Always clear the abort flag when the task finishes, whatever the outcome.
        self.cleanup(task_id=task_id)

This will use the broker's connection pool if it's enabled (you should enable it; just set BROKER_POOL_LIMIT).
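A minimal usage sketch (ProcessItems and handle are hypothetical names, not part of the replacement above): the task polls is_aborted() between units of work and exits cleanly, and since the abort flag lives in redis, update_state is safe to call:

class ProcessItems(AbortableTask):

    def run(self, items, **kwargs):
        for index, item in enumerate(items):
            if self.is_aborted():
                return 'aborted'  # stop gracefully
            handle(item)  # hypothetical unit of work
            self.update_state(state='PROGRESS', meta={'done': index})

# From another process (e.g. a Django view):
result = ProcessItems.delay(['a', 'b', 'c'])
ProcessItems.AsyncResult(result.task_id).abort()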

Tmux scripting

September 25, 2011

I usually need to run more than one command for a project, and I got tired of searching through PuTTY windows for the session I want. So I thought of using a terminal multiplexer like Tmux.

I’m using celery with two queues and I need to run this:

  • manage.py celeryd -Q queueA
  • manage.py celeryd -Q queueB
  • manage.py celerycam -E

I need celerycam because it keeps those task stats in djcelery up to date.

It’s also a good idea to tail the PostgreSQL log: when you break your models or database, Django isn’t always very helpful, so this helps a lot:

  • tail -f /var/log/postgresql/postgresql-8.4-main.log

I use a wide screen so I want a layout like this:

    +------------------------------------+-------------------+
    |                                    |                   |
    |              runserver             |                   |
    |                                    |     celerycam     |
    +------------------------------------+                   |
    |                                    |                   |
    |               celeryd              +-------------------+
    |                                    |                   |
    +------------------------------------+                   |
    |                                    |   postgresql log  |
    |               celeryd              |                   |
    |                                    |                   |
    +------------------------------------+-------------------+

I also want to start everything in a new tmux session from a single command, so I can close it all easily - those celeryd's don't reload automatically :(

You'd usually run something like:

tmux new-session "tmux splitw 'command1'; tmux splitw 'command2'; tmux splitw 'command3'; command4"

but that get's rather long and you need to quote and escape, calculate the panel sizes manually (I want equal height) and for the layout above you also need to select the right panels before splitting.

The commands vary across projects (some have more, some have fewer), so how about we make a script:

import subprocess

left_commands = [
    "python manage.py runserver",
    "python manage.py celeryd -Q queueA -c 2 -E -n worker1",
    "python manage.py celeryd -Q queueB -c 2 -E -n worker2",
]
right_commands = [
    "python manage.py celerycam",
    "tail -f /var/log/postgresql/postgresql-8.4-main.log",
]
session = ''

# Create the right column (35% of the width), running the last
# right-hand command in it.
if right_commands:
    session += 'tmux selectp -t 0; tmux splitw -hd -p 35 "%s"; ' % right_commands[-1]
# Split the right column so every right-hand command gets an equal height.
for index, command in enumerate(right_commands[:-1]):
    session += 'tmux selectp -t 1; tmux splitw -d -p %i "%s"; ' % (
        100 / (len(right_commands) - index),
        command
    )

# Split the left column the same way for all left-hand commands except
# the first one, which runs in the pane the session starts with.
for index, command in enumerate(left_commands[1:]):
    session += 'tmux selectp -t 0; tmux splitw -d -p %i "%s"; ' % (
        100 / (len(left_commands) - index),
        command
    )
if left_commands:
    session += left_commands[0]

args = [
    'tmux',
    'new-session',
    session,
]
print 'Running ', args
subprocess.call(args)
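For the command lists above, the script ends up running the equivalent of the following (the session string is a single argument; it's wrapped here for readability, and the 33/50 percentages come from the integer divisions in the loops):

tmux new-session 'tmux selectp -t 0; tmux splitw -hd -p 35 "tail -f /var/log/postgresql/postgresql-8.4-main.log";
  tmux selectp -t 1; tmux splitw -d -p 50 "python manage.py celerycam";
  tmux selectp -t 0; tmux splitw -d -p 33 "python manage.py celeryd -Q queueA -c 2 -E -n worker1";
  tmux selectp -t 0; tmux splitw -d -p 50 "python manage.py celeryd -Q queueB -c 2 -E -n worker2";
  python manage.py runserver'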