"""Sample Airflow DAG: Kubernetes pod tasks next to a plain bash task.

Defines the ``kubernetes_sample`` DAG with four tasks:

* ``run_this_first`` - a no-op entry point.
* ``passing-task``   - runs a one-line Python program in a ``python:3.6`` pod.
* ``failing-task``   - pod task that presumably exists to demonstrate a
  failure (see the note next to its definition).
* ``bash_test``      - runs ``kubectl get pods`` on the worker.

Only ``run_this_first >> bash_test`` is wired up; the two pod tasks are
attached to the DAG without upstream dependencies.
"""
from airflow import DAG
from datetime import datetime, timedelta
# NOTE(review): the contrib / dummy_operator module paths below look like
# legacy import locations while ``airflow.operators.bash`` is the newer one;
# confirm against the installed Airflow version before changing them.
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # A fixed start date: a dynamic value such as ``datetime.utcnow()`` is
    # re-evaluated on every parse of this file, so the first schedule
    # interval never reliably closes. The exact date is arbitrary because
    # the DAG below disables catch-up.
    'start_date': datetime(2021, 1, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'kubernetes_sample',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1),
    # Do not backfill every minute between the fixed start date and now.
    catchup=False,
)

start = DummyOperator(task_id='run_this_first', dag=dag)

# Image repository names must be lowercase and the interpreter binary in the
# official image is ``python``; the capitalised spellings could never run.
passing = KubernetesPodOperator(
    namespace='default',
    image="python:3.6",
    cmds=["python", "-c"],
    arguments=["print('hello world')"],
    labels={"foo": "bar"},
    name="passing-test",
    task_id="passing-task",
    get_logs=True,
    dag=dag,
)

# NOTE(review): "ubuntu:1604" does not look like a published tag (the release
# is tagged 16.04), which together with the task name suggests this task is
# meant to fail - the image is therefore left untouched. Confirm intent.
failing = KubernetesPodOperator(
    namespace='default',
    image="ubuntu:1604",
    cmds=["python", "-c"],
    arguments=["print('hello world')"],
    labels={"foo": "bar"},
    name="fail",
    task_id="failing-task",
    get_logs=True,
    dag=dag,
)

# Despite the name, the command contains no template expressions.
templated_command = """ kubectl get pods """

run_this = BashOperator(
    task_id='bash_test',
    bash_command=templated_command,
    dag=dag,
)

start >> run_this

# Alternative wirings kept for reference (currently disabled):
# start >> passing >> run_this
# start >> failing
# passing.set_upstream(start)
# failing.set_upstream(start)