from datetime import datetime, timedelta from textwrap import dedent import pendulum from airflow.operators.bash import BashOperator from airflow import DAG from airflow.models import Variable from data.mart_data import martData # import mart data from fail_alert import fail_alert # 실패 시 Teams 알림 발송 # set timezone local_tz = pendulum.timezone("Asia/Seoul") # set dag with DAG( 'bc-mart', default_args={ 'depends_on_past': False, 'email': 'kim-jy@lotte.net', #'on_failure_callback': fail_alert 'on_success_callback': fail_alert }, description='dag for mart batch jobs', schedule_interval='*/1 * * * *', start_date=datetime(2022, 5, 13, tzinfo=local_tz), tags=['test'], catchup=False, ) as dag: # (Task1) 헬스체크 health_check = BashOperator( task_id='health_check', bash_command="curl -X GET -v {api_url}/api/v1/core/health \'-H accept: application/json\'".format( api_url=Variable.get("INF_API_URL") ), ) # (Task2) mart 작업 병렬처리 post_mart_tasks = [] for i, data in enumerate(martData): post_mart_task = BashOperator( task_id='post_mart'+str(i+1), bash_command="sleep 30; curl -X \'POST\' \'%s/api/v1/camera/mart/writeimage\' -H \'Content-Type: application/json\' -d \'{\"id\": \"%s\", \"pw\": \"%s\", \"ip\": \"%s\", \"serialNum\": \"%s\", \"camName\": \"%s\"}\'" %( Variable.get("INF_API_URL"), Variable.get("INF_API_ID"), Variable.get("INF_API_PW"), data["ip"], data["serialNum"], data["camName"] ), ) post_mart_tasks.append(post_mart_task) # 작업 순서 정의 health_check >> post_mart_tasks