Django Celery, Rabbitmq 사용한 비동기 작업 처리
Web/Django

Django Celery, Rabbitmq 사용한 비동기 작업 처리

뉴비뉴 2024. 8. 14.

 안녕하세요.

Rabbitmq 와 Celery 를 사용해서 프로젝트를 하나 만들어 보고 싶었고,

ChatGPT 의 힘을 빌려 주제를 추천 받았고, 아래의 배송 서비스를 구현 해 봤습니다.

 

https://github.com/2044smile/shipment

 

GitHub - 2044smile/shipment: python, django, rabbitmq, celery, drf, drf-simplejwt, swagger

python, django, rabbitmq, celery, drf, drf-simplejwt, swagger - 2044smile/shipment

github.com

 

간단히 프로젝트에 대한 소개를 드리자면,

 

카카오 로그인, djangorestframework-simplejwt 를 이용하여 토큰을 관리했고, 주문 서비스와 아이템(상품) 서비스를 구현하였습니다. rabbitmq 와 celery 를 사용하기 위해 docker-compose 를 사용 했습니다.

 

Django와 Celery 를 사용하여 배송 상태를 관리하는 방법을 살펴 보겠습니다.

 

Django 와 Celery, Rabbitmq 를 활용한 비동기 작업 처리: 배송 상태 업데이트

 웹 애플리케이션에서 사용자 경험을 향상시키기 위해 동기 작업을 처리하는 것은 매우 중요합니다.

Django 와 Celery 를 사용하면 작업을 비동기적으로 처리하여 서버의 부하를 줄이고, 응답 속도를 개선할 수 있습니다.

프로젝트 설정

Celery 설치

 우선 Django 프로젝트에 Celery 를 설치합니다.

Celery 는 분산 작업 큐 로서, 작업을 비동기적으로 처리할 수있는 강력한 도구 입니다. 

pip install celery

settings.py 설정

 Celery 와 RabbitMQ를 연동하기 위해 브로커 URL을 설정합니다.

RabbitMQ 말고도 주로 비교되는 브로커는 Redis 가 있습니다.

# cofnig/settings.py

INSTALLED_APPS = [
    'celery',
]

CELERY_TIMEZONE = TIME_ZONE
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60

# RabbitMQ를 브로커로 사용
CELERY_BROKER_URL = "amqp://guest:guest@rabbitmq:5672//"

 

`CELERY_TASK_TRACK_STARTED` 와 `CELERY_TASK_TIME_LIMIT` 설정을 통해

작업이 시작되었는지 추적하고, 작업의 최대 시간을 제한할 수 있습니다.

Celery 설정 파일 생성

 Django 와 Celery 를 통합하려면 Celery 설정 파일이 필요합니다.

프로젝트의 config 디렉토리에 `celery.py` 파일을 생성합니다.

# config/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
app = Celery("config")
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()  # Django 앱의 `tasks.py` 파일에서 Celery 작업을 자동으로 찾아줍니다.

Docker를 활용한 컨테이너화

 Docker를 사용하면 Django 애플리케이션을 컨테이너로 패키징하여 일관된 개발 및 배포 환경을 제공할 수 있습니다. 또한, Celery와 RabbitMQ를 컨테이너로 관리함으로써 비동기 작업 처리를 더욱 쉽게 구성할 수 있습니다.

`docker-compose.yaml`, `Dockerfile` 을 설정하여 프로젝트를 컨테이너화 하는 방법을 살펴보겠습니다.

docker-compose.yaml 파일

 `docker-compose.yaml` 파일을 설정합니다.

이 파일은 여러 컨테이너를 정의하고, 이들이 서로 어떻게 상호작용하는지 규정합니다.

# docker-compose.yaml
version: '3'

services:
  app:
    build:
      context: .
    ports:
      -  "8000:8000"
    volumes: # 옵션을 통해 로컬 파일 시스템을 컨테이너에 마운트하여 코드를 쉽게 업데이트 할 수 있습니다.
      - .:/app
    command: >
      sh -c "python manage.py runserver 0.0.0.0:8000"
    depends_on:  # RabbitMQ 서비스가 시작된 후에 Django 앱이 시작되도록 합니다.
      - rabbitmq
    
  rabbitmq:  # 메시지 브로커를 위한 서비스 입니다.
    container_name: rabbitmq
    image: rabbitmq:3.7-alpine  # alpine 이미지를 사용하여 경량화된 RabbitMQ 컨테이너를 실행
    environment:
      - RABBITMQ_USER=guest
      - RABBITMQ_PASSWORD=guest
    ports:  # 포트를 외부(Django, Celery 에서 접근 가능)에 노출하여 브로커와 UI에 접근할 수 있습니다.
      - "5672:5672" # RabbitMQ 기본 포트
      - "15672:15672" # RabbitMQ 관리 UI 포트
    expose:
      - "15672"

  celery:  # 워커
    build:
      context: .
    volumes:
      - .:/app
    command: celery -A config worker --loglevel=DEBUG & > celery_log
    depends_on:
      - rabbitmq

Dockerfile 설정

 Django 애플리케이션의 컨테이너 이미지를 빌드하는 데 사용 됩니다.

이 파일은 애플리케이션의 종속성을 설치하고, 필요한 설정을 자동으로 처리합니다.

FROM python:3.10-slim  # 경량화

ENV PYTHONDONTWRITEBYTECODE 1  # .pyc 파일을 생성하지 않도록
ENV PYTHONUNBUFFERED 1  # Python 출력이 즉시 표시되도록 설정

WORKDIR /app

COPY requirements.txt /app/
RUN pip install -r requirements.txt

COPY . /app/

EXPOSE 8000  # 포트 8000을 외부에 노출

CMD ["python3", "manage.py", "runserver", "0.0.0.0:8000"]
docker-compose up

프로젝트 초기화 설정

 Celery 가 Django 프로젝트와 함꼐 시작되도록 `__init__.py` 파일을 수정합니다.

# config/__init__.py
from .celery import app as celery_app


__all__ = ['celery_app']

 

이제 Celery가 Django 프로젝트와 함께 실행될 준비가 되었습니다.

비동기 작업 정의

 배송 상태를 업데이트하는 비동기 작업을 정의합니다.

`tasks.py` 파일을 생성하고, Celery 작업을 작성합니다.

# app/tasks.py
from config.celery import app
from .models import Delivery

@app.task
def process_delivery_task(delivery_id):
    instance = Delivery.objects.get(id=delivery_id)
    instance.status = "2"  # 배송 완료 상태로 변경
    instance.save()

비동기 작업 호출

 `views.py` 에서 사용자가 배송을 출발할 때 비동기 작업을 호출하도록 설정합니다.

# app/views.py
class DeliveryViewSet(viewsets.ModelViewSet):
    queryset = Delivery.objects.all()
    serializer_class = DeliverySerializer
    permission_classes = [permissions.IsAuthenticated]
    authentication_classes = [JWTAuthentication]

    @action(detail=True, methods=['GET'], name='departure')
    def departure(self, request, pk=None):
        instance = self.get_object()
        if instance.status == '2':
            return JsonResponse({'message': f'{instance.receiver.kakao_id} 고객님의 상품은 이미 배송이 완료 되었습니다.'})
        instance.status = '1'
        instance.save()

        # celery 호출 / 시간은 60초로 설정
        process_delivery_task.s(delivery_id=instance.id).apply_async(countdown=60)

        return JsonResponse({'message': f'{instance.receiver.kakao_id} 고객님의 상품 {instance.order.item.name} 배송이 출발하였습니다. the delivery service will arrive in 60 seconds'})

결과 확인

 자, 그럼 Celery 가 잘 동작 했는지 확인해볼까요?

 

 

비동기 작업 정의에서 '2' 로 status 값을 변경 했었죠?

정상적으로 값이 출력되는 것을 확인할 수 있습니다.

 

 

celery 는 어떤 작업을 수행할까요?

 

 

보시면 TaskPool 이 생성되고, 제가 설정한 60초가 지나면 succeeded 되는 걸 확인할 수 있습니다.

 

Shipment 프로젝트에서 rabbitmq와 celery 를 사용해 봤습니다.

질문이나 오타나 더 좋은 방법이 있으면 댓글로 알려주시면 감사합니다!

댓글

💲 추천 글