Commit 9897d45b authored by 김진영's avatar 김진영
Browse files

Add new file

parent 060137c7
from datetime import datetime, timedelta
from textwrap import dedent
import pendulum
from airflow.operators.bash import BashOperator
from airflow import DAG
from airflow.models import Variable
from data.gasan_data import gasanData # import gasan data
from fail_alert import fail_alert # 실패 시 Teams 알림 발송
# set timezone
local_tz = pendulum.timezone("Asia/Seoul")
# set dag
with DAG(
'bc-gasan',
default_args={
'depends_on_past': False,
'email': 'kim-jy@lotte.net',
#'on_failure_callback': fail_alert
'on_success_callback': fail_alert
},
description='dag for gasan batch jobs',
schedule_interval='*/1 * * * *',
start_date=datetime(2022, 5, 13, tzinfo=local_tz),
tags=['test'],
catchup=False,
) as dag:
# (Task1) 헬스체크
health_check = BashOperator(
task_id='health_check',
bash_command="curl -X GET -v {api_url}/api/v1/core/health \'-H accept: application/json\'".format(
api_url=Variable.get("INF_API_URL")
),
)
# (Task2) gasan 작업 병렬처리
post_gasan_tasks = []
for i, data in enumerate(gasanData):
post_gasan_task = BashOperator(
task_id='post_gasan'+str(i+1),
bash_command="curl -X \'POST\' \'%s/api/v1/camera/writeimage\' -H \'Content-Type: application/json\' -d \'{\"id\": \"%s\", \"%s\": \"test\", \"ip\": \"%s\", \"serialNum\": \"%s\", \"camName\": \"%s\"}\'" %(
Variable.get("INF_API_URL"),
Variable.get("INF_API_ID"),
Variable.get("INF_API_PW"),
data["ip"],
data["serialNum"],
data["camName"]
),
)
post_gasan_tasks.append(post_gasan_task)
# 작업 순서 정의
health_check >> post_gasan_tasks
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment