from datetime import datetime, timedelta
from textwrap import dedent

import pendulum

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator

from data.super_data import superData  # import super data
from fail_alert import fail_alert  # send a Teams alert on task failure

# set timezone
local_tz = pendulum.timezone("Asia/Seoul")

# set dag
with DAG(
    'db-backup',
    default_args={
        'depends_on_past': False,
        'email': 'kim-jy@lotte.net',
        'on_failure_callback': fail_alert,
        # 'on_success_callback': fail_alert,
    },
    description='dag for super batch jobs',
    schedule_interval='* * */1 * *',
    start_date=datetime(2022, 5, 13, tzinfo=local_tz),
    tags=['test'],
    catchup=False,
) as dag:

    # (Task 1) Get the DB connection address
    # get_db_ip = BashOperator(
    #     task_id='get_db_ip',
    #     bash_command="kubectl get svc mariadb-galera -n mariadb -o jsonpath='{.spec.clusterIP}'",
    # )

    mkdir_date = BashOperator(
        task_id='mkdir_date',
        bash_command="cd /opt/airflow/logs; mkdir test; pwd",
    )

    # (Task 2) Health check
    # health_check = BashOperator(
    #     task_id='health_check',
    #     bash_command="curl -X GET -v {api_url}/api/v1/core/health -H 'accept: application/json'".format(
    #         api_url=Variable.get("INF_API_URL")
    #     ),
    # )

    # Define task order
    # get_db_ip >> health_check >> post_super_tasks
    mkdir_date
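
# ------------------------------------------------------------------
# NOTE: hedged sketch only. The fail_alert module imported above is
# not shown here; the commented-out function below illustrates what
# an Airflow on_failure_callback posting to a Microsoft Teams
# incoming webhook might look like. The Variable name
# "TEAMS_WEBHOOK_URL" is an assumption, not taken from this repo.
# ------------------------------------------------------------------
# import requests
# from airflow.models import Variable
#
# def fail_alert(context):
#     """Send a simple Teams notification when a task fails."""
#     ti = context["task_instance"]
#     message = (
#         f"DAG {ti.dag_id} / task {ti.task_id} failed "
#         f"(run ts={context['ts']})"
#     )
#     # Teams incoming webhooks accept a JSON payload with a "text" field.
#     requests.post(
#         Variable.get("TEAMS_WEBHOOK_URL"),  # assumed Variable name
#         json={"text": message},
#         timeout=10,
#     )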