batch_test.py 3.23 KB
Newer Older
김진영's avatar
김진영 committed
1
2
from datetime import datetime, timedelta
from textwrap import dedent
김진영's avatar
김진영 committed
3
import pendulum
김진영's avatar
김진영 committed
4
from airflow.operators.bash import BashOperator
김진영's avatar
김진영 committed
5
from airflow import DAG
김진영's avatar
김진영 committed
6
from data.gasan_data import gasanData   # import cj-gasan data
김진영's avatar
김진영 committed
7

김진영's avatar
김진영 committed
8
# set timezone
김진영's avatar
김진영 committed
9
10
local_tz = pendulum.timezone("Asia/Seoul")

김진영's avatar
김진영 committed
11
def fail_alert(context):
김진영's avatar
김진영 committed
12
13
    context_data = """
    {
김진영's avatar
김진영 committed
14
15
16
17
18
19
20
21
        "@context": "https://schema.org/extensions",
        "@type":"MessageCard",
        "themeColor":"0072C6",
        "title":"Batch Job Error",
        "summary":"test",
        "sections": [
            {
                "facts":[
김진영's avatar
김진영 committed
22
23
24
25
26
                    {"name":"■ Execution Time", "value": "{exec_date}"},
                    {"name":"■ Task", "value": "{task}"},
                    {"name":"■ DAG", "value": "{dag}"},
                    {"name":"■ Log URL", "value": "{log_url}"},
                    {"name":"■ Reason", "value":"{reason}"}
김진영's avatar
김진영 committed
27
28
29
30
                ]
            }
        ]
    }
김진영's avatar
김진영 committed
31
32
33
34
35
36
37
    """.format(
        exec_date=context.get('execution_date'),
        task=context.get('task_instance').task_id,
        dag=context.get('task_instance').dag_id,
        log_url=context.get('task_instance').log_url,
        reason=context.get('exception')
    )
김진영's avatar
김진영 committed
38

김진영's avatar
김진영 committed
39
40
    alert = BashOperator(
        task_id='fail_alert',
김진영's avatar
김진영 committed
41
        bash_command="curl -d \'{data}\' -H \"Content-Type: Application/JSON\" -X POST https://lottegroup.webhook.office.com/webhookb2/2ed9f7fc-4c60-4d2d-a61c-aa50c0075564@dc742f86-8941-4de1-8d2c-d2dfef93cfe8/IncomingWebhook/1047eeaf7bde45a08e5ccb4d6c80f08d/d7352368-8126-4827-aab7-4a62b0b5abc2".format(data=context_data)
김진영's avatar
김진영 committed
42
        # bash_command="curl -d \'{\"@context\":\"https://schema.org/extensions\",\"@type\":\"MessageCard\",\"themeColor\":\"0072C6\",\"title\":\"Batch Job Error\",\"summary\":\"test\",\"sections\":[{\"facts\":[{\"name\":\"TEST1\", \"value\": \"TEST1\"}]}]}\' -H \"Content-Type: Application/JSON\" -X POST https://lottegroup.webhook.office.com/webhookb2/2ed9f7fc-4c60-4d2d-a61c-aa50c0075564@dc742f86-8941-4de1-8d2c-d2dfef93cfe8/IncomingWebhook/1047eeaf7bde45a08e5ccb4d6c80f08d/d7352368-8126-4827-aab7-4a62b0b5abc2"
김진영's avatar
김진영 committed
43
44
45
    )
    return alert.execute(context)

김진영's avatar
김진영 committed
46
47
48
49
with DAG(
    'batch_test',
    default_args={
        'depends_on_past': False,
김진영's avatar
김진영 committed
50
        'email': 'kim-jy@lotte.net',
김진영's avatar
김진영 committed
51
        'on_failure_callback': fail_alert
김진영's avatar
김진영 committed
52
53
    },
    description='Test Batch Job',
김진영's avatar
김진영 committed
54
    schedule_interval='*/1 * * * *',
김진영's avatar
김진영 committed
55
    start_date=datetime(2022, 5, 13, tzinfo=local_tz),
김진영's avatar
김진영 committed
56
    tags=['test'],
김진영's avatar
김진영 committed
57
    catchup=False,
김진영's avatar
김진영 committed
58
) as dag:
김진영's avatar
김진영 committed
59
60
61
    # (Task1) 헬스체크
    health_check = BashOperator(
        task_id='health_check',
김진영's avatar
김진영 committed
62
        bash_command="curl -X GET -v http://10.231.238.224:30999/api/v1/core/health \'-H accept: application/json\'",
김진영's avatar
김진영 committed
63
    )
김진영's avatar
김진영 committed
64

김진영's avatar
김진영 committed
65
    # (Task2) gasan 작업 병렬처리
김진영's avatar
김진영 committed
66
    post_gasan_tasks = []
김진영's avatar
김진영 committed
67

김진영's avatar
김진영 committed
68
69
70
71
72
    for i, data in enumerate(gasanData):
        post_gasan_task = BashOperator(
            task_id='post_gasan'+str(i+1),
            bash_command="curl -X \'POST\' \'http://10.231.238.224:30999/api/v1/camera/writeimage\' -H \'Content-Type: application/json\' -d \'{\"id\": \"test\", \"pw\": \"test\", \"ip\": \"%s\", \"serialNum\": \"%s\", \"camName\": \"%s\"}\'" %(data["ip"], data["serialNum"], data["camName"]),
        )
김진영's avatar
김진영 committed
73

김진영's avatar
김진영 committed
74
        post_gasan_tasks.append(post_gasan_task)
김진영's avatar
김진영 committed
75

김진영's avatar
김진영 committed
76
    # 작업 순서 정의
김진영's avatar
김진영 committed
77
    health_check >> post_gasan_tasks