Fila para tarefas assíncronas com Django, Celery e AWS SQS

asynchronous task queue

Ao lidar com funcionalidades com maior tempo de execução e que podem ter um grande impacto no desempenho de uma aplicação web, você pode enfrentar a necessidade de executá-las de forma assíncrona (agendadas ou não).

Essas tarefas assíncronas executadas em segundo plano podem não apenas melhorar drasticamente a escalabilidade da aplicação por mover as operações de alto consumo para o segundo plano, mas também melhorar a usabilidade da funcionalidade em si. Depois de espalhado em diferentes componentes, cada um com sua própria responsabilidade, seu código irá parecer mais limpo e isolado, consequentemente melhorando a sua manutenção.

Este artigo apresenta alguns tópicos relacionados a uma arquitetura pré-construída usando Django, Celery, Docker e AWS SQS. A base de código está disponível no Github e você pode facilmente seguir as etapas do README para ter a aplicação funcionando sem esforço.

A seção a seguir traz uma breve visão geral dos componentes usados para construir a arquitetura.

 

Descrição dos componentes

Agente de mensagens

Message broker usage

O que é um agente de mensagens? Nossa sempre amigável Wikipedia diz que: ele intermedia a comunicação entre os aplicativos, minimizando a consciência mútua que os aplicativos devem ter uns dos outros para poder trocar mensagens, implementando efetivamente o desacoplamento. (2020)

Ou seja, é uma camada intermediária na qual as aplicações podem se comunicar – ler e/ou escrever mensagens – e, com isso, possibilita a construção de duas aplicações desacopladas que não dependem uma da outra.

 

Amazon SQS

Amazon SQS queue messaging flow

O que é AWS SQS? “O Amazon Simple Queue Service (SQS) é um serviço de enfileiramento de mensagens totalmente gerenciado que permite desacoplar e dimensionar microsserviços, sistemas distribuídos e aplicativos sem servidor. […] Usando o SQS, você pode enviar, armazenar e receber mensagens entre componentes de software em qualquer volume, sem perder mensagens ou exigir que outros serviços estejam disponíveis. ” (Amazon, 2020)

Existem algumas alternativas para SQS, como Kafka, Redis ou RabbitMQ e todas elas podem ser facilmente configuradas na Amazon usando ElasticCache ou AmazonMQ. Além disso, você tem a opção de implementar seu próprio agente de mensagens em instâncias do EC2, o que pode ser mais barato dependendo do número de mensagens. Porém você também deve considerar o tempo e o esforço necessários para configurá-lo no e o esforço extra caso decida usá-lo com o Auto Scaling habilitado.

O que vejo como o principal benefício de usar SQS é que ele é totalmente gerenciado pela AWS e você paga sob demanda. Se os requisitos de sua aplicação ou funcionalidade estão de acordo com o uso de SQS, é muito simples de configurá-lo e usá-lo.

 

Celery

Celery library logo

O que é o Celery? “Celery é um gerenciador de fila de tarefas/trabalhos assíncronas baseado na passagem de mensagens distribuídas. Ele está focado na operação em tempo real, mas também oferece suporte para agendamentos. […] As tarefas podem ser executadas de forma assíncrona (em segundo plano) ou de forma síncrona (esperar até que estejam prontas). ” (Celery, 2020)

Essencialmente, o Celery é usado para coordenar e executar tarefas distribuídas do Python. Ele permite a possibilidade de mover uma execução de código específico para fora do ciclo de requisição-resposta HTTP. Dessa forma, seu servidor pode responder o mais rápido possível a uma requisição específica, pois gera uma tarefa assíncrona para um outro processo para executar a parte específica de código, o que passa a melhorar o tempo de resposta do servidor. Além disso, com o Celery você tem a opção de executar tarefas em segundo plano previamente agendadas.

Juntando tudo isso

Para colocar todos na mesma página: AWS SQS é o agente de mensagens escolhido aqui e Celery é o componente responsável por orquestrar – consumir, ler e gravar – a fila de mensagens. Como o Celery é uma biblioteca, ele precisa ser configurado em cima do Django.

Observação: o código-fonte usado nesta postagem do blog está disponível no GitHub. Ele assume as seguintes pastas/estrutura de aplicativo:

 
.
└── src
    ├── app
    │   ├── __init__.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── manage.py
    […]

O Docker não é necessário para o sistema funcionar, mas torna muito mais fácil reproduzir a arquitetura final do sistema localmente e também serve como um guia ao configurar a arquitetura do sistema em um ambiente de serviço de computação em nuvem.

No arquivo docker-compose abaixo, existem 5 serviços configurados:

 
version: "3"
services:
  web:
    build: .
    volumes:
      - .:/usr/src/app
    ports:
      - 8000:8000
    environment:
      ENV: development
    depends_on:
      - db
      - sqs

  worker:
    build: .
    volumes:
      - .:/usr/src/app
    command: bash -c "cd src/ && celery -A app worker --loglevel=debug"
    depends_on:
      - web
      - sqs

  beat:
    build: .
    volumes:
      - .:/usr/src/app
    command: bash -c "cd src/ && celery -A app beat --loglevel=debug" 
    depends_on:
      - web 
      - sqs 

  db:
    image: postgres:12
    ports:
      - 5432:5432
    volumes:
      - ./docker/db/pgdata:/var/lib/postgresql/data
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: app

  sqs:
    image: roribio16/alpine-sqs
    ports:
      - 9324:9324
      - 9325:9325
    volumes:
      - ./config/elasticmq.conf:/opt/config/elasticmq.conf

Aqui está o resumo do arquivo docker-compose:

  • web: é o contêiner de serviço da web.
  • worker: é um Celery worker que gera um processo supervisor/pai que não processa nenhuma tarefa. Em vez disso, ele gera processos filhos para executar as tarefas reais disponíveis.
  • beat: é um agendador de Celery que gera periodicamente tarefas que são executadas pelos workers disponíveis.
  • db: contêiner de banco de dados postgres.
  • sqs: é uma implementação Java em contêiner do Amazon Queue Service que usaremos para imitar o comportamento do AWS SQS. A porta 9324 está disponível para usar o serviço de fila e a 9325 está disponível para acessar a interface da web.

Basicamente, o serviço de contêiner worker é responsável por pesquisar e executar as mensagens disponíveis nas filas e o serviço de contêiner beat é responsável por gerar tarefas periódicas a serem executadas pelos workers. Ao trabalhar com tarefas periódicas, existem algumas configurações possíveis. Você pode verificar alguns exemplos nesta página.

A seção de código a seguir é o arquivo config/elasticmq.conf e é usado apenas para a execução local. Ele configura o endereço node para o contêiner sqs. Como o Celery por padrão cria e usa uma fila chamada “celery”, não precisamos criar uma nova fila, a menos que seja necessário.


include classpath("application.conf")

node-address {
   protocol = http
   host = "*"
   port = 9324
   context-path = ""
}

rest-sqs {
   enabled = true
   bind-port = 9324
   bind-hostname = "0.0.0.0"
   // Possible values: relaxed, strict
   sqs-limits = strict
}

Este é um exemplo do que deve ser o seu arquivo app/celery.py. Você pode encontrar facilmente o significado de cada chave de configuração no site do Celery. O principal é o BROKER_URL, que é o endereço do agente ao qual o Celery se conectará. O valor da variável BROKER_URL é codificado apenas para fins de ilustração. Os valores de usuário e senha são valores fictícios, pois não são configurados no serviço sqs do contêiner docker.


import os

from celery import Celery
from celery.schedules import crontab
from django.conf import settings

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "app.settings")

app = Celery("app")

CELERY_CONFIG = {
    "CELERY_TASK_SERIALIZER": "json",
    "CELERY_ACCEPT_CONTENT": ["json"],
    "CELERY_RESULT_SERIALIZER": "json",
    "CELERY_RESULT_BACKEND": None,
    "CELERY_TIMEZONE": "America/Sao_Paulo",
    "CELERY_ENABLE_UTC": True,
    "CELERY_ENABLE_REMOTE_CONTROL": False,
}

BROKER_URL = "sqs://user:password@sqs:9324/",

CELERY_CONFIG.update(
    **{
        "BROKER_URL": BROKER_URL,
        "BROKER_TRANSPORT": "sqs",
        "BROKER_TRANSPORT_OPTIONS": {
            "region": "us-west-2",
            "visibility_timeout": 3600,
            "polling_interval": 60,
        },
    }
)

app.conf.update(**CELERY_CONFIG)
app.autodiscover_tasks(packages={"payment.tasks"}))

O código a seguir é uma função de tarefa fictícia. A aplicação já sabe que se trata de um trabalho assíncrono apenas usando o decorador @task importado do Celery. Basicamente, o decorador envolve a função e retorna uma instância da classe de tarefa com alguns métodos implementados.


from celery import task

from payment.models import Payment

@task
def capture_payment(pk):
    payment = Payment.objects.get(pk=pk)
    print(f"Capture payment: {payment}")

A seção a seguir é um exemplo de seu uso.


from payment.models import Payment

payment = Payment.objects.first()
capture_payment.delay(pk=payment.pk)

Ao chamá-lo com o método de delay, uma instância de AsyncResult será gerada e o worker executará a tarefa de mensagem assim que estiver disponível na fila.

Resumo

Neste artigo, eu apresentei a você os principais conceitos por trás da arquitetura proposta e também esclareço o que considero importante saber sobre isso. Com a base de código disponível no GitHub, você pode facilmente ter um aplicativo Docker em contêiner que implementa um sistema de enfileiramento de tarefas usando Celery em cima do Django para fins de desenvolvimento local. Portanto, para implantá-lo, você precisa criar os respectivos serviços na AWS.

Limitações conhecidas

Recentemente, encontramos uma limitação ao usar o AWS SQS: o tempo limite de visibilidade, que é um período de tempo em que o Amazon SQS impede que outros consumidores recebam e processem as mensagens. Em um de nossos projetos, uma tarefa teria a possibilidade de ser agendada para execução em apenas 2 semanas a partir de agora, mas como o tempo máximo de visibilidade de uma mensagem é de 12 horas, não é possível realizá-la usando SQS.

Referências

Wikipedia.com, Message Broker. Available in: <https://en.wikipedia.org/wiki/Message_broker>. Access in: June 24th, 2020.

Docs.aws.amazon.com, What is Amazon Simple Queue Service?. Available in: <https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html>. Access in: June 24th, 2020.

Celeryproject, Celery – Distributed Task Queue. Available in: <https://docs.celeryproject.org/en/stable/>. Access in: June 24th, 2020.

Sobre o autor.