Django + Celery (Practical Implementation) Under 9 Mins

Rahul Beniwal
6 min read · May 9, 2023

Celery is a task queue that lets us offload time-consuming and compute-intensive tasks to background workers, so the main thread is not blocked.

Celery can be integrated with popular message brokers such as RabbitMQ and Redis.
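
To make the idea concrete, here is a minimal sketch of the producer, queue, and worker pattern using only the Python standard library. It is an analogy, not how Celery is implemented: `task_queue` stands in for the broker, `results` for the result backend, and all names are made up for illustration.

```python
import queue
import threading

task_queue = queue.Queue()   # stands in for the message broker
results = []                 # stands in for the result backend


def worker():
    """Consume (function, args) pairs until a None sentinel arrives."""
    while True:
        item = task_queue.get()
        if item is None:
            task_queue.task_done()
            break
        func, args = item
        results.append(func(*args))
        task_queue.task_done()


def square(n):
    return n * n


thread = threading.Thread(target=worker, daemon=True)
thread.start()

# The producer only enqueues work and moves on.
for n in (2, 3, 4):
    task_queue.put((square, (n,)))

task_queue.put(None)  # tell the worker to stop
task_queue.join()     # wait until everything queued has been processed
thread.join()
```

With a real broker the queue lives in a separate service, so the workers can run in other processes or on other machines.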

Image credit https://velog.io/@sms8377/Celery-Python-Celery%EB%9E%80

Create a basic Django application using this.

Install the following packages

pip install celery pymongo

Assume that your project is named django_revision (the project folder is where settings.py lives).

Create celeryconfig.py inside django_revision.

# django_revision/celeryconfig.py


# using rabbitmq as the broker
broker_url = "amqp://guest:guest@localhost:5672//"

# We can either set Redis, RabbitMQ, Postgresql, MySQL, MongoDB
# or other databases as the result backend.
# using mongodb as the result backend

result_backend = "mongodb://localhost:27017/django-revision-celery-log"


# Use pickle for complex data structures while passing arguments to tasks
# json, yaml, msgpack are other options.
task_serializer = "pickle"

timezone = "Asia/Kolkata"

# cache_backend configures Celery's cache-based result backend.
# Note: Celery does not skip re-execution when the same task is called again;
# every call runs the task. With MongoDB as result_backend above, this setting
# is not expected to have any effect and is kept only for reference.
# using Redis database 2 as the cache backend

cache_backend = "redis://localhost:6379/2"

# Either define imports here or call autodiscover_tasks; one of the two is enough
imports = ("django_revision.tasks",)

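# Celery accepts only JSON content by default, so pickle must be allowed explicitly.
# json stays in the list because results are serialized as JSON by default.
# Only unpickle messages from a broker you trust: pickle can run arbitrary code.
accept_content = ["pickle", "json"]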
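
The serializer choice above matters because JSON cannot represent every Python object. The following standard-library sketch shows the difference; the `payload` value is a made-up example, not something from this project.

```python
import json
import pickle
from datetime import datetime

# A hypothetical task argument containing types JSON has no representation for.
payload = {"when": datetime(2023, 5, 9, 10, 30), "tags": {"report", "email"}}

# pickle round-trips arbitrary Python objects unchanged.
restored = pickle.loads(pickle.dumps(payload))

# json rejects the datetime and the set.
try:
    json.dumps(payload)
    json_ok = True
except TypeError:
    json_ok = False
```

If your task arguments are plain strings, numbers, lists, and dicts, the default JSON serializer is usually enough.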

Create celery.py inside django_revision; this is the entry point for Celery.

# django_revision/celery.py

import os
from celery import Celery
from django.conf import settings


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_revision.settings")

app = Celery("django_revision")

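# app.config_from_object('django.conf:settings', namespace='CELERY')
# celeryconfig.py lives inside the django_revision package, so use its dotted path.
app.config_from_object('django_revision.celeryconfig', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)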

That is enough configuration for now.

Using Celery to run blocking tasks asynchronously

1. Starting a Celery worker to receive tasks

celery -A django_revision worker -l info

I created a function that mimics a blocking operation, such as sending mail.

# django_revision/tasks.py

import time

from django_revision.celery import app


@app.task
def send_mail(to_user, from_user, subject, message):
    # Simulate a slow, blocking operation such as talking to a mail server.
    time.sleep(10)
    print(f"Mail will be sent to {to_user} from {from_user} with subject {subject} and message {message}")
    return "OK"


def send_report_in_email():
    to_user = "to_user@gmail.com"
    # A direct call runs the task synchronously in the current process.
    send_mail(to_user, "from_user@gmail.com", "Report", "Report Message")
    return f"Email sent to {to_user}"


send_report_in_email()

Sending mail requires a network call, so it blocks the main Python thread. After 10 seconds we get this output:

Mail will be sent to to_user@gmail.com from from_user@gmail.com with subject Report and message Report Message
'Email sent to to_user@gmail.com'

This hurts our app's performance because the main thread cannot do anything else in the meantime. It gets worse if the mail has an attachment that takes around 5 minutes to generate; that would be a disaster. But we have Celery, so no worries.

2. Converting the above code to an async task

We have already registered send_mail as a Celery task (by defining it in a Django app's tasks.py), so now we need to send the task to the Celery queue.

delay and apply_async are used to send tasks to the Celery queue.

send_mail.delay(to_user, "from_user@gmail.com", "Report", "Report Message")

Celery log indicating that the task was received and consumed by a worker.

This time, when the main thread reaches delay, the task is sent to the queue and executed by a worker process, while the main thread continues with the rest of the code.
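
The non-blocking behaviour can be sketched with the standard library alone. This is only an analogy: `pool.submit` plays the role of `delay`, and `future.result()` plays the role of calling `.get()` on the result object that `delay` returns. The shortened sleep and the local `send_mail` function are assumptions made for the demo.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def send_mail(to_user):
    time.sleep(0.5)  # pretend to talk to a mail server
    return f"sent to {to_user}"


with ThreadPoolExecutor(max_workers=1) as pool:
    start = time.perf_counter()
    future = pool.submit(send_mail, "to_user@gmail.com")  # returns immediately
    submit_time = time.perf_counter() - start

    # The caller is free to do other work here.

    result = future.result()  # blocks only when the result is actually needed
    total_time = time.perf_counter() - start
```

Submitting takes a tiny fraction of the total time; the caller pays the full cost only if it chooses to wait for the result.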

apply_async

Both delay and apply_async submit a task to Celery, but delay is the basic form, while apply_async supports more advanced options.

from django_revision.tasks import send_mail


def send_report_in_email():
    to_user = "to_user@gmail.com"
    send_mail.apply_async(
        args=[to_user, "from_user@gmail.com", "Report", "Report Message"],
        kwargs={},
        countdown=1,
        retry=True,
        priority=0,
        queue="default",
        retry_policy={
            "max_retries": 3,
            "interval_start": 0,
            "interval_step": 0.2,
            "interval_max": 0.2,
        },
    )
    return f"Email sent to {to_user}"


send_report_in_email()

The main difference is how arguments are passed: delay takes them like a normal function call, while apply_async takes them as args and kwargs and also accepts extra execution options.
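
The relationship between the two calls can be illustrated with a toy class. `FakeTask` is hypothetical and only records what would be sent; it is not Celery's Task class, but it follows the documented idea that delay is a shortcut for apply_async.

```python
class FakeTask:
    """Hypothetical stand-in for a Celery task; it only records messages."""

    def __init__(self, name):
        self.name = name
        self.sent = []

    def apply_async(self, args=None, kwargs=None, **options):
        message = {
            "task": self.name,
            "args": tuple(args or ()),
            "kwargs": dict(kwargs or {}),
            "options": options,
        }
        self.sent.append(message)
        return message

    def delay(self, *args, **kwargs):
        # Shortcut: star-arguments are packed and forwarded; no room for options.
        return self.apply_async(args, kwargs)


task = FakeTask("send_mail")
via_delay = task.delay("to@example.com", subject="Report")
via_apply = task.apply_async(args=["to@example.com"], kwargs={"subject": "Report"})
with_options = task.apply_async(
    args=["to@example.com"], kwargs={"subject": "Report"}, countdown=5
)
```

Because delay forwards everything as task arguments, execution options such as countdown can only be given through apply_async.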

Options for apply_async

  1. args -> Positional arguments to send to the task.
  2. kwargs -> Keyword arguments to send to the task.
  3. countdown -> Integer or float giving the number of seconds to wait before executing the task.
  4. retry -> Boolean indicating whether sending the task message should be retried if the connection to the broker fails.
  5. retry_policy -> A dict configuring those publish retries: maximum number of retries, the retry interval, and so on.
  6. priority -> Task priority. With RabbitMQ a higher number means higher priority; with Redis the order is reversed and 0 is the highest.
  7. eta -> A datetime object giving the earliest time at which the task may be executed.
  8. expires -> A datetime object (or a number of seconds) after which the task is discarded if it has not been executed.
  9. queue -> String naming the queue the task should be sent to.
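
The timing options are related: countdown is a relative way of expressing an eta, and expires sets a deadline after which the task is skipped. A small standard-library sketch of that arithmetic follows; the helper names `eta_from_countdown` and `is_expired` are made up for illustration and are not Celery functions.

```python
from datetime import datetime, timedelta, timezone


def eta_from_countdown(now, countdown):
    """Return the earliest execution time implied by a countdown in seconds."""
    return now + timedelta(seconds=countdown)


def is_expired(now, expires):
    """A task whose expiry time has passed should be skipped, not executed."""
    return expires is not None and now >= expires


now = datetime(2023, 5, 9, 12, 0, 0, tzinfo=timezone.utc)
eta = eta_from_countdown(now, 90)      # 90 seconds from now
expires = now + timedelta(minutes=5)   # deadline five minutes from now
```

Using timezone-aware datetimes, as above, avoids surprises when the worker and the caller run in different time zones.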

We can also set advanced options when registering the task with Celery.

app.task Options

import time

from django_revision.celery import app


@app.task(
    name="django_revision.tasks.send_mail",
    bind=True,
    autoretry_for=(Exception,),
    max_retries=3,
    ignore_result=False,
    retry_backoff=3,
    retry_jitter=True,
    retry_backoff_max=60 * 5,
)
def send_mail(self, to_user, from_user, subject, message):
    time.sleep(10)
    print(f"Mail will be sent to {to_user} from {from_user} with subject {subject} and message {message}")
    return "OK"

  1. bind -> If bind is set, the task instance is passed as the first argument (self). This is useful when the function needs task-specific information such as self.request or wants to call self.retry; otherwise it is not needed.
  2. name -> Name of the task. If omitted, Celery builds the name from the module and function name.
  3. autoretry_for -> Automatically retry the task when one of the listed exceptions is raised.
  4. max_retries -> How many times the task is retried if it fails; the delay between retries comes from retry_backoff if set.
  5. ignore_result -> If True, the task result is not stored in the result_backend.
  6. retry_backoff -> Backoff strategy for retries. A number is used as the delay factor, and the delay doubles on each retry: with 3, the first retry is after 3 seconds, the second after 6, the third after 12, and so on.
  7. retry_jitter -> A boolean that adds randomness to the retry delays. When enabled, the delay computed by retry_backoff is treated as a maximum and the actual delay is a random value between 0 and that maximum.
  8. retry_backoff_max -> Maximum delay between retries. For example, if the fourth retry would wait 24 seconds but retry_backoff_max is 12, that retry waits 12 seconds instead.
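
The interaction of retry_backoff, retry_backoff_max, and retry_jitter is easier to see as arithmetic. The function below is a sketch based on my reading of the documented behaviour (exponential doubling, a cap, and optional full jitter); `backoff_delay` is a hypothetical helper, not part of Celery's public API.

```python
import random


def backoff_delay(factor, retries, maximum, jitter=False):
    """Delay in seconds before the next retry, given how many retries already happened."""
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: pick a random whole number of seconds from 0 up to the delay.
        delay = random.randrange(delay + 1)
    return max(0, delay)


# retry_backoff=3 with a generous cap: the delay doubles each time.
uncapped = [backoff_delay(3, n, maximum=600) for n in range(4)]

# The same factor with retry_backoff_max=12: later delays are clamped.
capped = [backoff_delay(3, n, maximum=12) for n in range(4)]

# With jitter the third retry waits somewhere between 0 and 12 seconds.
jittered = backoff_delay(3, 2, maximum=600, jitter=True)
```

Jitter spreads retries out so that many failed tasks do not all hit the same service again at the same instant.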

There are many other options too; see the Celery documentation on tasks.

That’s enough for async tasks; let's move on to periodic tasks.

Periodic tasks using Celery

Add a periodic task in tasks.py

# django_revision/tasks.py

from celery import shared_task

@shared_task
def greet_user():
    print("Hello User")

The task is added.

Create a file django_revision/periodic_tasks.py to store the schedule for periodic tasks.

from celery.schedules import crontab


CELERY_BEAT_SCHEDULE = {
    'greet_user': {
        'task': 'django_revision.tasks.greet_user',
        'schedule': crontab(minute='*/1'),
    }
}

Crontab Job Scheduling Format

Here */1 means the task runs every minute.
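
To see what a step expression such as */1 or */15 selects, here is a simplified expander for the minute field. It is a teaching sketch, not Celery's crontab parser, and `expand_minute_field` is a made-up name; it handles only `*`, single values, ranges, comma lists, and `/step`.

```python
def expand_minute_field(field):
    """Expand a cron minute field such as '*', '*/15', or '5,35' into sorted minutes."""
    minutes = set()
    for part in field.split(","):
        base, _, step = part.partition("/")
        step = int(step) if step else 1
        if base == "*":
            start, end = 0, 59
        elif "-" in base:
            low, high = base.split("-")
            start, end = int(low), int(high)
        else:
            start = end = int(base)
        minutes.update(range(start, end + 1, step))
    return sorted(minutes)


every_minute = expand_minute_field("*/1")
every_quarter = expand_minute_field("*/15")
```

As the expansion shows, */1 selects the same minutes as a plain *.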

Since we are using celeryconfig.py to manage the Celery configuration, import CELERY_BEAT_SCHEDULE at the end of celeryconfig.py.

# Periodic tasks

from django_revision.periodic_tasks import CELERY_BEAT_SCHEDULE

Finally, run the beat scheduler process. Keep the worker from step 1 running as well, because beat only puts tasks on the queue.

celery -A django_revision beat -l debug

Make sure your result_backend is configured correctly; if task results come back as None, check the result_backend configuration.

Important terms.

  1. shared_task -> This decorator lets us define a task once, without tying it to a specific app instance, and share it between multiple Celery apps.
  2. CELERY_BEAT_SCHEDULE -> This setting defines the schedule for recurring tasks run by the beat scheduler. It must be a Python dict that maps an entry name to its scheduling information: the task key gives the name of the task and the schedule key gives its schedule.
  3. */1 -> Here * means every value of the field and /1 is the step, so the task runs every minute.

That's everything you need to get started with Celery and Django. There are also third-party Django packages such as django_crontab and django_celery_beat for more advanced scheduling. I shall cover them in upcoming posts.
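
A schedule entry does not have to use crontab: the schedule value can also be a timedelta or a plain number of seconds, and an entry may carry args for the task. The dictionary below is a hypothetical example with made-up entry names; the task names must match tasks that are actually registered in your project.

```python
from datetime import timedelta

# Hypothetical schedule; entry names are free-form labels.
CELERY_BEAT_SCHEDULE = {
    "greet_user_every_30_seconds": {
        "task": "django_revision.tasks.greet_user",
        "schedule": timedelta(seconds=30),
    },
    "send_report_every_hour": {
        "task": "django_revision.tasks.send_mail",
        "schedule": 3600.0,  # plain number of seconds
        "args": ("to_user@gmail.com", "from_user@gmail.com", "Report", "Report Message"),
    },
}
```

Interval schedules like these are counted from when beat starts, whereas crontab schedules follow the wall clock.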

Leave a 👏 if you like this.
