The Python Celery Cookbook: Small Tool, Big Possibilities

Everyone in the Python community has heard about Celery at least once, and many have already worked with it. Basically, it’s a handy tool that lets you run postponed or dedicated code in a separate process, or even on a separate computer or server. This saves time and effort on many levels.

An Introduction to the Celery Python Guide

Celery reduces the load on your application by running part of the functionality as postponed tasks, either on the same server as other tasks or on a different server. Most commonly, developers use it for sending emails. However, Celery has a lot more to offer. In this article, I’ll show you some Celery basics, as well as a couple of Python-Celery best practices.

Celery Basics

If you have worked with Celery before, feel free to skip this chapter. But if Celery is new to you, here you will learn how to enable Celery in your project and walk through a short example of using Celery with Django. Basically, you need to create a Celery instance and use it to mark Python functions as tasks.

It’s better to create the instance in a separate file, because the Celery worker is started by pointing it at the module that holds the instance, much like a WSGI server is pointed at the WSGI module in Django. For example, if you create both a Flask instance and a Celery instance in one file of a Flask application and run it, both instances are created but only one is used. The same happens when you run the Celery worker.

Primary Python Celery Examples

As I mentioned before, the go-to case of using Celery is sending email. I will use this example to show you the basics of using Celery. Here’s a quick Celery Python tutorial:

from django.conf import settings
from django.core.mail import send_mail
from django.template import Engine, Context

from myproject.celery import app


def render_template(template, context):
    engine = Engine.get_default()

    tmpl = engine.get_template(template)

    return tmpl.render(Context(context))


@app.task
def send_mail_task(recipients, subject, template, context):
    send_mail(
        subject=subject,
        message=render_template(f'{template}.txt', context),
        from_email=settings.DEFAULT_FROM_EMAIL,
        recipient_list=recipients,
        fail_silently=False,
        html_message=render_template(f'{template}.html', context)
    )

This code uses Django, as it’s our main framework for web applications. By using Celery, we reduce the response time for the customer, because the sending process is separated from the main code that returns the response.

The simplest way to execute this task is to call the delay method that the app.task decorator adds to the function.

send_mail_task.delay(('noreply@example.com', ), 'Celery cookbook test', 'test', {})

That’s not all: Celery provides more benefits. For example, we could set up retries in case of failure.

import smtplib

from myproject.celery import app as celery_app


@celery_app.task(bind=True, default_retry_delay=10 * 60)
def send_mail_task(self, recipients, subject, template, context):
    # Render outside the try block so that only the SMTP call is guarded.
    message = render_template(f'{template}.txt', context)
    html_message = render_template(f'{template}.html', context)
    try:
        send_mail(
            subject=subject,
            message=message,
            from_email=settings.DEFAULT_FROM_EMAIL,
            recipient_list=recipients,
            fail_silently=False,
            html_message=html_message
        )
    except smtplib.SMTPException as ex:
        # retry() raises a Retry exception; re-raising it makes the flow explicit.
        raise self.retry(exc=ex)

Now the task will be retried after ten minutes if sending fails. You can also set the number of retries with the max_retries option (the default is 3).

Some of you may wonder why I moved the template rendering outside of the send_mail call. It’s because we wrap the call of send_mail into try/except, and it’s better to have as little code in try/except as possible.

Celery for Advanced Users

Celery Django Scheduled Tasks

Celery makes it possible to run tasks on a schedule, much like crontab in Linux.

First of all, if you want to use periodic tasks, you have to run the Celery worker with the --beat flag (or run a separate celery beat process), otherwise Celery will ignore the schedule. Your next step would be to create a config that says what task should be executed and when. Here’s an example:

from celery.schedules import crontab


CELERY_BEAT_SCHEDULE = {
    'monday-statistics-email': {
        'task': 'myproject.apps.statistics.tasks.monday_email',
        # minute defaults to '*', so set it explicitly to run once at 07:00.
        'schedule': crontab(minute=0, hour=7, day_of_week=1),
    },
}

If you don’t use Django, you should use celery_app.conf.beat_schedule instead of CELERY_BEAT_SCHEDULE.

This configuration contains only one task, which will be executed every Monday at 7 a.m. The root key is the name of the schedule entry (the cron job), not the name of the task.

You can add arguments to tasks, which lets you schedule the same task at different times with different arguments. The crontab class supports the syntax of the system crontab, such as crontab(minute='*/15') to run the task every 15 minutes.

Postponed Task Execution In Celery

You can also put a task in the queue with a delay before execution (for example, when you need to send a notification some time after an action). To do this, use the apply_async method with an eta or countdown argument.

eta - execute task at exact time

countdown - execute task in N seconds

Let’s look at what it might look like in code:

from datetime import datetime


send_mail_task.apply_async(
    (('noreply@example.com', ), 'Celery cookbook test', 'test', {}),
    countdown=15 * 60
)

send_mail_task.apply_async(
    (('noreply@example.com', ), 'Celery cookbook test', 'test', {}),
    eta=datetime(2019, 5, 20, 7, 0)
)

In the first example, the email will be sent in 15 minutes, while in the second it will be sent at 7 a.m. on May 20.
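The two arguments are closely related: countdown is a shortcut for an eta that lies a number of seconds in the future. The sketch below shows that relationship with the standard library only. The helper eta_from_countdown is a hypothetical name, and using a timezone-aware UTC datetime is a choice made here to keep the time unambiguous.

```python
from datetime import datetime, timedelta, timezone


def eta_from_countdown(seconds, now=None):
    """Return the timezone-aware UTC datetime that is `seconds` after `now`."""
    now = now or datetime.now(timezone.utc)
    return now + timedelta(seconds=seconds)


# 15 minutes after a fixed reference time, so the result is reproducible.
reference = datetime(2019, 5, 20, 6, 45, tzinfo=timezone.utc)
eta = eta_from_countdown(15 * 60, now=reference)
print(eta.isoformat())  # 2019-05-20T07:00:00+00:00
```

A value computed this way can be passed as eta to apply_async in place of the countdown.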

Setting Up Python Celery Queues

Celery can be distributed: you can have several workers on different servers that share one message queue for task planning. You can also configure an additional queue for particular tasks and workers. For example, suppose sending emails is a critical part of your system and you don’t want any other tasks to delay it. Then you can add a new queue, let’s call it mail, and use it only for sending emails.

CELERY_TASK_ROUTES = {
    'myproject.apps.mail.tasks.send_mail_task': {'queue': 'mail', },
}

If you don’t use Django, use celery_app.conf.task_routes instead of CELERY_TASK_ROUTES.

Run two separate celery workers for the default queue and the new queue:

celery -A myproject worker -l info -Q celery
celery -A myproject worker -l info -Q mail

The first line runs a worker for the default queue, called celery, and the second line runs a worker for the mail queue. If you start the first worker without the -Q argument, it will consume from all queues defined in the task_queues setting.

Python Celery Long Running Tasks

Sometimes I have to deal with tasks that go through database records and perform some operation on each of them. Quite often, developers forget about data growth, which can lead to very long running times. It’s always better to write such tasks so that they work with chunks of data. The easiest way is to add offset and limit parameters to the task: limit sets the size of the chunk, and offset is the cursor that points to the next chunk.

@celery_app.task
def send_good_morning_mail_task(offset=0, limit=100):
    users = User.objects.filter(is_active=True).order_by('id')[offset:offset + limit]
    for user in users:
        send_good_morning_mail(user)

    if len(users) >= limit:
        send_good_morning_mail_task.delay(offset + limit, limit)

This is a very simple example of how a task like this can be implemented. At the end of the task, we check how many users we found in the database. If the number equals the limit, there are probably more users to process, so we queue the task again with a new offset. If the user count is less than the limit, it was the last chunk and we don’t have to continue. Beware, though: this implementation relies on the records being returned in the same order every time.
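Offset-based chunks can also shift when rows are added or removed between runs. One common alternative, which is not the approach used in this article, is to pass the last processed ID instead of an offset. The plain-Python sketch below uses a list of dicts as a stand-in for the table; next_chunk and process_all are hypothetical helper names.

```python
def next_chunk(rows, last_id=0, limit=100):
    """Return up to `limit` rows whose id is greater than `last_id`, ordered by id."""
    matching = sorted((r for r in rows if r['id'] > last_id), key=lambda r: r['id'])
    return matching[:limit]


def process_all(rows, limit=100):
    """Walk the data set chunk by chunk and return the ids that were visited."""
    seen, last_id = [], 0
    while True:
        chunk = next_chunk(rows, last_id, limit)
        seen.extend(r['id'] for r in chunk)
        if len(chunk) < limit:
            return seen  # a short chunk means nothing is left to process
        last_id = chunk[-1]['id']


users = [{'id': i} for i in range(1, 251)]
print(len(process_all(users, limit=100)))  # 250
```

In a Django task, the equivalent query would filter on id__gt=last_id, order by id, and slice with [:limit], and the task would queue itself again with the ID of the last record in the chunk.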

Celery: Getting Task Results

Most developers don’t use the results that tasks return. But imagine that you can take a part of your code, turn it into a task, and start it independently as soon as you receive a user request. When you need the result of the task, you either get it right away (if the task has completed) or wait for it to complete, and then include it in the overall response. This requires a result backend to be configured. Using this approach, you can decrease response time, which is very good for your users and site rank.

We use this feature to run simultaneous operations. In one of our projects, we have a lot of user data and a lot of service providers. To find the best service provider, we do heavy calculations and checks. To do it faster, we create one task per service provider for the user, run the tasks, and collect the results to show to the user. It’s very easy to do with Celery task groups.

from celery import group

@celery_app.task
def calculate_service_provider_task(user_id, provider_id):
    user = User.objects.get(pk=user_id)
    provider = ServiceProvider.objects.get(pk=provider_id)

    return calculate_service_provider(user, provider)


@celery_app.task
def find_best_service_provider_for_user(user_id):
    user = User.objects.get(pk=user_id)
    providers = ServiceProvider.objects.related_to_user(user)

    calc_group = group([
        calculate_service_provider_task.s(user.pk, provider.pk)
        for provider in providers
    ]).apply_async()

    return calc_group

First, why do we even run two tasks? The second task builds the group of calculation tasks, launches it, and returns it. It is also the place for filtering, such as choosing which service providers need to be calculated for a given user. All of this runs in Celery, in parallel with other work. When the task group finishes, the results of the first task are the calculations we are interested in.

Here’s an example of how to use this approach in code:

def view(request):
    find_job = find_best_service_provider_for_user.delay(request.user.pk)

    # do other stuff

    calculations_results = find_job.get().join()

    # process calculations_results and send response

Here, we run calculations as soon as possible, wait for the results at the end of the method, then prepare the response and send it to the user.

Useful Tips

Tiny Data

You may have noticed that I pass database record IDs as task arguments instead of full objects. This is a good way to keep the messages in the queue small. More importantly, the data in the database can change before the task is executed. With only IDs, the task loads fresh data, whereas a passed object may already be outdated.

Transactions

Sometimes a task fails because it can’t find an object in the database. Why does this happen? Suppose that in Django you want to run a task after a user is registered, such as sending a greeting email, and your Django settings wrap every request in a transaction. A Celery worker can pick up the task before that transaction is committed. So if you use Celery with Django, the task might find that the user doesn’t exist in the database (yet).

To deal with this, you can search for “task transaction implementation”. In general, the solution is a task class with an overridden apply_async method that queues the task in a transaction.on_commit callback instead of doing it immediately.

Conclusion

As you can see, Celery has a lot more uses than just sending emails. You can run tasks in parallel with the main process, and while it does its job, Celery will complete the smaller tasks at hand. You can set up queues, work with data chunks in long-running tasks, and schedule when your tasks are executed. This will allow you to plan your work and development time more efficiently, and spend your precious time working on the bigger things while Celery task groups work their magic.