Yeryüzündeki insanlara merhaba! Spark görevlerini planlamak ve çalıştırmak için Airflow kullanıyorum. Tüm bu zamana kadar bulduğum Airflow'un yönetebileceği python DAG'leri.
DAG örnek:Hava akışında Spark kodu nasıl çalıştırılır?
spark_count_lines.py
import logging
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime
args = {
'owner': 'airflow'
, 'start_date': datetime(2016, 4, 17)
, 'provide_context': True
}
dag = DAG(
'spark_count_lines'
, start_date = datetime(2016, 4, 17)
, schedule_interval = '@hourly'
, default_args = args
)
def run_spark(**kwargs):
import pyspark
sc = pyspark.SparkContext()
df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
logging.info('Number of lines in people.txt = {0}'.format(df.count()))
sc.stop()
t_main = PythonOperator(
task_id = 'call_spark'
, dag = dag
, python_callable = run_spark
)
Sorun Python kodunda iyi değilim ve Java ile yazılmış bazı görevleri olması. Benim sorum Python DAG içinde Spark Java kavanoz çalıştırmak nasıl? Ya da belki başka bir yolu var mı? Kıvılcım gönderdim: http://spark.apache.org/docs/latest/submitting-applications.html
Ama her şeyi nasıl birbirine bağlayacağımı bilmiyorum. Belki birisi daha önce kullandı ve çalışma örneği var. Zaman ayırdığın için teşekkürler!
SparkSQLOperator, ihtiyacım olan şey gibi görünüyor - ancak, bağlantı dizesinin neye benzemesi gerektiğini bilmediğim için işe yaramayacağım - bu konuda bana yardımcı olabilecek herhangi bir belge var mı? –
Bunu yapmazsanız - bağlantı varsayılan olarak idam moduna geçer - bkz. Https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py#L33 – Tagar