bc-super.py 1.91 KB
Newer Older
김진영's avatar
김진영 committed
1
2
3
4
5
6
from datetime import datetime, timedelta
from textwrap import dedent
import pendulum
from airflow.operators.bash import BashOperator
from airflow import DAG
from airflow.models import Variable
김진영's avatar
김진영 committed
7
from data.super_data import superData   # import super data
김진영's avatar
김진영 committed
8
from fail_alert import fail_alert   # 실패 시 Teams 알림 발송
김진영's avatar
김진영 committed
9

김진영's avatar
김진영 committed
10
11
12
13
14
# set timezone
local_tz = pendulum.timezone("Asia/Seoul")

# set dag
with DAG(
김진영's avatar
김진영 committed
15
    'bc-super2',
김진영's avatar
김진영 committed
16
17
18
    default_args={
        'depends_on_past': False,
        'email': 'kim-jy@lotte.net',
김진영's avatar
김진영 committed
19
20
        'on_failure_callback': fail_alert
        #'on_success_callback': fail_alert
김진영's avatar
김진영 committed
21
    },
김진영's avatar
김진영 committed
22
    description='dag for super batch jobs',
김진영's avatar
김진영 committed
23
24
25
26
27
    schedule_interval='*/1 * * * *',
    start_date=datetime(2022, 5, 13, tzinfo=local_tz),
    tags=['test'],
    catchup=False,
) as dag:
김진영's avatar
김진영 committed
28
29
30
31
32
33
34
    # (Task1) Delay
    delay = BashOperator(
        task_id='delay',
        bash_command="sleep 70"
    )

    # (Task2) 헬스체크
김진영's avatar
김진영 committed
35
36
    health_check = BashOperator(
        task_id='health_check',
김진영's avatar
김진영 committed
37
        bash_command="curl -X GET -v {api_url}/api/v1/core/health \'-H accept: application/json\'".format(
김진영's avatar
김진영 committed
38
39
40
41
            api_url=Variable.get("INF_API_URL")
        ),
    )

김진영's avatar
김진영 committed
42
    # (Task3) super 작업 병렬처리
김진영's avatar
김진영 committed
43
    post_super_tasks = []
김진영's avatar
김진영 committed
44

김진영's avatar
김진영 committed
45
46
47
    for i, data in enumerate(superData):
        post_super_task = BashOperator(
            task_id='post_super'+str(i+1),
김진영's avatar
김진영 committed
48
            bash_command="curl -X \'POST\' \'%s/api/v1/camera/super/writeimage\' -H \'Content-Type: application/json\' -d \'{\"id\": \"%s\", \"pw\": \"%s\", \"ip\": \"%s\", \"serialNum\": \"%s\", \"camName\": \"%s\"}\'" %(
김진영's avatar
김진영 committed
49
50
51
52
53
54
55
56
57
                Variable.get("INF_API_URL"), 
                Variable.get("INF_API_ID"), 
                Variable.get("INF_API_PW"), 
                data["ip"], 
                data["serialNum"], 
                data["camName"]
            ),
        )

김진영's avatar
김진영 committed
58
        post_super_tasks.append(post_super_task)
김진영's avatar
김진영 committed
59
60

    # 작업 순서 정의
김진영's avatar
김진영 committed
61
    delay >> health_check >> post_super_tasks