In the previous post we walked through how a Spark Standalone cluster starts up. Once the cluster is running, how do we run a program of our own on it? This post covers the spark-submit task-submission flow.
The Spark documentation gives the general form of a spark-submit invocation:
./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]
• --class: The entry point for your application (e.g. org.apache.spark.examples.SparkPi).
• --master: The master URL for the cluster (e.g. spark://23.195.26.187:7077).
• --deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client).
• --conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces, wrap "key=value" in quotes.
• application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside your cluster, for instance an hdfs:// path or a file:// path that is present on all nodes.
• application-arguments: Arguments passed to the main method of your main class, if any.
A concrete example:
# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000
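For reference, the class named by --class is just an ordinary object with a main method. The following is a minimal sketch of such an entry point (my own example, not the actual org.apache.spark.examples.SparkPi): it builds a SparkSession, runs one job, and exits.

import org.apache.spark.sql.SparkSession

// Minimal sketch of a submittable application; the object name, app name and sampling
// logic are illustrative only.
object MySparkPi {
  def main(args: Array[String]): Unit = {
    val slices = if (args.nonEmpty) args(0).toInt else 2   // "1000" in the example above
    val spark = SparkSession.builder.appName("MySparkPi").getOrCreate()
    val n = 100000L * slices
    // Estimate Pi by sampling random points in the unit square.
    val hits = spark.sparkContext.parallelize(1L to n, slices).map { _ =>
      val x = math.random * 2 - 1
      val y = math.random * 2 - 1
      if (x * x + y * y <= 1) 1 else 0
    }.reduce(_ + _)
    println(s"Pi is roughly ${4.0 * hits / n}")
    spark.stop()
  }
}

Packaged into a jar, this is exactly the kind of class that spark-submit launches.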
Tasks are submitted through the spark-submit script under $SPARK_HOME/bin, so let's start by analyzing that script:
# If SPARK_HOME is not set, locate it via the find-spark-home helper
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# Disable randomized hashing for strings in Python 3.3+
export PYTHONHASHSEED=0

# Delegate to the spark-class script in the bin directory
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
Next we step into the spark-class script:
# If SPARK_HOME is not set, locate it via the find-spark-home helper
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# Load spark-env.sh
. "${SPARK_HOME}"/bin/load-spark-env.sh

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Find the Spark jars
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

# Add the launcher build dir to the classpath if requested
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

# Run org.apache.spark.launcher.Main, which parses the arguments and prints the
# command to launch, followed by the launcher's exit code
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

# Collect the NUL-separated output of build_command into the CMD array
set +o posix
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <(build_command "$@")

COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}

# If the launcher did not report a numeric exit code, print its output and abort
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

# Strip the trailing exit code and execute the command
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
The spark-class script's main job is to have org.apache.spark.launcher.Main parse the arguments and build the final launch command, collect that command into the CMD array, and exec it. Let's open org.apache.spark.launcher.Main and look at its main function:
public static void main(String[] argsArray) throws Exception {
  checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");

  List<String> args = new ArrayList<>(Arrays.asList(argsArray));
  String className = args.remove(0);

  boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
  Map<String, String> env = new HashMap<>();
  List<String> cmd;
  if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
    try {
      AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(args);
      cmd = buildCommand(builder, env, printLaunchCommand);
    } catch (IllegalArgumentException e) {
      printLaunchCommand = false;
      System.err.println("Error: " + e.getMessage());
      System.err.println();

      // Try to recover the main class name so a usage error can be printed for it
      MainClassOptionParser parser = new MainClassOptionParser();
      try {
        parser.parse(args);
      } catch (Exception ignored) {
        // Ignore parsing exceptions.
      }

      List<String> help = new ArrayList<>();
      if (parser.className != null) {
        help.add(parser.CLASS);
        help.add(parser.className);
      }
      help.add(parser.USAGE_ERROR);
      AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(help);
      cmd = buildCommand(builder, env, printLaunchCommand);
    }
  } else {
    AbstractCommandBuilder builder = new SparkClassCommandBuilder(className, args);
    cmd = buildCommand(builder, env, printLaunchCommand);
  }

  if (isWindows()) {
    System.out.println(prepareWindowsCommand(cmd, env));
  } else {
    // On POSIX systems the command is printed as NUL-separated tokens,
    // which the while-read loop in spark-class reassembles into the CMD array
    List<String> bashCmd = prepareBashCommand(cmd, env);
    for (String c : bashCmd) {
      System.out.print(c);
      System.out.print('\0');
    }
  }
}
Main's job, then, is argument parsing: once the arguments parse successfully, the command to run is printed back to spark-class, which stores it in the CMD array and exec's it. For a submission, that command is essentially a java invocation of org.apache.spark.deploy.SparkSubmit with the original arguments (setting SPARK_PRINT_LAUNCH_COMMAND=1 makes the launcher print it). Execution therefore moves into org.apache.spark.deploy.SparkSubmit; look at its main function:
override def main(args: Array[String]): Unit = {
  val submit = new SparkSubmit() {
    self =>

    override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
      new SparkSubmitArguments(args) {
        override protected def logInfo(msg: => String): Unit = self.logInfo(msg)

        override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
      }
    }

    override protected def logInfo(msg: => String): Unit = printMessage(msg)

    override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

    override def doSubmit(args: Array[String]): Unit = {
      try {
        super.doSubmit(args)
      } catch {
        case e: SparkUserAppException =>
          exitFn(e.exitCode)
      }
    }
  }

  submit.doSubmit(args)
}
Step into the doSubmit method:
def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet, and remember whether it
  // needs to be reset before the application starts
  val uninitLog = initializeLogIfNecessary(true, silent = true)

  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
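As a quick aside, the action value is decided while parsing the command line. The following is a simplified, hedged sketch of the rule (my own illustration, not the SparkSubmitArguments parser): submitting is the default, and the --kill, --status and --version flags switch to the other actions.

// Hedged sketch only; the real logic lives in org.apache.spark.deploy.SparkSubmitArguments
// and also validates, for example, that --kill/--status are used with a cluster-mode master.
object Action extends Enumeration {
  val SUBMIT, KILL, REQUEST_STATUS, PRINT_VERSION = Value
}

def resolveAction(args: Seq[String]): Action.Value =
  if (args.contains("--kill")) Action.KILL
  else if (args.contains("--status")) Action.REQUEST_STATUS
  else if (args.contains("--version")) Action.PRINT_VERSION
  else Action.SUBMIT // the normal spark-submit path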
doSubmit pattern-matches on the SparkSubmitAction; for an ordinary submission we enter the submit method:
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the stack trace; detect exceptions
          // with an empty stack trace here and treat them differently
          if (e.getStackTrace().length == 0) {
            error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
          } else {
            throw e
          }
      }
    } else {
      runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
    }
  }

  // Let the main class re-initialize the logging system once it starts
  if (uninitLog) {
    Logging.uninitialize()
  }

  // In standalone cluster mode there are two submission gateways: the legacy RPC gateway
  // (org.apache.spark.deploy.Client) and the REST-based gateway introduced in Spark 1.3.
  // The REST gateway is the default; spark-submit falls back to the legacy gateway if the
  // master endpoint turns out not to be a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      logInfo("Running Spark using the REST application submission protocol.")
      doRunMain()
    } catch {
      // Fail over to the legacy submission gateway
      case e: SubmitRestConnectionException =>
        logWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args, false)
    }
  // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}
submit first prepares the submission environment by calling prepareSubmitEnvironment; step into that method:
private[deploy] def prepareSubmitEnvironment(
    args: SparkSubmitArguments,
    conf: Option[HadoopConfiguration] = None)
    : (Seq[String], Seq[String], SparkConf, String) = {
  // Return values
  val childArgs = new ArrayBuffer[String]()
  val childClasspath = new ArrayBuffer[String]()
  val sparkConf = new SparkConf()
  var childMainClass = ""

  // Set the cluster manager
  val clusterManager: Int = args.master match {
    case "yarn" => YARN
    case "yarn-client" | "yarn-cluster" =>
      logWarning(s"Master ${args.master} is deprecated since 2.0." +
        " Please use master \"yarn\" with specified deploy mode instead.")
      YARN
    case m if m.startsWith("spark") => STANDALONE
    case m if m.startsWith("mesos") => MESOS
    case m if m.startsWith("k8s") => KUBERNETES
    case m if m.startsWith("local") => LOCAL
    case _ =>
      error("Master must either be yarn or start with spark, mesos, k8s, or local")
      -1
  }
The main thing prepareSubmitEnvironment does is work out, from the parsed arguments, which cluster manager and deploy mode are being used, and then return four values for the rest of the submission to use: childArgs, childClasspath, sparkConf and childMainClass.
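To make those four values concrete, here is a rough, REPL-style illustration (my own, not Spark source) of roughly what they contain for the SparkPi example above when it is submitted in standalone client mode; the cluster-mode case is covered right below.

import org.apache.spark.SparkConf

// Hedged illustration only: approximate return values for a standalone *client* mode
// submission of the SparkPi example; the exact contents come from SparkSubmitArguments.
val childArgs      = Seq("1000")                           // forwarded to the user's main()
val childClasspath = Seq("/path/to/examples.jar")          // user jar on the driver classpath
val sparkConf      = new SparkConf()
  .set("spark.executor.memory", "20g")                     // from --executor-memory
  .set("spark.cores.max", "100")                           // from --total-executor-cores
val childMainClass = "org.apache.spark.examples.SparkPi"   // run directly in this JVM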
This post follows the Standalone cluster deploy mode, so we continue into the following branch:
if (args.isStandaloneCluster) {
  if (args.useRest) {
    // REST gateway: submit through the REST submission client wrapper
    childMainClass = REST_CLUSTER_SUBMIT_CLASS
    childArgs += (args.primaryResource, args.mainClass)
  } else {
    // Legacy gateway: use the Client wrapper around the user class
    childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
    if (args.supervise) { childArgs += "--supervise" }
    Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
    Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
    childArgs += "launch"
    childArgs += (args.master, args.primaryResource, args.mainClass)
  }
  if (args.childArgs != null) {
    childArgs ++= args.childArgs
  }
}
The code above picks a submission wrapper depending on whether the REST gateway can be used: REST_CLUSTER_SUBMIT_CLASS (the REST client) or STANDALONE_CLUSTER_SUBMIT_CLASS (the legacy ClientApp). In cluster mode, then, childMainClass is the submission wrapper and our own main class is passed along inside childArgs; only in client mode is childMainClass the user class itself. Back in doRunMain, the prepared values are handed to runMain:
private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sparkConf: SparkConf,
    childMainClass: String,
    verbose: Boolean): Unit = {
  if (verbose) {
    logInfo(s"Main class:\n$childMainClass")
    logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
    // The config may contain sensitive information, so redact before printing
    logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
    logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    logInfo("\n")
  }

  // Choose the class loader for the user code
  val loader =
    if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
      new ChildFirstURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    }
  Thread.currentThread.setContextClassLoader(loader)

  // Add the user jar and any additional jars to the classpath
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }

  var mainClass: Class[_] = null

  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {
    case e: ClassNotFoundException =>
      logWarning(s"Failed to load $childMainClass.", e)
      if (childMainClass.contains("thriftserver")) {
        logInfo(s"Failed to load main class $childMainClass.")
        logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
      }
      throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
    case e: NoClassDefFoundError =>
      logWarning(s"Failed to load $childMainClass: ${e.getMessage()}")
      if (e.getMessage.contains("org/apache/hadoop/hive")) {
        logInfo(s"Failed to load hive class.")
        logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
      }
      throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
  }

  // Wrap the class in a SparkApplication: internal wrappers already implement it,
  // while an ordinary user class goes through JavaMainApplication
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.newInstance().asInstanceOf[SparkApplication]
  } else {
    if (classOf[scala.App].isAssignableFrom(mainClass)) {
      logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
    }
    new JavaMainApplication(mainClass)
  }
runMain then calls app.start(childArgs.toArray, sparkConf). For an ordinary user class the wrapper is JavaMainApplication; step into it:
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    // Copy the SparkConf into system properties before handing control to the user code
    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }

    mainMethod.invoke(null, args)
  }
}
So the user's main function is invoked via reflection, after the SparkConf has been copied into system properties.
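That copy into sys.props is what makes the spark-submit settings visible to the application: a SparkConf constructed inside the user's main() loads every spark.* system property by default. A small, hedged sketch (my own, not Spark source) of the effect:

import org.apache.spark.SparkConf

object PropsDemo {
  def main(args: Array[String]): Unit = {
    // What JavaMainApplication does for each entry of the prepared SparkConf
    sys.props("spark.master") = "local[2]"
    sys.props("spark.app.name") = "props-demo"

    // new SparkConf() uses loadDefaults = true, so it picks up the spark.* system properties
    val conf = new SparkConf()
    println(conf.get("spark.master"))   // prints local[2]
  }
}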
If the submission instead goes through the legacy gateway (the ClientApp path), childMainClass is the ClientApp wrapper defined in org.apache.spark.deploy.Client. Since ClientApp implements SparkApplication, runMain calls its start method directly; the old Client.main entry point is kept for backwards compatibility:
object Client {
  def main(args: Array[String]) {
    if (!sys.props.contains("SPARK_SUBMIT")) {
      println("WARNING: This client is deprecated and will be removed in a future version of Spark")
      println("Use ./bin/spark-submit with \"--master spark://host:port\"")
    }
    new ClientApp().start(args, new SparkConf())
  }
}

private[spark] class ClientApp extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val driverArgs = new ClientArguments(args)

    if (!conf.contains("spark.rpc.askTimeout")) {
      conf.set("spark.rpc.askTimeout", "10s")
    }
    Logger.getRootLogger.setLevel(driverArgs.logLevel)

    // Create the RpcEnv and register a ClientEndpoint that talks to the Master(s)
    val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

    rpcEnv.awaitTermination()
  }
}
ClientApp.start builds an RpcEnv, resolves endpoint references to the Master(s), and registers a ClientEndpoint. When that endpoint starts, its onStart method runs:
override def onStart(): Unit = {
  driverArgs.cmd match {
    case "launch" =>
      // The driver is started on a worker through the DriverWrapper class
      val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

      val classPathConf = "spark.driver.extraClassPath"
      val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

      val libraryPathConf = "spark.driver.extraLibraryPath"
      val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

      val extraJavaOptsConf = "spark.driver.extraJavaOptions"
      val extraJavaOpts = sys.props.get(extraJavaOptsConf)
        .map(Utils.splitCommandString).getOrElse(Seq.empty)

      val sparkJavaOpts = Utils.sparkJavaOpts(conf)
      val javaOpts = sparkJavaOpts ++ extraJavaOpts
      // Build the launch command and driver description, then ask the Master to submit the driver
      val command = new Command(mainClass,
        Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
        sys.env, classPathEntries, libraryPathEntries, javaOpts)

      val driverDescription = new DriverDescription(
        driverArgs.jarUrl,
        driverArgs.memory,
        driverArgs.cores,
        driverArgs.supervise,
        command)
      asyncSendToMasterAndForwardReply[SubmitDriverResponse](
        RequestSubmitDriver(driverDescription))

    case "kill" =>
      val driverId = driverArgs.driverId
      asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
  }
}
Once the Master receives the RequestSubmitDriver message from the Client, the job submission itself is complete. What comes next is the registration and launch of the Driver.