주니어 개발자의 Argo Workflow, Events 사용기 - 2
Devops

주니어 개발자의 Argo Workflow, Events 사용기 - 2

뉴비뉴 2021. 2. 7.

2021/01/09 - [study/Devops] - Argo Workflow 란 무엇인가? - 1

 

Argo Workflow 란 무엇인가? - 1

목차 들어가기 전에 Argo Workflow란 무엇인가? Kubernetes란 Installation minikube kubectl Argo Workflow 실습 hello-world Dag & Step 들어가기 전에  Python&Django 주니어 개발자인 글쓴이가 구축해본 인..

newbiecs.tistory.com

지난번 포스팅에서는 간단하게 Argo Workflow에 대해 알아보았습니다.

Argo Workflow를 처음 접하시는 분들은 위 글을 참고해주시면 감사하겠습니다.

이 글에서는 직접 Argo Workflow를 구현하면서 사용했던 것들을 정리해보는 글을 작성해보도록 하겠습니다.

목차

  • Argo Events란
    • Architecture
      • EventSource
      • Sensor
      • EventBus
  • 사용한 것들
    • Slack Notifications
    • Artifacts(S3)
    • 환경변수

Argo Events란

Argo Events란 Kubernetes를 위한 이벤트 기반 워크플로우 자동화 프레임워크입니다.

Kubernetes 개체, Argo Workflow, 서버리스 워크로드 등을 트리거하는 역할을 수행합니다.

공식문서에서 제공하고 있는 Trigger 가능한 목록들은 아래와 같습니다.

  • Argo Workflow
  • Standard K8s Objects
  • HTTP Requests / Serverless Workloads (OpenFaas, Kubeless, KNative etc.)
  • AWS Lambda
  • NATS Messages
  • Kafka Messages
  • Slack Notifications
  • Argo Rollouts
  • Custom Trigger / Build Your Own Trigger
  • Apache OpenWhisk

Argo Events를 사용하게 된 이유는 외부에 A라는 업체가 Argo Workflow를 트리거 하기 위함이였습니다.

Webhook을 사용하여 트리거 할 수 있는 URL을 열어두고, A라는 업체에게 URL을 제공해주어 직접 트리거 할 수 있도록 구현하였습니다.

Architecture

Argo Events Architecture

Event Source

AWS SNS, S3, SQS, Webhooks 등과 같은 외부 소스의 이벤트를 소비하는데 필요한 구성을 정의합니다.

Webhooks으로 발생한 이벤트를 cloudevents(?)로 변환하고 Event Bus로 전송합니다.

외부 소스의 이벤트를 받아야 하기 때문에 Webhooks의 경우 포트와 endpoint를 설정 할 수 있습니다.

예를 들어 www.example.com:12000/omega endpoint로 Webhooks 이벤트가 온다면 어떠한 동작을 실행시켜라 라는 동작에서 이벤트를 받는 역할을 합니다.

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: webhook
  namespace: {{ .Release.Namespace }}
spec:
  template:
    serviceAccountName: argo-events-sa
    {{- with .Values.nodeSelector }}
    nodeSelector:
      {{- toYaml . | nindent 6 }}
    {{- end }}
  service:
    ports:
      - port: 12000
        targetPort: 12000
  webhook:  # EventSource에 Webhooks 설정
    A:
      port: "12000"
      endpoint: "/omega"
      method: POST
    B:
      port: "12000"
      endpoint: "/bitamin"
      method: POST

Sensor

센서는 이벤트 종속성(입력) 및 트리거(출력)의 집합을 정의합니다.

이벤트 버스의 이벤트를 수신하고 이벤트 종속성 관리자 역할을 하여 트리거를 확인하고 실행합니다.

이 글에서 Sensor는 Argo Workflow를 트리거하고 있습니다.

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: sensor-example
  namespace: {{ .Release.Namespace }}
spec:
  template:
  dependencies:
    - name: sensor-argo-workflow
      eventSourceName: webhook  # 위에서 예시로 든 EventSource name
      eventName: example  # EventSource spec.webhook.<key>
  triggers:
    - template:
        name: webhook-workflow-trigger
        argoWorkflow:  # Sensor로 Argo Workflow를 사용
          group: argoproj.io
          version: v1alpha1
          resource: workflows
          operation: submit  # Argo Workflow Submit(=trigger)
          source:
            resource:

Event Bus

Event Source와 Sensor를 연결하는 Argo Events의 전송 계층 역할을 합니다.

Event Source에서 이벤트가 발생하면 Sensor에서는 위에서 말한 트리거 할 수 있는 목록 중에 한 개를 설정하여 실행시킵니다.

이 글에서는 Event Source Webhooks를 사용하고 있습니다.

Slack Notification

주로 Argo Workflow로 배치성 작업들을 Workflow로 만들어두었기 때문에 시간이 오래걸리는 것들이 많았습니다.

그러다보니 틈틈이 워크플로우가 제대로 동작되었는지 확인해야 했었습니다. 이러한 점을 개선하고자 Workflow가 끝나고나서 Slack 으로 알림을 받을 수 있도록 on-exit을 사용하여 Slack Notification을 구현하였습니다.

공식문서에서 제공해주는 Slack Notification 예제(링크)가 있지만 이것은 저희가 원하는 방향하고 달라 아래와 같이 구현하였습니다.

# common.yaml

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: common-base-template
  namespace: {{ .Release.Namespace }}
spec:
  templates:
  - name: slack-noti
  container:
    image: technosophos/slack-notify
    command:
    env:
    - name: SLACK_WEBHOOK
      value: "https://hooks.slack.com/services/TXXXXXX/BXXXXXXX/AXXXXXXXXX"
    - name: SLACK_TITLE
      value: "{{`[{{workflow.status}}][{{workflow.creationTimestamp}}]`}} {{`{{workflow.namespace}}`}}-{{`{{workflow.name}}`}}"
    - name: SLACK_MESSAGE
      value: "```{{`{{workflow.failures}}`}}```"
    - name: SLACK_COLOR
      value: "#FF0000"

 

kind: WorkflowTemplate는 어디서든 가져와 사용할 수 있는 함수형 템플릿이라고 생각하시면 됩니다.

common.yaml 이라는 곳에 slack-noti를 만들어두고 필요한 Workflow에서 사용할 수 있습니다.

하루에 한번 오전 9시에 워크플로우가 동작하는 cron workflow에 slack-noti를 붙여 성공/실패 여부에 따라 slack 메시지를 받을 수 있도록 slack-noti Workflow를 붙여주도록 하겠습니다.

# cron_workflow.yaml

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: cron-workflow-example
spec:
  schedule: "00 09 * * *"  # 매일 오전 9시에 동작하도록 schedule 설정
  concurrencyPolicy: "Forbid"  # 동시성 정책 설정
  startingDeadlineSeconds: 0
  timezone: Asia/Seoul
  suspend: false
  workflowSpec:
    entrypoint: start-cron-workflow-example
    onExit: call-slack-noti  # onExit 워크플로우가 끝나면 실행할 Template을 지정
    templates:
    - name: start-cron-workflow-example
      steps:
      - - name: first
          templateRef:  # WorkflowTemplate으로 생성한 템플릿을 가져올 때 사용
            name: the-first-snow-falles
            template: start-workflow
    - name: call-slack-noti
      templateRef:  # WorkflowTemplate으로 생성한 템플릿을 가져올 때 사용
        name: common-base-template  # common.yaml 에서 만든 템플릿을 가져온다.
        template: slack-noti  # common.yaml 안에 있는 slack-noti를 가져온다.
      when: "{{`{{workflow.status}}`}} == Failed"  # workflow.status cron workflow의 상태가 실패 시에 실행시킨다.

예시에서 추가로 설명을 덧붙이자면 {{workflow.<?>}} 은 onExit으로 설정되었을 때만 사용이 가능합니다.

"{{`{{workflow.status}}`}}" 와 같이 Mustache 태그가 두 번 붙는 이유는 Helm을 사용하고 있기 때문입니다.

Helm deploy를 하고 변경 된 Argo를 사용하게 될텐데 Helm Deploy하는 과정에서 예시와 같이 Mustache 태그를 감싸주지 않으면 Helm 에서 잘못 인식하여 문법에러를 발생시키킵니다.

Artifacts(S3)

Argo Workflow에서 A 워크플로우가 Complated 된 다음 /data 경로에 생성되는 파일을 B Workflow에 /data 경로에 마운트하여 사용하고 싶었고, Argo Workflow Artifacts를 사용하여 구현하였습니다. 공식문서(링크)를 확인해보시면 AWS key를 설정하여 사용하였지만 이 글에서는 Cluster에 IAM Role과 Kubernetes Service Account를 매핑하여 사용하였기 때문에 AWS 관련 SDK인 boto3에서는 자동으로 인스턴스 안에 설정되어있는 IAM Role을 찾아 S3 권한을 갖게 되기 때문에 별도로 AWS Key 설정은 해주지 않았습니다. 이와 관련한 정보는 boto3가 자격 증명 정보를 얻어내는 구조를 설명해준 블로그에서 확인해보실 수 있습니다.

planbs님 블로그

실제로 사용한 예제를 확인해보도록 하겠습니다.

A-workflow 템플릿에서는 /data 경로에 생성 된 파일들을 artifact가 압축하여 S3에 업로드해주고,

B-workflow 템플릿에서는 S3에 업로드 되어있는 압축 파일을 풀어 /data 경로에 파일들을 생성해주는 예시입니다.

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: artifact-test
spec:
  templates:
    - name: A-workflow
      outputs:  # 워크플로우(컨테이너) 안에 /data 경로에 생성 된 파일을 아래 S3 경로에 압축하여 업로드 합니다.
        artifacts:
        - name: create-vector
          path: /data  # 워크플로우(컨테이너) 안에 경로
          s3:
            endpoint: s3.amazonaws.com
            region: ap-northeast-2
            bucket: starbucks  # bucket name
            key: americano/tall/{{ htmlDateInZone now "Asia/Seoul" }}.tar.gz  # 경로/파일명
      container:
        image: A-workflow:latest
        command: ["python", "-m", "app.create_vector"]

    - name: B-workflow
      inputs:  # 워크플로우(컨테이너) 안에 /data 경로에 S3 경로에 압축파일을 다운로드하여 압축을 풉니다.
        artifacts:
        - name: load-vector
          path: /data
          s3:
            endpoint: s3.amazonaws.com
            region: ap-northeast-2
            bucket: starbucks  # bucket name
            key: americano/tall/{{ htmlDateInZone now "Asia/Seoul" }}.tar.gz  # 경로/파일명
      container:
        image: B-workflow:latest
        command: ["python", "-m", "app.load_vector"]

예시에서 보이는 {{ htmlDateInZone now "Asia/Seoul" }}은 Helm 에서 제공해주는 내장함수(?)와 같은 날짜 함수입니다.

환경변수

Argo Workflow는 Workflow 컨테이너를 띄워 작업을 수행하기 때문에 Workflow 별로 환경변수가 따로 관리하거나 여러 개의 환경변수를 다 같이 사용해야되는 워크플로우도 있습니다. 환경변수를 효율적으로 관리하고 사용할 수 있는 방법에 대해 알아보겠습니다.

values.yaml 활용

Helm에서 제공하는 values.yaml 파일을 활용하여 환경변수를 관리하는 방법입니다.

values.yaml은 key:value 변수들을 생성하여 Helm template 내에서 호출하여 사용할 수 있도록 되어있습니다.

그리고, Helm은 프로그래밍 언어에서 사용하는 반복문을 지원합니다. 이것들을 활용하여 워크플로우에 환경변수를 생성해주도록 하겠습니다.

# values.yaml

env:
  SERVER_ADDRESS: "localhost"
  S3_IMAGE_BUCKET_NAME: "starbucks"
  SIZE: "tall"
  DRINK: "americano"
  
  
# {{ .Values.env.SIZE }} 와 같이 사용 할 수 있습니다.

먼저 values.yaml 파일에 환경변수로 사용 할 키와 값을 생성하였습니다.

다음으로 container에 환경변수를 설정해주도록 하겠습니다.

# deployment.yaml 설명 외에 불필요한 내용은 제거하였습니다.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: starbucks
spec:
  replicas: 1
  template:
    spec:
      containers:
        - name: order
          image: A-workflow:latest
          imagePullPolicy: always
          env:  # values.yaml 에서 생성한 변수를 가져와 key는 환경변수 키로 values 값으로 사용하였습니다.
            {{- range $key, $value := .Values.env }}
            - name: {{ $key }}
              value: {{ $value }}
            {{- end }}

예시처럼 설정해주면 values.yaml env안에 설정 할 환경변수만 추가해주면 container 안에 환경변수로 자동으로 들어간다는 이점이 있습니다, 하지만 워크플로우가 많아지고 A라는 워크플로우에서는 SIZE, DRINK 환경변수만 사용하면 되는데 불필요하게 다른 환경변수를 설정하게 되는 일이 발생할 수 있습니다.

ConfigMap 활용

ConfigMap이란 Kubernetes API 오브젝트입니다. 이를 통해서 다른 오브젝트에서 사용할 구성을 저장할 수 있고 key-value 쌍을 값으로 사용합니다. ConfigMap을 사용하면 앞서 설명한 values.yaml을 이용한 환경변수 활용의 단점을 어느정도 보완 할 수 있습니다.

# configmap.yaml


apiVersion: v1
kind: ConfigMap
metadata:
  name: A-workflow-configmap
data:
  SIZE: "tall"
  DRINK: "americano"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: B-workflow-configmap
data:
  SERVER_ADDRESS: "localhost"
  S3_IMAGE_BUCKET_NAME: "starbucks"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: common-configmap
data:
  COUNTRY: "South Korea"
---

예시에서 3개의 ConfigMap을 생성해주었습니다. common은 공통으로 사용할 수 있는 환경변수가 들어가 있는 ConfigMap이고, 나머지는 각 workflow에 필요한 환경변수들이 들어가 있는 ConfigMap입니다.

이제 컨테이너에 ConfigMap을 환경변수로 넣어주는 방법에 대해 알아보겠습니다.

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: A-workflow
spec:
  entrypoint: start-workflow
  templates:
  - name: start-workflow
    container:
      image: A-workflow:latest
      command: ["python", "-m", "app"]
      envFrom:
      - configMapRef:  # 생성한 ConfigMap을 가져와 컨테이너에 환경변수로 등록
          name: common-configmap
      - configMapRef:
          name: A-workflow-configmap

ConfigMap을 활용하면 위 예시와 같이 워크플로우를 만들고 필요로하는 환경변수들을 생성해줄 수 있습니다.

이렇게 함으로써 추후에 유지보수나 새로운 환경변수가 추가되어도 ConfigMap만 수정하면 된다는 이점이 있습니다.

마무리

Argo Workflow, Argo Events로 워크플로우를 구축하면서 사용했던 것들을 정리해 보았습니다.

이 외에도 사용한 것들이 많지만 중요하고, 실제로 용이하게 사용 할 수 있는 것들을 위주로 작성하였습니다.

워크플로우를 만들 때는 yaml, helm, kubernetes에 대해 제대로 알지 못한 상태에서 구현했기에 정말 많이 고생했던 생각이 떠오르네요.

어느정도 개념이 잡히고 시간이지나고나서 복습하는 개념으로 글을 작성하니 놓쳤던 부분과 개선의 여지가 보이는 부분이 보이네요.

많이 부족하지만 긴 글을 읽어주셔서 감사합니다. 피드백은 환영입니다.

댓글

💲 추천 글