fail_alert.py 1.35 KB
Newer Older
김진영's avatar
김진영 committed
1
from airflow.operators.bash import BashOperator
김진영's avatar
김진영 committed
2
from airflow.models import Variable
김진영's avatar
김진영 committed
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from dateutil.relativedelta import relativedelta

def fail_alert(context):
    context_data = """
    {
        "@context": "https://schema.org/extensions",
        "@type":"MessageCard",
        "themeColor":"0072C6",
        "title":"Batch Job Error",
        "summary":"test",
        "sections": [
            {
                "facts":[
                    {"name":"■ Exec Time", "value": "%s"},
                    {"name":"■ Task", "value": "%s"},
                    {"name":"■ DAG", "value": "%s"},
                    {"name":"■ Reason", "value": "%s"},
                    {"name":"■ Log URL", "value": "%s"},
                ]
            }
        ]
    }
    """ % (
        context.get('execution_date')+relativedelta(hours=9), 
        context.get('task_instance').task_id, 
        context.get('task_instance').dag_id,
        context.get('exception'),
        context.get('task_instance').log_url.replace("localhost:8080", Variable.get("AIRFLOW_WEB_URL"))
    )

    alert = BashOperator(
        task_id='fail_alert',
        bash_command="curl -d \'{data}\' -H \"Content-Type: Application/JSON\" -X POST {teams_url}".format(
            data=context_data, 
            teams_url=Variable.get("TEAMS_WEBHOOKS_URL")
        )
    )
    return alert.execute(context=context)