# bc-super.py - Airflow DAG for the mart batch jobs.
import json
import shlex
from datetime import datetime, timedelta
from textwrap import dedent

import pendulum
from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator

from data.mart_data import martData   # import mart data
from fail_alert import fail_alert   # sends a Teams alert on failure
# Timezone used to anchor the DAG's start_date.
local_tz = pendulum.timezone("Asia/Seoul")


def _health_check_command(api_url):
    """Build the curl command that calls the API health endpoint.

    Args:
        api_url: Base URL the endpoint path is appended to.

    Returns:
        The shell command string to hand to a BashOperator.
    """
    url = f"{api_url}/api/v1/core/health"
    # "-H" and its value must be separate shell words; quoting them together
    # makes curl send a header whose name starts with a space.
    # NOTE(review): curl exits 0 on HTTP error statuses unless --fail is
    # given; consider adding it if a 5xx should fail this task.
    return f"curl -X GET -v {shlex.quote(url)} -H 'accept: application/json'"


def _post_mart_command(api_url, api_id, api_pw, camera):
    """Build the curl command that posts one camera to the writeimage endpoint.

    Args:
        api_url: Base URL the endpoint path is appended to.
        api_id: Value sent as the "id" field of the JSON body.
        api_pw: Value sent as the "pw" field of the JSON body.
        camera: Mapping providing the "ip", "serialNum" and "camName" keys.

    Returns:
        The shell command string to hand to a BashOperator.

    Raises:
        KeyError: If ``camera`` lacks one of the required keys.
    """
    # Serialise with json.dumps so quotes/backslashes inside any value are
    # escaped instead of corrupting the body. str() keeps every field a JSON
    # string, and ensure_ascii=False leaves non-ASCII names unescaped.
    payload = json.dumps(
        {
            "id": str(api_id),
            "pw": str(api_pw),
            "ip": str(camera["ip"]),
            "serialNum": str(camera["serialNum"]),
            "camName": str(camera["camName"]),
        },
        ensure_ascii=False,
    )
    url = f"{api_url}/api/v1/camera/mart/writeimage"
    # shlex.quote keeps the URL and the body a single shell word each, even
    # when they contain single quotes or other shell metacharacters.
    return (
        "sleep 30; "
        f"curl -X POST {shlex.quote(url)} "
        "-H 'Content-Type: application/json' "
        f"-d {shlex.quote(payload)}"
    )


# DAG definition
with DAG(
    'bc-super',
    default_args={
        'depends_on_past': False,
        'email': 'kim-jy@lotte.net',
        # fail_alert is the failure notifier, so it belongs on the failure
        # hook; attaching it to on_success_callback alerts on every success
        # and never on an actual failure.
        'on_failure_callback': fail_alert,
    },
    description='dag for mart batch jobs',
    schedule_interval='*/1 * * * *',
    start_date=datetime(2022, 5, 13, tzinfo=local_tz),
    tags=['test'],
    catchup=False,
) as dag:
    # Read each Airflow Variable once per parse rather than once per camera.
    api_url = Variable.get("INF_API_URL")
    api_id = Variable.get("INF_API_ID")
    api_pw = Variable.get("INF_API_PW")

    # (Task 1) health check
    health_check = BashOperator(
        task_id='health_check',
        bash_command=_health_check_command(api_url),
    )

    # (Task 2) one POST task per martData entry; none depends on another,
    # so they are free to run in parallel after the health check.
    post_mart_tasks = [
        BashOperator(
            task_id=f'post_mart{index}',
            bash_command=_post_mart_command(api_url, api_id, api_pw, data),
        )
        for index, data in enumerate(martData, start=1)
    ]

    # Task ordering: the health check runs before every POST task.
    health_check >> post_mart_tasks