batch_test.py 1.78 KB
Newer Older
김진영's avatar
김진영 committed
1
2
from datetime import datetime, timedelta
from textwrap import dedent
김진영's avatar
김진영 committed
3
import pendulum
김진영's avatar
김진영 committed
4
from airflow.operators.bash import BashOperator
김진영's avatar
김진영 committed
5
from airflow import DAG
김진영's avatar
김진영 committed
6
from data.gasan_data import gasanData   # import cj-gasan data
김진영's avatar
김진영 committed
7
from airflow.models import Variable
김진영's avatar
김진영 committed
8
from fail_alert import fail_alert   # 실패 시 Teams 알림 발송
김진영's avatar
김진영 committed
9

김진영's avatar
김진영 committed
10
# set timezone
김진영's avatar
김진영 committed
11
12
local_tz = pendulum.timezone("Asia/Seoul")

김진영's avatar
김진영 committed
13
# set dag
김진영's avatar
김진영 committed
14
15
16
17
with DAG(
    'batch_test',
    default_args={
        'depends_on_past': False,
김진영's avatar
김진영 committed
18
        'email': 'kim-jy@lotte.net',
김진영's avatar
김진영 committed
19
20
        #'on_failure_callback': fail_alert
        'on_success_callback': fail_alert
김진영's avatar
김진영 committed
21
22
    },
    description='Test Batch Job',
김진영's avatar
김진영 committed
23
    schedule_interval='*/1 * * * *',
김진영's avatar
김진영 committed
24
    start_date=datetime(2022, 5, 13, tzinfo=local_tz),
김진영's avatar
김진영 committed
25
    tags=['test'],
김진영's avatar
김진영 committed
26
    catchup=False,
김진영's avatar
김진영 committed
27
) as dag:
김진영's avatar
김진영 committed
28
29
30
    # (Task1) 헬스체크
    health_check = BashOperator(
        task_id='health_check',
김진영's avatar
김진영 committed
31
32
33
        bash_command="curl -X GET -v {api_url}/api/v1/core/health \'-H accept: application/json\'".format(
            api_url=Variable.get("INF_API_URL")
        ),
김진영's avatar
김진영 committed
34
    )
김진영's avatar
김진영 committed
35

김진영's avatar
김진영 committed
36
    # (Task2) gasan 작업 병렬처리
김진영's avatar
김진영 committed
37
    post_gasan_tasks = []
김진영's avatar
김진영 committed
38

김진영's avatar
김진영 committed
39
40
41
    for i, data in enumerate(gasanData):
        post_gasan_task = BashOperator(
            task_id='post_gasan'+str(i+1),
김진영's avatar
김진영 committed
42
43
44
45
46
47
48
49
            bash_command="curl -X \'POST\' \'%s/api/v1/camera/writeimage\' -H \'Content-Type: application/json\' -d \'{\"id\": \"%s\", \"%s\": \"test\", \"ip\": \"%s\", \"serialNum\": \"%s\", \"camName\": \"%s\"}\'" %(
                Variable.get("INF_API_URL"), 
                Variable.get("INF_API_ID"), 
                Variable.get("INF_API_PW"), 
                data["ip"], 
                data["serialNum"], 
                data["camName"]
            ),
김진영's avatar
김진영 committed
50
        )
김진영's avatar
김진영 committed
51

김진영's avatar
김진영 committed
52
        post_gasan_tasks.append(post_gasan_task)
김진영's avatar
김진영 committed
53

김진영's avatar
김진영 committed
54
    # 작업 순서 정의
김진영's avatar
김진영 committed
55
    health_check >> post_gasan_tasks