"""Airflow DAG ``batch_test``.

Runs an API health check and then, in parallel, one image-write request per
camera entry in ``gasanData``. A callback posts a MessageCard to a Microsoft
Teams incoming webhook.
"""
import json
import shlex
from datetime import datetime, timedelta
from textwrap import dedent

import pendulum
from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator
from dateutil.relativedelta import relativedelta

from data.gasan_data import gasanData  # import cj-gasan data

# set timezone
local_tz = pendulum.timezone("Asia/Seoul")

# NOTE(review): this webhook URL embeds its access tokens in source control;
# consider moving it to an Airflow Variable/Connection.
TEAMS_WEBHOOK_URL = (
    "https://lottegroup.webhook.office.com/webhookb2/"
    "2ed9f7fc-4c60-4d2d-a61c-aa50c0075564@dc742f86-8941-4de1-8d2c-d2dfef93cfe8"
    "/IncomingWebhook/1047eeaf7bde45a08e5ccb4d6c80f08d/"
    "d7352368-8126-4827-aab7-4a62b0b5abc2"
)

CAMERA_WRITE_IMAGE_URL = "http://10.231.238.224:30999/api/v1/camera/writeimage"


def _shell_json(payload):
    """Serialize *payload* to JSON and quote it as a single shell argument."""
    # json.dumps escapes quotes/newlines inside values; shlex.quote makes the
    # result safe to embed in the bash command line.
    return shlex.quote(json.dumps(payload, ensure_ascii=False))


def _post_gasan_command(camera):
    """Build the curl command that posts one camera entry to the write-image API.

    Args:
        camera: Mapping providing the "ip", "serialNum" and "camName" keys.

    Returns:
        The bash command string for the POST request.
    """
    body = {
        "id": "test",
        "pw": "test",
        # str() keeps every value a JSON string, as the original "%s"
        # interpolation inside quotes did.
        "ip": str(camera["ip"]),
        "serialNum": str(camera["serialNum"]),
        "camName": str(camera["camName"]),
    }
    return (
        f"curl -X 'POST' '{CAMERA_WRITE_IMAGE_URL}' "
        "-H 'Content-Type: application/json' "
        f"-d {_shell_json(body)}"
    )


def fail_alert(context):
    """Post a "Batch Job Error" MessageCard to the Teams incoming webhook.

    Args:
        context: Airflow callback context. Reads the ``execution_date``,
            ``task_instance`` and ``exception`` entries.

    Returns:
        The result of executing the curl ``BashOperator``.
    """
    task_instance = context.get('task_instance')

    # Shift the execution date by nine hours for display
    # (presumably UTC -> Asia/Seoul; confirm the source timezone).
    exec_time = context.get('execution_date') + relativedelta(hours=9)

    # Replace the default local web-server address in the log link with the
    # address used for this deployment.
    log_url = task_instance.log_url.replace(
        "localhost:8080", "10.231.238.224:31258"
    )

    card = {
        "@context": "https://schema.org/extensions",
        "@type": "MessageCard",
        "themeColor": "0072C6",
        "title": "Batch Job Error",
        "summary": "test",
        "sections": [
            {
                "facts": [
                    {"name": "■ Exec Time", "value": str(exec_time)},
                    {"name": "■ Task", "value": str(task_instance.task_id)},
                    {"name": "■ DAG", "value": str(task_instance.dag_id)},
                    {"name": "■ Reason", "value": str(context.get('exception'))},
                    {"name": "■ Log URL", "value": str(log_url)},
                ]
            }
        ],
    }

    # The payload is built with json.dumps (the hand-written template had a
    # trailing comma, i.e. invalid JSON) and shell-quoted so exception text
    # containing quotes or newlines cannot break the command.
    alert = BashOperator(
        task_id='fail_alert',
        bash_command=(
            f"curl -d {_shell_json(card)} "
            "-H \"Content-Type: Application/JSON\" "
            f"-X POST {TEAMS_WEBHOOK_URL}"
        ),
    )
    return alert.execute(context=context)


with DAG(
    'batch_test',
    default_args={
        'depends_on_past': False,
        'email': 'kim-jy@lotte.net',
        # NOTE(review): the alert is wired to on_success_callback, so the
        # "Batch Job Error" card is sent when a task SUCCEEDS. This looks like
        # a test setup; switch to the line below for real failure alerts.
        #'on_failure_callback': fail_alert
        'on_success_callback': fail_alert,
    },
    description='Test Batch Job',
    schedule_interval='*/1 * * * *',
    start_date=datetime(2022, 5, 13, tzinfo=local_tz),
    tags=['test'],
    catchup=False,
) as dag:

    # (Task 1) Health check.
    # The API base URL comes from the Airflow Variable INF_API_URL and is
    # resolved by Jinja at task run time. The previous parse-time
    # str.format() call passed the value positionally to a named placeholder
    # ({api_url}), which raises KeyError, and looked the variable up with a
    # trailing space in its key.
    health_check = BashOperator(
        task_id='health_check',
        bash_command=(
            "curl -X GET -v {{ var.value.INF_API_URL }}/api/v1/core/health "
            "'-H accept: application/json'"
        ),
    )

    # (Task 2) gasan jobs: one POST task per camera entry, run in parallel.
    post_gasan_tasks = [
        BashOperator(
            task_id=f'post_gasan{index}',
            bash_command=_post_gasan_command(camera),
        )
        for index, camera in enumerate(gasanData, start=1)
    ]

    # Define task ordering: every camera task runs after the health check.
    health_check >> post_gasan_tasks