"""Airflow DAG ``batch_test``: POST a write-image request for every gasan camera.

One ``BashOperator`` task is created per entry of ``gasanData``. Each task
runs ``curl`` against the camera ``writeimage`` endpoint with a JSON body
describing that camera.
"""
import json
import shlex
from datetime import datetime, timedelta
from textwrap import dedent

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

from data.gasan_data import gasanData

local_tz = pendulum.timezone("Asia/Seoul")

# NOTE(review): this runs on every import of the file, not only when a task
# executes. Kept to preserve existing behavior; consider removing or logging.
print(gasanData)

# Expected shape of the gasan data (sample declaration kept for reference):
# gasan_data = [
#     {
#         "ip": "10.231.130.110",
#         "serialNum": "7D0B3C9PAGFCF3C",
#         "camName": "dt_lab_1"
#     },
#     {
#         "ip": "10.231.130.111",
#         "serialNum": "7D0B3C9PAG8D150",
#         "camName": "dt_lab_2"
#     },
#     {
#         "ip": "10.231.130.112",
#         "serialNum": "7D0B3C9PAGBB838",
#         "camName": "dt_lab_3"
#     },
#     {
#         "ip": "10.231.130.113",
#         "serialNum": "7D0B3C9PAG17F50",
#         "camName": "dt_lab_4"
#     },
# ]


def _build_write_image_command(camera):
    """Return the shell command that POSTs one camera to the writeimage API.

    Args:
        camera: Mapping that provides the ``"ip"``, ``"serialNum"`` and
            ``"camName"`` keys.

    Returns:
        A ``curl`` command line. The JSON body is serialized with
        ``json.dumps`` and shell-quoted with ``shlex.quote`` so that quotes
        or backslashes inside a value cannot corrupt the JSON document or
        break out of the shell argument.

    Raises:
        KeyError: If ``camera`` lacks one of the required keys.
    """
    payload = json.dumps(
        {
            "id": "test",
            "pw": "test",
            # str() keeps every value a JSON string, as the original
            # '"%s"' formatting did, even if the data holds non-str values.
            "ip": str(camera["ip"]),
            "serialNum": str(camera["serialNum"]),
            "camName": str(camera["camName"]),
        },
        # Emit non-ASCII characters as-is instead of \uXXXX escapes, matching
        # what plain string interpolation would have sent.
        ensure_ascii=False,
    )
    return (
        "curl -X 'POST' "
        "'http://10.231.238.224:30999/api/v1/camera/writeimage' "
        "-H 'Content-Type: application/json' "
        f"-d {shlex.quote(payload)}"
    )


with DAG(
    'batch_test',
    default_args={
        'depends_on_past': False,
        'email': 'kim-jy@lotte.net',
    },
    description='Test Batch Job',
    schedule_interval='*/1 * * * *',
    start_date=datetime(2022, 5, 13, tzinfo=local_tz),
    tags=['test'],
    catchup=False,
) as dag:
    # One task per gasan camera, numbered from 1. No dependencies are declared
    # between the tasks, so nothing forces them to run one after another.
    post_gasan_tasks = [
        BashOperator(
            task_id=f'post_gasan{index}',
            bash_command=_build_write_image_command(camera),
        )
        for index, camera in enumerate(gasanData, start=1)
    ]