"""Airflow DAG that runs a single command inside a Kubernetes pod.

Defines the ``kubernetes-dag`` DAG with two tasks: a no-op ``start`` marker
followed by a ``KubernetesPodOperator`` task that launches a
``curlimages/curl`` container in the ``airflow`` namespace and runs
``nslookup localhost``.
"""
from datetime import datetime, timedelta

from kubernetes.client import models as k8s

from airflow.models import DAG, Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.kubernetes.secret import Secret
from airflow.kubernetes.pod import Resources
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

dag_id = 'kubernetes-dag'

# Defaults applied to every task in the DAG.
task_default_args = {
    'owner': 'jhwon',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2022, 5, 11),
    'depends_on_past': False,
    'email': ['jh_won@lotte.net'],
    'email_on_retry': False,
    'email_on_failure': False,
    'execution_timeout': timedelta(hours=1),
}

# Cron expression '5 16 * * *' fires once a day at 16:05; max_active_runs=1
# keeps runs from overlapping.
# NOTE(review): `catchup` is not set, so the deployment's configured default
# decides whether runs since start_date are backfilled -- confirm that this
# is intended.
dag = DAG(
    dag_id=dag_id,
    description='kubernetes pod operator',
    default_args=task_default_args,
    schedule_interval='5 16 * * *',
    max_active_runs=1,
)

# Secret reference: key 'TEST' of the Kubernetes secret 'test-env', to be
# exposed as environment variable 'TEST'. Currently unused because the
# `secrets=` argument of the pod task below is commented out.
env = Secret(
    deploy_type='env',
    deploy_target='TEST',
    secret='test-env',
    key='TEST',
)

# Pod resource requests/limits. Currently unused because the `resources=`
# argument of the pod task below is commented out.
pod_resources = Resources()
pod_resources.request_cpu = '1'
pod_resources.limit_cpu = '2'
# Memory must use a byte suffix such as 'Mi'. The Kubernetes 'm' suffix means
# "milli", so the previous values '200m' / '400m' requested 0.2 / 0.4 bytes.
pod_resources.request_memory = '200Mi'
pod_resources.limit_memory = '400Mi'

# Load every key of the 'airflow-airflow-config' ConfigMap into the container
# environment.
configmaps = [
    k8s.V1EnvFromSource(
        config_map_ref=k8s.V1ConfigMapEnvSource(name='airflow-airflow-config'),
    ),
]

start = DummyOperator(task_id='start', dag=dag)

run = KubernetesPodOperator(
    task_id='kubernetespodoperator',
    namespace='airflow',
    # Digest-pinned alternative to the floating tag below:
    # image='curlimages/curl@sha256:c05146450820bb943c400bf2173a2423096c586a3984b989619d9165d61f1a24',
    # NOTE(review): ':latest' is a floating tag, so runs are not reproducible;
    # consider the digest-pinned image above.
    image='curlimages/curl:latest',
    # Alternative commands kept for manual testing:
    # cmds=["curl", "-O","https://cdn.jsdelivr.net/npm/vue/dist/vue.js"],
    # cmds=["echo","hello world man"],
    cmds=["nslookup", "localhost"],
    # secrets=[
    #     env
    # ],
    # NOTE(review): Kubernetes object names normally may not contain '_';
    # verify that 'image_credential' matches the real secret name.
    image_pull_secrets=[k8s.V1LocalObjectReference(name='image_credential')],
    name='curl-job',
    # Keep the pod after the task ends so it can be inspected.
    is_delete_operator_pod=False,
    get_logs=True,
    # resources=pod_resources,
    env_from=configmaps,
    dag=dag,
)

# Task ordering: the marker task runs first, then the pod task.
start >> run