Now, here are some details on SparkSubmitOperator. Include the following import:
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
# Spark-submit configuration settings. Different input parameters are defined
# below; add more based on your spark-submit requirements.
__config = {
    'driver_memory': '2g',        # spark-submit equivalent: --driver-memory (spark.driver.memory)
    'executor_memory': '512m',    # spark-submit equivalent: --executor-memory
    'num_executors': 3,           # spark-submit equivalent: --num-executors
    'executor_cores': 3,          # spark-submit equivalent: --executor-cores
    'verbose': True,
    'name': 'Scapidb2TrxUpcase',  # application name, i.e. --name
    'java_class': 'com.rkg.spark.scala.generated.Scapidb2TrxUpcase',  # main class, i.e. --class
    'application': '/Users/sethugupta/Scapidb2TrxUpcase.jar',         # application jar file
    'application_args': ['yarn', '172.18.0.2'],  # arguments supplied along with the jar
}
# My application jar takes two input arguments: yarn and 172.18.0.2
# Using SparkSubmitOperator
spark_submit_task = SparkSubmitOperator(
    task_id='sparksubmit_job',  # any name: your Airflow task ID
    conn_id='spark_hdp',        # Spark connection in Airflow; you can use spark_default
    conf={                      # properties you want to pass as --conf during the spark-submit call
        'spark.hadoop.yarn.timeline-service.enabled': 'false',  # workaround for a recent YARN/HDP issue
        'spark.authenticate': 'false',
        'spark.driver.host': '172.18.0.2',
        'spark.driver.extraClassPath': 'abc.jar,xyz.jar',
        'spark.jars': 'file:/tmp/bdf.jar,file:/tmp/pqr.jar',    # dependent jar files
    },  # you can add any other --conf property inside it
    dag=dag,
    **__config  # the properties defined above, to keep the call simpler
)
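For intuition, the operator ultimately assembles and runs a spark-submit command line from these keyword arguments. Here is a rough, Airflow-free sketch of that mapping (illustrative only; the real operator's hook does considerably more, such as resolving the connection and tracking the driver):

```python
# Illustrative sketch (not the operator's actual code): how the keyword
# arguments above roughly map onto spark-submit flags.
def build_spark_submit_cmd(config, conf):
    cmd = ['spark-submit']
    for key, value in conf.items():           # conf dict becomes repeated --conf key=value
        cmd += ['--conf', '{}={}'.format(key, value)]
    flag_map = {
        'driver_memory': '--driver-memory',
        'executor_memory': '--executor-memory',
        'num_executors': '--num-executors',
        'executor_cores': '--executor-cores',
        'name': '--name',
        'java_class': '--class',
    }
    for key, flag in flag_map.items():
        if key in config:
            cmd += [flag, str(config[key])]
    if config.get('verbose'):
        cmd.append('--verbose')
    cmd.append(config['application'])          # the jar goes last, before its own args
    cmd += config.get('application_args', [])
    return cmd

cmd = build_spark_submit_cmd(
    {'driver_memory': '2g', 'executor_memory': '512m', 'num_executors': 3,
     'executor_cores': 3, 'verbose': True, 'name': 'Scapidb2TrxUpcase',
     'java_class': 'com.rkg.spark.scala.generated.Scapidb2TrxUpcase',
     'application': '/Users/sethugupta/Scapidb2TrxUpcase.jar',
     'application_args': ['yarn', '172.18.0.2']},
    {'spark.driver.host': '172.18.0.2'},
)
print(' '.join(cmd))
```

This is only to show why the config keys use underscores (they are Python keyword arguments) while the resulting flags use hyphens.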
Here are details about my Spark connection, created under the Airflow Admin tab. Use yarn as the host. You can leave Connection Type empty. Anything that doesn't fit within __config and --conf can go into the connection's Extra field, for example:
{"queue": "default", "deploy-mode": "cluster", "spark-home": "My Spark Home", "spark-binary": "spark-submit", "namespace": "default"}
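The Extra field must contain valid JSON, or the operator will fail to parse it. A quick sanity check before pasting it into the Admin UI:

```python
import json

# The Extra JSON from my Spark connection, as shown above.
extra = ('{"queue": "default", "deploy-mode": "cluster", '
         '"spark-home": "My Spark Home", "spark-binary": "spark-submit", '
         '"namespace": "default"}')

parsed = json.loads(extra)   # raises ValueError if the JSON is malformed
print(sorted(parsed))
```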
Here are details about SparkSubmitOperator itself, which I used along with its source. Please do let me know if you need additional help. I'll find a mechanism to publish a working sample soon. I am also exploring helper operators to address a few gaps this operator has around getting real-time log information from the cluster back to Airflow. -Ritesh
Disclaimer: “The postings on this site are my own and don’t necessarily represent IBM’s positions, strategies or opinions.”
Hi Ritesh, can you please share the complete DAG file? Also, how do I get an EMR cluster connection through the DAG? Thanks, SaiRaghu
Hi Ritesh, can we specify key-value paired Spark job arguments in the DAG? For example, my spark-submit command has key-value pairs such as jobType=dm and orgCode=abc. How will I be able to configure this in the Airflow DAG?
Where do I mention my PySpark script to be scheduled?