from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
# The contrib path (airflow.contrib.operators.kubernetes_pod_operator) is
# deprecated; the operator now lives in the cncf.kubernetes provider package.
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # Use a fixed start_date; a dynamic value such as datetime.utcnow() makes
    # every scheduler parse see a different schedule and is a known
    # anti-pattern.
    'start_date': datetime(2021, 1, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'kubernetes_sample',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1),
    # With a fixed start_date, disable catchup so Airflow does not backfill
    # one run for every minute since the start date.
    catchup=False,
)

start = DummyOperator(task_id='run_this_first', dag=dag)

# The two KubernetesPodOperator examples below are left disabled; uncomment
# them to run against a cluster. Note the lowercase image and command names:
# "Python:3.6" and ["Python", "-c"] would not resolve.
# passing = KubernetesPodOperator(namespace='default',
#                                 image="python:3.6",
#                                 cmds=["python", "-c"],
#                                 arguments=["print('hello world')"],
#                                 labels={"foo": "bar"},
#                                 name="passing-test",
#                                 task_id="passing-task",
#                                 get_logs=True,
#                                 dag=dag
#                                 )

# This task is meant to fail: "ubuntu:1604" is not a valid image tag
# ("ubuntu:16.04" would be), so the pod never starts.
# failing = KubernetesPodOperator(namespace='default',
#                                 image="ubuntu:1604",
#                                 cmds=["python", "-c"],
#                                 arguments=["print('hello world')"],
#                                 labels={"foo": "bar"},
#                                 name="fail",
#                                 task_id="failing-task",
#                                 get_logs=True,
#                                 dag=dag
#                                 )

# Despite the name, this command contains no Jinja templating; it simply
# reports disk usage from wherever the task runs.
templated_command = """
df -h
"""

run_this = BashOperator(
    task_id='bash_test',
    bash_command=templated_command,
    dag=dag,
)

start >> run_this
# start >> passing >> run_this
# start >> failing
# Equivalent to the chained form above:
# passing.set_upstream(start)
# failing.set_upstream(start)
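
# A quick way to smoke-test this DAG without waiting on the scheduler is the
# Airflow CLI. This is a usage sketch, not part of the DAG: it assumes the
# file sits in your configured dags/ folder and, for the pod tasks, a
# reachable Kubernetes cluster.
#
#   airflow dags list                                  # confirm the DAG parsed
#   airflow tasks test kubernetes_sample bash_test 2021-01-01
#
# `airflow tasks test` runs a single task instance locally, ignoring upstream
# dependencies and without recording state in the metadata database.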