안녕하세요.
Rabbitmq 와 Celery 를 사용해서 프로젝트를 하나 만들어 보고 싶었고,
ChatGPT 의 힘을 빌려 주제를 추천 받았고, 아래의 배송 서비스를 구현 해 봤습니다.
https://github.com/2044smile/shipment
GitHub - 2044smile/shipment: python, django, rabbitmq, celery, drf, drf-simplejwt, swagger
python, django, rabbitmq, celery, drf, drf-simplejwt, swagger - 2044smile/shipment
github.com
간단히 프로젝트에 대한 소개를 드리자면,
카카오 로그인, djangorestframework-simplejwt 를 이용하여 토큰을 관리했고, 주문 서비스와 아이템(상품) 서비스를 구현하였습니다. rabbitmq 와 celery 를 사용하기 위해 docker-compose 를 사용 했습니다.
Django와 Celery 를 사용하여 배송 상태를 관리하는 방법을 살펴 보겠습니다.
Django 와 Celery, Rabbitmq 를 활용한 비동기 작업 처리: 배송 상태 업데이트
웹 애플리케이션에서 사용자 경험을 향상시키기 위해 동기 작업을 처리하는 것은 매우 중요합니다.
Django 와 Celery 를 사용하면 작업을 비동기적으로 처리하여 서버의 부하를 줄이고, 응답 속도를 개선할 수 있습니다.
프로젝트 설정
Celery 설치
우선 Django 프로젝트에 Celery 를 설치합니다.
Celery 는 분산 작업 큐 로서, 작업을 비동기적으로 처리할 수있는 강력한 도구 입니다.
pip install celery
settings.py 설정
Celery 와 RabbitMQ를 연동하기 위해 브로커 URL을 설정합니다.
RabbitMQ 말고도 주로 비교되는 브로커는 Redis 가 있습니다.
# cofnig/settings.py
INSTALLED_APPS = [
'celery',
]
CELERY_TIMEZONE = TIME_ZONE
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60
# RabbitMQ를 브로커로 사용
CELERY_BROKER_URL = "amqp://guest:guest@rabbitmq:5672//"
`CELERY_TASK_TRACK_STARTED` 와 `CELERY_TASK_TIME_LIMIT` 설정을 통해
작업이 시작되었는지 추적하고, 작업의 최대 시간을 제한할 수 있습니다.
Celery 설정 파일 생성
Django 와 Celery 를 통합하려면 Celery 설정 파일이 필요합니다.
프로젝트의 config 디렉토리에 `celery.py` 파일을 생성합니다.
# config/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
app = Celery("config")
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks() # Django 앱의 `tasks.py` 파일에서 Celery 작업을 자동으로 찾아줍니다.
Docker를 활용한 컨테이너화
Docker를 사용하면 Django 애플리케이션을 컨테이너로 패키징하여 일관된 개발 및 배포 환경을 제공할 수 있습니다. 또한, Celery와 RabbitMQ를 컨테이너로 관리함으로써 비동기 작업 처리를 더욱 쉽게 구성할 수 있습니다.
`docker-compose.yaml`, `Dockerfile` 을 설정하여 프로젝트를 컨테이너화 하는 방법을 살펴보겠습니다.
docker-compose.yaml 파일
`docker-compose.yaml` 파일을 설정합니다.
이 파일은 여러 컨테이너를 정의하고, 이들이 서로 어떻게 상호작용하는지 규정합니다.
# docker-compose.yaml
version: '3'
services:
app:
build:
context: .
ports:
- "8000:8000"
volumes: # 옵션을 통해 로컬 파일 시스템을 컨테이너에 마운트하여 코드를 쉽게 업데이트 할 수 있습니다.
- .:/app
command: >
sh -c "python manage.py runserver 0.0.0.0:8000"
depends_on: # RabbitMQ 서비스가 시작된 후에 Django 앱이 시작되도록 합니다.
- rabbitmq
rabbitmq: # 메시지 브로커를 위한 서비스 입니다.
container_name: rabbitmq
image: rabbitmq:3.7-alpine # alpine 이미지를 사용하여 경량화된 RabbitMQ 컨테이너를 실행
environment:
- RABBITMQ_USER=guest
- RABBITMQ_PASSWORD=guest
ports: # 포트를 외부(Django, Celery 에서 접근 가능)에 노출하여 브로커와 UI에 접근할 수 있습니다.
- "5672:5672" # RabbitMQ 기본 포트
- "15672:15672" # RabbitMQ 관리 UI 포트
expose:
- "15672"
celery: # 워커
build:
context: .
volumes:
- .:/app
command: celery -A config worker --loglevel=DEBUG & > celery_log
depends_on:
- rabbitmq
Dockerfile 설정
Django 애플리케이션의 컨테이너 이미지를 빌드하는 데 사용 됩니다.
이 파일은 애플리케이션의 종속성을 설치하고, 필요한 설정을 자동으로 처리합니다.
FROM python:3.10-slim # 경량화
ENV PYTHONDONTWRITEBYTECODE 1 # .pyc 파일을 생성하지 않도록
ENV PYTHONUNBUFFERED 1 # Python 출력이 즉시 표시되도록 설정
WORKDIR /app
COPY requirements.txt /app/
RUN pip install -r requirements.txt
COPY . /app/
EXPOSE 8000 # 포트 8000을 외부에 노출
CMD ["python3", "manage.py", "runserver", "0.0.0.0:8000"]
docker-compose up

프로젝트 초기화 설정
Celery 가 Django 프로젝트와 함꼐 시작되도록 `__init__.py` 파일을 수정합니다.
# config/__init__.py
from .celery import app as celery_app
__all__ = ['celery_app']
이제 Celery가 Django 프로젝트와 함께 실행될 준비가 되었습니다.
비동기 작업 정의
배송 상태를 업데이트하는 비동기 작업을 정의합니다.
`tasks.py` 파일을 생성하고, Celery 작업을 작성합니다.
# app/tasks.py
from config.celery import app
from .models import Delivery
@app.task
def process_delivery_task(delivery_id):
instance = Delivery.objects.get(id=delivery_id)
instance.status = "2" # 배송 완료 상태로 변경
instance.save()
비동기 작업 호출
`views.py` 에서 사용자가 배송을 출발할 때 비동기 작업을 호출하도록 설정합니다.
# app/views.py
class DeliveryViewSet(viewsets.ModelViewSet):
queryset = Delivery.objects.all()
serializer_class = DeliverySerializer
permission_classes = [permissions.IsAuthenticated]
authentication_classes = [JWTAuthentication]
@action(detail=True, methods=['GET'], name='departure')
def departure(self, request, pk=None):
instance = self.get_object()
if instance.status == '2':
return JsonResponse({'message': f'{instance.receiver.kakao_id} 고객님의 상품은 이미 배송이 완료 되었습니다.'})
instance.status = '1'
instance.save()
# celery 호출 / 시간은 60초로 설정
process_delivery_task.s(delivery_id=instance.id).apply_async(countdown=60)
return JsonResponse({'message': f'{instance.receiver.kakao_id} 고객님의 상품 {instance.order.item.name} 배송이 출발하였습니다. the delivery service will arrive in 60 seconds'})
결과 확인
자, 그럼 Celery 가 잘 동작 했는지 확인해볼까요?

비동기 작업 정의에서 '2' 로 status 값을 변경 했었죠?
정상적으로 값이 출력되는 것을 확인할 수 있습니다.

celery 는 어떤 작업을 수행할까요?

보시면 TaskPool 이 생성되고, 제가 설정한 60초가 지나면 succeeded 되는 걸 확인할 수 있습니다.
Shipment 프로젝트에서 rabbitmq와 celery 를 사용해 봤습니다.
질문이나 오타나 더 좋은 방법이 있으면 댓글로 알려주시면 감사합니다!
'Web > Django' 카테고리의 다른 글
| Django rest_framework filters 삽질 일기 (0) | 2023.08.19 |
|---|---|
| Django ORM replace 'for' vs 'bulk_update' (2) | 2022.09.30 |
| Django SSH tunneling(AWS SSH) , AWS RDS, inspectDB (0) | 2022.07.12 |
| [Django] GraphQL 기초 한 방 정리 (0) | 2020.12.12 |
| [Django Rest Framework] ModelViewset 동작에 대해 (queryset, get_object) (1) | 2020.08.20 |
댓글