2017-12-24 209 views
5

Airflow'ta, job_flow_id emr adımlarından birine geçmem gereken sorunla karşı karşıyayım. Operatörden job_flow_id'u alma yeteneğine sahibim, ancak kümeye gönderilecek adımları oluşturacağım zaman, task_instance değeri doğru değil.Airflow - EMR operatöründe Görev Örneği

def issue_step(name, args): 
    return [ 
     { 
      "Name": name, 
      "ActionOnFailure": "CONTINUE", 
      "HadoopJarStep": { 
       "Jar": "s3://....", 
       "Args": args 
      } 
     } 
    ] 

dag = DAG('example', 
      description='My dag', 
      schedule_interval='0 8 * * 6', 
      dagrun_timeout=timedelta(days=2)) 

try: 

    create_emr = EmrCreateJobFlowOperator(
     task_id='create_job_flow', 
     aws_conn_id='aws_default',   
     dag=dag 
    ) 

    load_data_steps = issue_step('load', ['arg1', 'arg2']) 

    load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id') 
    load_data_steps[0]["HadoopJarStep"]["Args"].append(
     "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}") # the value here is not exchanged with the actual job_flow_id 

    load_data = EmrAddStepsOperator(
     task_id='load_data', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", # this is correctly exchanged with the job_flow_id - same for the others 
     aws_conn_id='aws_default', 
     steps=load_data_steps, 
     dag=dag 
    ) 

    check_load_data = EmrStepSensor(
     task_id='watch_load_data', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", 
     step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}", 
     aws_conn_id='aws_default', 
     dag=dag 
    ) 

    cluster_remover = EmrTerminateJobFlowOperator(
     task_id='remove_cluster', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", 
     aws_conn_id='aws_default', 
     dag=dag 
    ) 

    create_emr_recommendations >> load_data 
    load_data >> check_load_data 
    check_load_data >> cluster_remover 

except AirflowException as ae: 
    print ae.message 

sorun yerine load_data adımda --cluster-id j-1234 görme, EMR kontrol ederken, ben --cluster-id "{{task_instance.xcom_pull('create_job_flow', key='return_value')}}" bkz, yani benim adım başarısız olmasına neden olur: Aşağıdaki kod var.

Adım adımdaki gerçek değeri nasıl alabilirim?

Teşekkür ve mutlu tatiller

+0

Değeri tırnak işaretleri olmadan eklemeyi denediniz mi? load_data_steps [0] ["HadoopJarStep"] ["Args"]. Eklemek ( {{task_instance.xcom_pull ('create_job_flow', anahtar = 'return_value')}}) –

+0

"task_instance" işlevini nereden alabilirim? nesnesi Hala nasıl kullanacağımı öğreniyorum. – davideberdin

cevap

3

Ben hava akışı depo yaklaşık this üzerinde PR olduğunu öğrendim. Sorun, EmrAddStepsOperator'daki adımlar için şablonlamanın olmamasıdır.

  • düzenlendi EmrAddStepsOperator
  • Eklendi bu operatör gelen Plugin olarak devralan bir özel operatör
  • Aranan benim DAG yeni operatör İşte

dosyası: Bu sorunu aşmak için, aşağıdaki yaptım Özel işleç ve eklenti için kod custom_emr_add_step_operator.py (bkz. aşağıdaki ağaç)

Benim DAG dosyasında

bu şekilde

from airflow.operators import CustomEmrAddStepsOperator 
eklentiyi denilen

projemin ve eklentileri yapısı aşağıdaki gibidir:

├── config 
│   └── airflow.cfg 
├── dags 
│   ├── __init__.py 
│   └── my_dag.py 
├── plugins 
│   ├── __init__.py 
│   └── operators 
│    ├── __init__.py 
│    └── custom_emr_add_step_operator.py 
└── requirements.txt 

Böyle pycharm olarak bir IDE kullanıyorsanız, bu irade şikayetçi çünkü modülü bulamadığını söylüyor. Ama Airflow'u çalıştırdığınızda, bu sorun ortaya çıkmayacak. Ayrıca, airflow.cfg sayfanızda sağdaki plugins klasörünü işaret edeceğinizi unutmayın, böylece Airflow yeni oluşturduğunuz eklentiyi okuyabilir.