Using Celery to Handle Asynchronous Processes


This article continues the series on building a continuous deployment environment using Python and Django.

Those of you following along now have the tools to set up a Python/Django project, fully test it, and deploy it. Today we will discuss the Celery package, an open source asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well.

Simply put, processes can be run asynchronously and distributed across workers, instead of by the main app. This allows complex calculations, heavy data processing, or calls to third-party services to run without blocking the main Django/Python app and, when the workers run on a remote server, without using the main app's resources. Celery is a Python project, but there is an app, django-celery, which plugs into Django.

Getting ready

To install Celery, enter your virtualenv and call:

pip install celery

Install a technology to manage the Celery queue (RabbitMQ is recommended). Full RabbitMQ installation instructions are available at http://www.rabbitmq.com/install.html.

To install RabbitMQ on Debian/Ubuntu:

sudo apt-get install rabbitmq-server

RabbitMQ will start automatically upon installation. To start/stop RabbitMQ manually on Debian/Ubuntu:

invoke-rc.d rabbitmq-server start
invoke-rc.d rabbitmq-server stop

On most systems the log file for RabbitMQ can be found at /var/log/rabbitmq/rabbit.log.

To install RabbitMQ on OSX (this will take a long time):

brew install rabbitmq

RabbitMQ will be installed to /usr/local/sbin, so add this directory to your PATH if you haven't already done so. To start/stop RabbitMQ manually on OSX:

sudo rabbitmq-server
sudo rabbitmqctl stop

To install Celery for Django:

pip install django-celery

How to do it…

Set up RabbitMQ for use with Celery:

sudo rabbitmqctl add_user {CELERY_USER} {CELERY_USER_PASS}
sudo rabbitmqctl add_vhost {CELERY_VHOST}
sudo rabbitmqctl set_permissions -p {CELERY_VHOST} {CELERY_USER} ".*" ".*" ".*"

Don't use any periods in the {CELERY_VHOST} or Celery won't be able to connect to RabbitMQ. If the server cannot connect on OSX, see Broker Installation for additional setup information.

Create your first task in tasks.py:

from celery.task import task

@task
def add(x, y):
    return x + y

Create a configuration file to run your Celery task, celeryconfig.py:

BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "{CELERY_USER}"
BROKER_PASSWORD = "{CELERY_USER_PASS}"
BROKER_VHOST = "{CELERY_VHOST}"
CELERY_RESULT_BACKEND = "amqp"
CELERY_IMPORTS = ("tasks", )

If RabbitMQ is running remotely, change localhost to the name of the remote server.
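
Later Celery releases also accept a single BROKER_URL setting in place of the separate BROKER_* values. As a minimal sketch of how the pieces fit together, the hypothetical helper below (build_broker_url is not part of Celery) assembles such a URL with the Python 3 standard library and enforces the no-periods rule for the vhost mentioned earlier. The credentials are placeholders.

```python
from urllib.parse import quote


def build_broker_url(user, password, vhost, host="localhost", port=5672):
    """Combine separate broker settings into one AMQP URL string."""
    # Following the warning above, reject vhost names containing periods.
    if "." in vhost:
        raise ValueError("vhost must not contain periods: %r" % vhost)
    # Percent-encode values so characters such as '@' or '/' cannot
    # be mistaken for URL delimiters.
    return "amqp://{user}:{password}@{host}:{port}/{vhost}".format(
        user=quote(user, safe=""),
        password=quote(password, safe=""),
        host=host,
        port=port,
        vhost=quote(vhost, safe=""),
    )


# Placeholder credentials, for illustration only.
BROKER_URL = build_broker_url("myuser", "p@ss", "myvhost")
```

Whether your installed Celery version reads BROKER_URL depends on the release, so check its configuration reference before replacing the individual settings.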

Now to test, start Celery (it will print to the console):

celeryd --loglevel=INFO

In another terminal, open the Python shell and run:

from tasks import add
from celery.execute import send_task
result = add.apply_async(args=[2, 2], kwargs={})
print(result.get())
result = send_task('tasks.add', [3, 3])
print(result.get())
result = add.delay(4, 4)
print(result.get())

You should see something like the following in the terminal window running Celery:

[2011-08-31 23:43:44,242: INFO/MainProcess] Got task from broker: tasks.add[f5f5ee81-fef5-46d2-87de-0da005d588d0]
[2011-08-31 23:43:44,294: INFO/MainProcess] Task tasks.add[f5f5ee81-fef5-46d2-87de-0da005d588d0] succeeded in 0.0111658573151s: 4
[2011-08-31 23:43:44,301: INFO/MainProcess] Got task from broker: tasks.add[cd2de0d1-35ad-4d7a-8212-47e800cd85bc]
[2011-08-31 23:43:44,298: INFO/MainProcess] Task tasks.add[cd2de0d1-35ad-4d7a-8212-47e800cd85bc] succeeded in 0.0115258693695s: 6
[2011-08-31 23:43:44,301: INFO/MainProcess] Got task from broker: tasks.add[cd2de0d1-35ad-4d7a-8212-47e900cd85bc]
[2011-08-31 23:43:44,329: INFO/MainProcess] Task tasks.add[cd2de0d1-35ad-4d7a-8212-47e900cd85bc] succeeded in 0.0115258693695s: 8
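
If you want to pull timings out of worker output like this, a small parser is enough. The following is a rough sketch that assumes the exact line layout shown above (other Celery versions may log differently); the names SUCCESS_RE and parse_success are made up for this example.

```python
import re

# Matches only the "succeeded" lines in the format shown above.
SUCCESS_RE = re.compile(
    r"\[(?P<timestamp>[^\]]+?): (?P<level>\w+)/(?P<process>[^\]]+)\] "
    r"Task (?P<name>[\w.]+)\[(?P<task_id>[0-9a-f-]+)\] "
    r"succeeded in (?P<runtime>[\d.]+)s: (?P<result>.*)"
)


def parse_success(line):
    """Return a dict describing a succeeded task, or None for other lines."""
    match = SUCCESS_RE.match(line.strip())
    if match is None:
        return None
    info = match.groupdict()
    info["runtime"] = float(info["runtime"])  # seconds
    return info


sample = (
    "[2011-08-31 23:43:44,294: INFO/MainProcess] Task "
    "tasks.add[f5f5ee81-fef5-46d2-87de-0da005d588d0] "
    "succeeded in 0.0111658573151s: 4"
)
parsed = parse_success(sample)
```

Here parsed["name"] holds the task name and parsed["runtime"] the duration in seconds, while "Got task from broker" lines simply return None.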

To run Celery as a daemon process on Debian/Ubuntu, install the generic init script from https://github.com/ask/celery/tree/master/contrib/generic-init.d/, then run:

/etc/init.d/celeryd {start|stop|restart|status}

To configure your process, edit /etc/default/celeryd:

# Name of nodes to start
# here we have a single node
CELERYD_NODES="w1"
# or we could have three nodes:
#CELERYD_NODES="w1 w2 w3"

# Where to chdir at start.
CELERYD_CHDIR="/opt/Myproject/"

# Extra arguments to celeryd
CELERYD_OPTS="--time-limit=300 --concurrency=8"

# Name of the celery config module.
CELERY_CONFIG_MODULE="celeryconfig"

# %n will be replaced with the nodename.
CELERYD_LOG_FILE="/var/log/celery/%n.log"
CELERYD_PID_FILE="/var/run/celery/%n.pid"

# Workers should run as an unprivileged user.
CELERYD_USER="celery"
CELERYD_GROUP="celery"

For daemon scripts on other operating systems and for more information on configuration, see Celery Daemonizing.

Django-Celery

If you use django-celery, you won't need the celeryconfig.py file, as the Celery configuration will live in the project's settings.py.

To start, add djcelery to INSTALLED_APPS, then add the following to settings.py:

import djcelery
djcelery.setup_loader()

CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
CELERY_RESULT_BACKEND = "amqp"
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "{CELERY_USER}"
BROKER_PASSWORD = "{CELERY_USER_PASS}"
BROKER_VHOST = "{CELERY_VHOST}"

To create the necessary database tables:

python manage.py syncdb

For those using mod_wsgi, add the following to your *.wsgi file:

import os
os.environ["CELERY_LOADER"] = "django"

Celery will automatically look for files named tasks.py in other installed apps and process them accordingly. Another great use for Celery is periodic tasks:

from datetime import timedelta
from celery.decorators import periodic_task

@periodic_task(run_every=timedelta(days=1))
def update_users():
    aggregate_user_data() # not defined here, just an example
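
To make the run_every interval concrete, here is a simplified sketch of how a fixed-interval schedule can decide whether a task is due. It only illustrates the idea and is not Celery's own scheduler code; the function name interval_is_due and its arguments are assumptions made for this example.

```python
from datetime import datetime, timedelta


def interval_is_due(last_run_at, run_every, now):
    """Return (due, seconds_until_next_check) for a fixed interval."""
    next_run_at = last_run_at + run_every
    remaining = (next_run_at - now).total_seconds()
    if remaining <= 0:
        # Due now; the following run is one full interval away.
        return True, run_every.total_seconds()
    return False, remaining


last_run = datetime(2011, 8, 31, 0, 0, 0)
daily = timedelta(days=1)

# Eighteen hours after the last run: not due, six hours remain.
not_yet = interval_is_due(last_run, daily, datetime(2011, 8, 31, 18, 0, 0))
# Just over a day after the last run: due.
due_now = interval_is_due(last_run, daily, datetime(2011, 9, 1, 0, 0, 5))
```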

There is some additional information available at http://ask.github.com/django-celery/.

Lastly, start a process to snapshot the workers, so you can use Django to monitor celery:

nohup python manage.py celerycam &

More information on monitoring is available at Celery Monitoring.

How it works…

Celery uses tasks that are executed concurrently on one or more workers. Tasks execute asynchronously by default, and the caller can wait for one synchronously by blocking on its result (result.get() or result.wait()). A third-party service manages the task queue and stores the task results. RabbitMQ is recommended for the queue, although many database technologies are supported, and I prefer AMQP or Redis for the results backend.

To create a Celery task, first create a file named tasks.py and then add task functions there. Decorate each function that will be called explicitly with @task, and each function that executes periodically with @periodic_task(run_every=timedelta({FREQUENCY})). Tasks, including periodic tasks, can be called explicitly using mytask.delay(*args, **kwargs). This returns a result object, which can be used to block the current thread until the task completes, using result.wait() or response = result.get().
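
If the delay-then-get pattern is new to you, the Python 3 standard library offers a close analogy that runs without a broker. The sketch below uses concurrent.futures rather than Celery: submit() plays the role of mytask.delay(), and future.result() plays the role of result.get(). It runs in local threads, not on remote workers, so treat it only as an illustration of the calling pattern.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    time.sleep(0.1)  # stand-in for slow work
    return x + y


with ThreadPoolExecutor(max_workers=2) as pool:
    # Returns immediately with a handle, much like add.delay(4, 4).
    future = pool.submit(add, 4, 4)
    # The caller is free to do other work here.
    # Blocks until the value is ready, much like result.get().
    total = future.result(timeout=5)

print(total)
```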

The Celery worker process logs all activity, so monitor its log for errors (usually /var/log/celeryd.log). Make sure task functions handle errors gracefully, so that no data is ever lost.
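
One way to handle errors gracefully is to catch expected failures inside the task body, log them together with the inputs, and return a status the caller can act on. The sketch below shows the idea on a plain function; in a real project it would carry the @task decorator. The function name safe_divide and the shape of the returned dictionary are assumptions for illustration.

```python
import logging

logger = logging.getLogger("tasks")


def safe_divide(x, y):
    """Task body that reports a failure instead of losing its inputs."""
    try:
        return {"ok": True, "value": x / y}
    except ZeroDivisionError as exc:
        # Record the inputs so the work can be retried or inspected later.
        logger.exception("safe_divide failed for x=%r y=%r", x, y)
        return {"ok": False, "error": str(exc), "args": [x, y]}


good = safe_divide(6, 3)
bad = safe_divide(1, 0)
```

Catching only the exceptions you expect keeps genuine bugs visible in the worker log instead of hiding them behind a status value.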

The django-celery package manages your Celery commands and configuration, adds an admin tool, and discovers tasks.py automatically in all your apps. To monitor the Celery workers via Django, start the celerycam process, which takes periodic snapshots of the workers and writes them to the djcelery_taskstate table.

This should provide enough information for you to start using Celery in your own projects. Feel free to leave any questions you may have.

There’s more…

Much of this article was inspired by the Celery Documentation. I recommend starting there if you have any questions. For additional information: