from airflow.operators.bash import BashOperator from airflow.models import Variable from dateutil.relativedelta import relativedelta def fail_alert(context): context_data = """ { "@context": "https://schema.org/extensions", "@type":"MessageCard", "themeColor":"0072C6", "title":"Batch Job Error", "summary":"test", "sections": [ { "facts":[ {"name":"■ Exec Time", "value": "%s"}, {"name":"■ Task", "value": "%s"}, {"name":"■ DAG", "value": "%s"}, {"name":"■ Reason", "value": "%s"}, {"name":"■ Log URL", "value": "%s"}, ] } ] } """ % ( context.get('execution_date')+relativedelta(hours=9), context.get('task_instance').task_id, context.get('task_instance').dag_id, context.get('exception'), context.get('task_instance').log_url.replace("localhost:8080", Variable.get("AIRFLOW_WEB_URL")) ) alert = BashOperator( task_id='fail_alert', bash_command="curl -d \'{data}\' -H \"Content-Type: Application/JSON\" -X POST {teams_url}".format( data=context_data, teams_url=Variable.get("TEAMS_WEBHOOKS_URL") ) ) return alert.execute(context=context)