Django + Celery - asynchroniczna kolejka zadań

8 minut(y)

Czym jest Celery i kiedy warto go używać?

Celery jest to asynchroniczna kolejka zadań oparta na przekazywaniu rozproszonych wiadomości. Zadania mogą być umieszczane w kolejce (zlecane) poprzez np. aplikacje webowe lub inne oprogramowanie.

Klasyczne aplikacje webowe działają w cyklach żądanie-odpowiedź (request-response). Gdy użytkownik wpisze w pasku adresu przeglądarki url aplikacji, lub kliknie na odpowiednie link, to przeglądarka wysyła żądanie do aplikacji webowej. Aplikacja przetwarza to żądanie, generuje zapytania do bazy danych, parsuje szablony itp. i na końcu zwraca do przeglądarki odpowiedź. Cały ten cykl powinien trwać jak najkrócej, tak aby użytkownik nie musiał długo czekać na rezultat żądania. Dodatkowo, aplikacja może w tym samym niemal czasie otrzymywać wiele żądań od różnych użytkowników i długi czas przetwarzania żądań wpływa niekorzystnie na wydajność samej aplikacji.

Aby skrócić czas przetwarzania żądania stosuje się różne metody i zabiegi np. optymalizuje się zapytania do bazy danych, stosuje mechanizmy cache itp. Zdarzają się jednak sytuacje, że trzeba wykonać złożone czasowo lub obliczeniowo zadania (generowanie raportu, eksport dużej ilości danych, przetwarzanie obrazów/clipów video, pobranie danych z zewnętrznych serwisów/api itp). Takie zadania mogą wykonywać się w czasie znacząco dłuższym niż typowy czas przetwarzania żądania i wówczas nie powinno się ich wykonywać w cyklu request-response.

Z pomocą tutaj przychodzi Celery, które może wykonywać asynchronicznie zlecone czasochłonne zadania. Rolą Celery jest przyjąć w jak najkrótszym czasie zlecenie wykonania zadania, a samo zadanie umieścić w kolejce do wykonania w tle.

W tym wpisie przedstawię tylko podstawy współpracy z Celery, celowo pomijając bardziej złożone kwestie, takie jak rezultaty wykonanych zdań, dystrybucje zadań do wielu kolejek, priorytetowanie, sterowanie przepływem itp. Być może w przyszłości pojawi się oddzielny wpis związany z tymi kwestiami.

Instalacja Celery

Instalację Celery najprościej jest przeprowadzić w wykorzystaniem narzędzia pip:

$ pip install celery

Celery do działania potrzebuje jeszcze pośrednika (brokera) do przesyłania komunikatów. Możemy tutaj skorzystać z Redis, RabbitMQ, AmazonSQS lub Zookeeper. Dla naszych potrzeb skorzystamy z brokera Redis. Musimy zainstalować go niezależnie w systemie:

# ubuntu
$ sudo apt install redis-server
$ sudo systemctl enable redis-server
$ sudo systemctl start redis-server

od razu zainstalujmy niezbędne komponenty dla przykładowej aplikacji:

$ pip install redis requests

Konfiguracja i współpraca z Django

Od wersji 3.1 Celery potrafi już samodzielnie współpracować z projektem Django (we wcześniejszych wersjach wymagana była instalacja dodatkowej biblioteki django-celery). Załóżmy, że mamy następujący szkielet typowej aplikacji:

├── djcel
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── manage.py
└── weather
    ├── __init__.py
    ├── admin.py
    ├── apps.py
    ├── forms.py
    ├── models.py
    ├── tests.py
    ├── urls.py
    └── views.py

Zaczniemy od zdefiniowania instancji Celery w projekcie Django. W tym celu w aplikacji projektu stworzymy plik celery.py:

# weather/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('djcel')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

zapewnijmy teraz, że celery zostanie załadowanie przy uruchamianiu aplikacji django.

# weather/__init__.py
from .celery import app as celery_app

__all__ = ('celery_app')

i na zakończenie skonfigurujmy jeszcze odpowiedniego brokera dla Celery. Robimy to w proj/settings.py poprzez dodanie linii:

# djcel/settings.py

CELERY_BROKER_URL = 'redis://loclahost:6379'

Tworzymy pierwsze zadanie

Konfiguracja przygotowana, więc możemy przystąpić do pisania pierwszego zadania dla naszej aplikacji. Niech będzie to zadanie pobrania danych o aktualnej pogodzie dla wskazanej miejscowości z serwisu .openweather.org. Do pobierania danych z API tego serwisu będziemy potrzebować założyć w nim darmowe konto i uzyskać API key. Jeśli mamy już API key możemy przystąpić do pisania kodu. Zadanie nasze będzie działało w sposób następujący:

  • przy wywołaniu zadania przekażemy parametr będący nazwą miejscowości, dla której będziemy pobierać dane o aktualnej pogodzie
  • po pobraniu dane pogodowe (wybrane) zostaną zapisane w modelu
  • dla uproszczenia pominiemy obsługę błędów
# djcel/settings.py

OPENWEATHER_API_KEY = 'MySeCrEtOpEnWeAtHeR_API_KEY'

Model dla danych pogodowych niech wygląda następująco:

# weather/models.py
from django.db import models

class City(models.Model):
    name = models.CharField(max_length=64, unique=True)
    temperature = models.DecimalField(max_digits=10, decimal_places=2, null=True, blank=True)
    description = models.CharField(max_length=255, null=True, blank=True)
    icon = models.CharField(max_length=64, null=True, blank=True)
    country = models.CharField(max_length=2, null=True, blank=True)

    def __str__(self):
        return self.name

Nasze zadanie umieśćmy w pliku tasks.py w katalogu aplikacji.

# weather/tasks.py
import requests

from django.conf import settings
from celery import task

@task
def get_weather_data(city):
    api_key = settings.OPENWEATHER_API_KEY
    url = "http://api.openweathermap.org/data/2.5/weather?units=metric&q={}&APPID={}".format(city, api_key)
    data = requests.get(url).json()

    city, created = City.objects.update_or_create(name=city, defaults={
        'temperature': data['main']['temp'],
        'description': data['weather'][0]['description'],
        'icon': data['weather'][0]['icon'],
        'country': data['sys']['country'],
    })

Uwaga! Umieszczanie kluczy API w plikach konfiguracyjnych jest złą praktyką! Takie klucze należy umieścić dajmy na to w zmiennych środowiskowych serwera, skąd aplikacja Django mogłaby je pobierać do settings w trakcie uruchomienia (dla przykładu przy wykorzystaniu dodatku django_environ).

Teraz pozostało nam stworzenie odpowiedniego widoku, który będzie zlecał wykonanie zadania aktualizacji danych pogodowych dla poszczególnych miejscowości zapisanych w City.

# weather/views.py
from django.contrib import messages
from django.http import HttpResponseRedirect

from .tasks import get_weather_data

class WeatherUpdateView(generic.View):
    def get(self, request, *args, **kwargs):
        cities = City.objects.all()
        for city in cities:
            get_weather_data.delay(city.name)

        messages.add_message(request, messages.INFO,
                            'Weather update task started.')
        return HttpResponseRedirect(reverse('home'))

i dla formalności urls.py:

# djcel/urls.py
from app import views

urlpatterns = [
    path('', views.HomeView.as_view(), name='home'),
    path('update', views.WeatherUpdateView.as_view(), name='update'),
]

Przykładowy kod mamy gotowy, pora więc lokalnie uruchomić całość. Mamy już zainstalowany i uruchomiony redis, teraz w następnej kolejności uruchamiamy celery (np. w oknie konsoli).

$ celery -A weather worker -l info
 -------------- celery@dellik v4.2.1 (windowlicker)
---- **** -----
--- * ***  * -- Linux-4.19.13-1-MANJARO-x86_64-with-arch-Manjaro-Linux 2019-01-16 08:55:26
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         djcel:0x7fe15cd91b70
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . weather.tasks.get_weather_data

[2019-01-16 08:55:27,178: INFO/MainProcess] Connected to redis://localhost:6379//
[2019-01-16 08:55:27,191: INFO/MainProcess] mingle: searching for neighbors
[2019-01-16 08:55:28,216: INFO/MainProcess] mingle: all alone
[2019-01-16 08:55:28,272: INFO/MainProcess] celery@dellik ready.

W drugiej konsoli uruchamiamy projekt django (jeśli tworzymy własny projekt od zera, to pamiętajmy o migracjach oraz o wpisaniu nazw miejscowości do tabeli City):

$ python manage.py runserver

W oknie przeglądarki wpisujemy url: http://127.0.0.1:8000/update

W moim przypadku miałem wpisane trzy miejscowości w tabeli City, i dla każdej z nich zostało zlecone zadanie pobrania danych pogodowych z API openweather

[2019-01-16 09:17:50,435: INFO/MainProcess] Received task: weather.tasks.get_weather_data[71b215d8-1d9f-49c7-961a-d37fd8525db2]
[2019-01-16 09:17:50,437: INFO/MainProcess] Received task: weather.tasks.get_weather_data[ab7e6472-467a-480f-bd18-cfe17c1a667e]
[2019-01-16 09:17:50,441: INFO/MainProcess] Received task: weather.tasks.get_weather_data[6e3307ff-05d3-478c-a67a-624ced2d68a4]
[2019-01-16 09:17:50,939: INFO/ForkPoolWorker-2] Task weather.tasks.get_weather_data[ab7e6472-467a-480f-bd18-cfe17c1a667e] succeeded in 0.4999919530000625s: None
[2019-01-16 09:17:51,044: INFO/ForkPoolWorker-4] Task weather.tasks.get_weather_data[6e3307ff-05d3-478c-a67a-624ced2d68a4] succeeded in 0.6011501890002364s: None
[2019-01-16 09:17:52,344: INFO/ForkPoolWorker-8] Task weather.tasks.get_weather_data[71b215d8-1d9f-49c7-961a-d37fd8525db2] succeeded in 1.9071739209998668s: None

Z racji tego, że w niniejszym przykładzie wykorzystuję bazę danych SQLite, to mogą pojawić się błędy działania związane z równoczesnym dostępem do danych przez procesy workera (django.db.utils.OperationalError: database is locked), gdyż baza ta nie umożliwia standardowo wielodostępu do danych. Rozwiązaniem jest zmiana bazy danych na np. PostgreSQL.

Zadania okresowe

Istnieją czasem sytuacje, w których aplikacja webowa musi wykonywać jakieś zadania okresowo (np. raz dziennie, co godzinę itp) lub o określonej porze dnia. Celery daję nam możliwość ustawiania harmonogramów wykonywania zadań w stosunkowo łatwy sposób. Do tego celu służy celery beat, który okresowo “przegląda” harmonogramy zadań i w odpowiednich momentach zleca je do wykonania workerom.

Aby to zobrazować, rozbudujemy nasz przykład o dodatkowe zadania “nadrzędne”, które będzie uruchamiane co 15 minut, i będzie zlecało pobranie danych o pogodzie dla wszystkich zapisanych miejscowości. W tym celu zaktualizujemy kod w pliku weather/tasks.py.

# weather/tasks.py
from .weather import fetch_data
from celery import task
from celery.task.schedules import crontab
from celery.decorators import periodic_task

from .models import City

@task
def get_weather_data(city):
    api_key = settings.OPENWEATHER_API_KEY
    url = "http://api.openweathermap.org/data/2.5/weather?units=metric&q={}&APPID={}".format(city, api_key)
    data = requests.get(url).json()

    city, created = City.objects.update_or_create(name=city, defaults={
        'temperature': data['main']['temp'],
        'description': data['weather'][0]['description'],
        'icon': data['weather'][0]['icon'],
        'country': data['sys']['country'],
    })

@periodic_task(run_every=(crontab(minute='*/15')))
def get_all_weather_data():
    for city in City.objects.all():
        get_weather_data.delay(city.name)

Wykorzystamy dekorator periodic_task aby stworzyć harmonogram wywołań zadania. W tym przypadku zadanie będzie zlecane do wykonania co 15 minut (o pełnej godzinie, piętnaście minut po pełnej godzinie, w połowie godziny i piętnaście minut przed pełną godziną). Teraz oprócz celery worker musimy uruchomić dodatkowo proces celery beat.

$ celery -A weather beat -l info
celery beat v4.2.1 (windowlicker) is starting.
__    -    ... __   -        _
LocalTime -> 2019-01-16 10:53:45
Configuration ->
    . broker -> redis://localhost:6379//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2019-01-16 10:53:45,505: INFO/MainProcess] beat: Starting...

O odpowiedniej porze (wynikającej z ustawionego harmonogramu) nastąpi uruchomienie zadania get_all_weather_data(). Zadanie to zleci wykonanie kolejnych zadań pobrania danych pogodowych dla poszczególnych miejscowości. Tak to wygląda na konsolach:

# beat
[2019-01-16 11:00:00,128: INFO/MainProcess] Scheduler: Sending due task weather.tasks.get_all_weather_data (weather.tasks.get_all_weather_data)

# worker
[2019-01-16 11:00:00,152: INFO/MainProcess] Received task: weather.tasks.get_all_weather_data[6a3f1380-4a16-4e0d-bd33-581b78dc0385]
[2019-01-16 11:00:00,193: INFO/MainProcess] Received task: weather.tasks.get_weather_data[efce61cc-ee1f-476e-868c-ba2dff169d4d]
[2019-01-16 11:00:00,194: INFO/ForkPoolWorker-8] Task weather.tasks.get_all_weather_data[6a3f1380-4a16-4e0d-bd33-581b78dc0385] succeeded in 0.03974403699976392s: None
[2019-01-16 11:00:00,195: INFO/MainProcess] Received task: weather.tasks.get_weather_data[73d3fd11-82a1-42ce-aed2-193f8dfd3f2c]
[2019-01-16 11:00:00,197: INFO/MainProcess] Received task: weather.tasks.get_weather_data[5e8c2de2-4dd2-4188-a31e-4951961e6c93]
[2019-01-16 11:00:00,559: INFO/ForkPoolWorker-4] Task weather.tasks.get_weather_data[73d3fd11-82a1-42ce-aed2-193f8dfd3f2c] succeeded in 0.47104553000006316s: None
[2019-01-16 11:00:00,642: INFO/ForkPoolWorker-2] Task weather.tasks.get_weather_data[efce61cc-ee1f-476e-868c-ba2dff169d4d] succeeded in 0.44790954800009786s: None
[2019-01-16 11:00:00,752: INFO/ForkPoolWorker-5] Task weather.tasks.get_weather_data[5e8c2de2-4dd2-4188-a31e-4951961e6c93] succeeded in 0.5539967090007849s: None

Uruchomienie Celery na serwerze

Skorzystamy z demona supervisor, który umożliwia nam zarządzanie i kontrolę nad uruchamianiem procesów celery na serwerze. Supervisor instalujemy z paczek dystrybucji:

# ubuntu
$ sudo apt-get install supervisor

Następnie tworzymy pliki konfiguracyjne dla celery workera i dla celery beata.

# /etc/supervisor/conf.d/weather-celery-worker.conf
[program:weather-celery-worker]
command=/home/maciej/weather/bin/celery worker -A weather -l INFO
directory=/home/maciej/weather
user=maciej
numprocs=1
stdout_logfile=/home/maciej/weather/logs/celery/weather_celery_worker.log
stderr_logfile=/home/maciej/weather/logs/celery/weather_celery_worker.log
autostart=true
autorestart=true
startsecs=10
stopwaitsecs = 600
killasgroup=true
priority=998
# /etc/supervisor/conf.d/weather-celery-beat.conf
[program:weather-celery-beat]
command=/home/maciej/weather/bin/celery beat -A weather -l INFO
directory=/home/maciej/weather
user=maciej
numprocs=1
stdout_logfile=/home/maciej/weather/logs/celery/weather_celery_beat.log
stderr_logfile=/home/maciej/weather/logs/celery/weather_celery_beat.log
autostart=true
autorestart=true
startsecs=10
stopwaitsecs = 600
killasgroup=true
priority=998

/home/maciej/weather to ścieżka do środowiska wirtualnego i aplikacji pogodowej django. Teraz wymuszamy na supervisor odczyt konfiguracji i zastosowanie zmian:

$ sudo supervisorctl reread
$ sudo supervisorctl update

Możemy używać następujących komend do zatrzymywania/uruchamiania/sprawdzenia stanu poszczególnych programów:

$ sudo supervisorctl stop weather-celery-worker
$ sudo supervisorctl start weather-celery-worker
$ sudo supervisorctl status weather-celery-worker

Konteneryzujemy projekt za pomocą Dockera

Konteneryzacja z wykorzystaniem Dockera staje się coraz popularniejsza dlatego przedstawię jeszcze drugi sposób, w jaki można uruchamiać cały projekt przy jego pomocy. Dla osób niezaznajomionych z Dockerem polecam ten wpis w celu zapoznania się z tematem konteneryzacji projektu Django.

Zaczniemy od stworzenia pliku z zależnościami dla projektu pogodowego:

# requirements.txt
django>="2.1"
celery==4.2.1
django-redis==4.10.0
redis==3.0.1
requests==2.21.0

Następnie stworzymy przepis (Dockerfile) na zbudowanie kontenera aplikacji:

# Dockerfile
FROM python:3.7-alpine

RUN mkdir -p /app
ADD requirements.txt /
RUN pip install -r /requirements.txt

WORKDIR /app
ENV PYTHONUNBUFFERED 1

I na zakończenie skorzystamy z docker-compose aby stworzyć sobie zestaw kontenerów:

version: '3'
services:
  django: &django
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      - .:/app
    ports:
      - "8000:8000"
    env_file: .env
    depends_on:
      - redis
    command: python manage.py runserver 0.0.0.0:8000

  redis:
    image: redis:latest

  celeryworker:
    <<: *django
    ports: []
    command: celery -A weather worker -l INFO

  celerybeat:
    <<: *django
    ports: []
    command: celery -A weather beat -l INFO

Mając wszystko gotowe, uruchamiamy projekt poprzez:

$ docker-compose up

W pliku konfiguracyjnym djcel/settings.py powinniśmy w przypadku dockera dokonać zmiany adresu brokera:

# djcel/settings.py
...
CELERY_BROKER_URL = 'redis://redis:6379'

Przedstawione tutaj rozwiązanie doskonale nadaje się jako przykład do wykorzystania przy deweloperce projektu. Kiedyś w przyszłości mam nadzieję napisać Wam jak projekty Django “wrzucać na produkcje” na serwery VPS, Heroku, Kubernetes.

Dla osób zainteresowanych tradycyjnie udostępniam kod projektu na GitLabie, na którym bazowałem tworząc niniejszy wpis.

Zostaw komentarz