Commit 12226568 authored by 김진영's avatar 김진영
Browse files

Update batch_test.py

parent 329bc14c
...@@ -4,46 +4,13 @@ import pendulum ...@@ -4,46 +4,13 @@ import pendulum
from airflow.operators.bash import BashOperator from airflow.operators.bash import BashOperator
from airflow import DAG from airflow import DAG
from data.gasan_data import gasanData # import cj-gasan data from data.gasan_data import gasanData # import cj-gasan data
from dateutil.relativedelta import relativedelta
from airflow.models import Variable from airflow.models import Variable
from fail_alert import fail_alert # 실패 시 Teams 알림 발송
# set timezone # set timezone
local_tz = pendulum.timezone("Asia/Seoul") local_tz = pendulum.timezone("Asia/Seoul")
def fail_alert(context): # set dag
context_data = """
{
"@context": "https://schema.org/extensions",
"@type":"MessageCard",
"themeColor":"0072C6",
"title":"Batch Job Error",
"summary":"test",
"sections": [
{
"facts":[
{"name":"■ Exec Time", "value": "%s"},
{"name":"■ Task", "value": "%s"},
{"name":"■ DAG", "value": "%s"},
{"name":"■ Reason", "value": "%s"},
{"name":"■ Log URL", "value": "%s"},
]
}
]
}
""" % (
context.get('execution_date')+relativedelta(hours=9),
context.get('task_instance').task_id,
context.get('task_instance').dag_id,
context.get('exception'),
context.get('task_instance').log_url.replace("localhost:8080", Variable.get("AIRFLOW_WEB_URL"))
)
alert = BashOperator(
task_id='fail_alert',
bash_command="curl -d \'{data}\' -H \"Content-Type: Application/JSON\" -X POST {teams_url}".format(data=context_data, teams_url=Variable.get("TEAMS_WEBHOOKS_URL"))
)
return alert.execute(context=context)
with DAG( with DAG(
'batch_test', 'batch_test',
default_args={ default_args={
...@@ -61,7 +28,9 @@ with DAG( ...@@ -61,7 +28,9 @@ with DAG(
# (Task1) 헬스체크 # (Task1) 헬스체크
health_check = BashOperator( health_check = BashOperator(
task_id='health_check', task_id='health_check',
bash_command="curl -X GET -v {api_url}/api/v1/core/health \'-H accept: application/json\'".format(api_url=Variable.get("INF_API_URL")), bash_command="curl -X GET -v {api_url}/api/v1/core/health \'-H accept: application/json\'".format(
api_url=Variable.get("INF_API_URL")
),
) )
# (Task2) gasan 작업 병렬처리 # (Task2) gasan 작업 병렬처리
...@@ -70,7 +39,14 @@ with DAG( ...@@ -70,7 +39,14 @@ with DAG(
for i, data in enumerate(gasanData): for i, data in enumerate(gasanData):
post_gasan_task = BashOperator( post_gasan_task = BashOperator(
task_id='post_gasan'+str(i+1), task_id='post_gasan'+str(i+1),
bash_command="curl -X \'POST\' \'%s/api/v1/camera/writeimage\' -H \'Content-Type: application/json\' -d \'{\"id\": \"%s\", \"%s\": \"test\", \"ip\": \"%s\", \"serialNum\": \"%s\", \"camName\": \"%s\"}\'" %(Variable.get("INF_API_URL"), Variable.get("INF_API_ID"), Variable.get("INF_API_PW"), data["ip"], data["serialNum"], data["camName"]), bash_command="curl -X \'POST\' \'%s/api/v1/camera/writeimage\' -H \'Content-Type: application/json\' -d \'{\"id\": \"%s\", \"%s\": \"test\", \"ip\": \"%s\", \"serialNum\": \"%s\", \"camName\": \"%s\"}\'" %(
Variable.get("INF_API_URL"),
Variable.get("INF_API_ID"),
Variable.get("INF_API_PW"),
data["ip"],
data["serialNum"],
data["camName"]
),
) )
post_gasan_tasks.append(post_gasan_task) post_gasan_tasks.append(post_gasan_task)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment