[spark] standalone集群模式Driver启动过程

标签: spark

      本篇文章简单整理一下spark在standalone集训模式下启动Driver的流程,本篇文章只解析到Driver启动成功,启动后续任务执行在后面博客更新,个人比较喜欢从代码跟踪,文章代码粘贴只提取部分重要代码。。。。。。

一、脚本查看

spark-submit触发任务的提交,查看spark-submit脚本会看出最终执行任务的主类是:

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "[email protected]"

二、源码解析

idea打开源码包(spark2.1.1),查看SparkSubmitmain方法,很明显我们需要查看submit()方法

 //提交任务主类运行
override def main(args: Array[String]): Unit = {
    val uninitLog = initializeLogIfNecessary(true, silent = true)
    //设置参数
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    appArgs.action match {
      //任务提交匹配 submit
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }

submit()方法体内第一行注明了一个重点属性childMainClass,所以这里需要关注prepareSubmitEnvironment()的调用过程

submit()

@tailrec
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
//以下方法返回四元组,重点注意childMainClass类 这里以standalone-cluster为例
val (childArgs, childClasspath, sparkConf, childMainClass)=prepareSubmitEnvironment(args)
   ........
 //运行
runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
}

prepareSubmitEnvironment()==>doPrepareSubmitEnvironment(),为了查看主要逻辑代码,删除了大部分代码,这里需要重点关注不同deployMode下的childMainClass的变化,代码注释很清楚,方法最后返回四元组

private def doPrepareSubmitEnvironment(
   .......
    var childMainClass = ""
.........
//客户端模式提交任务,那么这里args.mainClass就是我们提交任务的主类,直接在客户端启动Driver
    if (deployMode == CLIENT) {
      childMainClass = args.mainClass
      if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
        childClasspath += localPrimaryResource
      }
      if (localJars != null) { childClasspath ++= localJars.split(",") }
    }
.........
 //standalone-cluster模式
    if (args.isStandaloneCluster) {
      //使用rest风格,这里rest提交是指使用json 格式和http 提交任务 ,spark1.3+支持
      if (args.useRest) {
        childMainClass = REST_CLUSTER_SUBMIT_CLASS
        childArgs += (args.primaryResource, args.mainClass)
      } else {
        //正常提交方式
        childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
        if (args.supervise) { childArgs += "--supervise" }
        Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
        Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
        childArgs += "launch"
        childArgs += (args.master, args.primaryResource, args.mainClass)
      }
      if (args.childArgs != null) {
        childArgs ++= args.childArgs
      }
    }
......
(childArgs, childClasspath, sparkConf, childMainClass)
}

当deployMode为CLIENT时,driver会在客户端直接运行,这里我们关注集群模式提交的任务,也是生产环境中用到的。STANDALONE_CLUSTER_SUBMIT_CLASS对应的类是ClientApp,文章后面会再次解析这个类,这里先跳回到主流程submit

//org.apache.spark.deploy.ClientApp
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()

在submit()方法最后一行会执行runMain()方法,runmain方法内部会加载、利用反射实例化childMainClass的引用类(ClientApp),并调用start方法

 private def runMain(
      childArgs: Seq[String],
      childClasspath: Seq[String],
      sparkConf: SparkConf,
      childMainClass: String,
     ......
    try {
      //加载类
      mainClass = Utils.classForName(childMainClass)
    } catch {
    ......
    //将mainClass 映射成SparkApplication对象
    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass))         {
      mainClass.newInstance().asInstanceOf[SparkApplication]
    ......
    try {
      //调用start方法,这里调用的是ClientApp的start方法
      app.start(childArgs.toArray, sparkConf)
    } catch {
......

继续跟踪到ClientApp的start方法,其实也是ClientAPP的唯一方法,从代码中可以看出ClientAPP的start()方法主要做了三件事:

1、创建rpc环境

2、获取所有master引用

3、注册ClientEndpoint

override def start(args: Array[String], conf: SparkConf): Unit = {
    val driverArgs = new ClientArguments(args)
    if (!conf.contains("spark.rpc.askTimeout")) {
      conf.set("spark.rpc.askTimeout", "10s")
    }
    Logger.getRootLogger.setLevel(driverArgs.logLevel)
    //创建rpc通信环境
    val rpcEnv =RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
    //得到Master的通信邮箱
    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
    //在rpc中设置提交当前任务的Endpoint,只要设置肯定会运行 new ClientEndpoint 类的 start方法
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
    rpcEnv.awaitTermination()
}

当endpoint被注册时,很明显会执行onstart()方法,所以继续看ClientEndpoint的onstart(),方法中重点关注属性mainClass、command对象、driverDescription对象,最后会调用asyncSendToMasterAndForwardReply方法向master提交信息

override def onStart(): Unit = {
    driverArgs.cmd match {
      case "launch" =>
        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
       ......
        //将DriverWrapper 这个类封装到Command中
        val command = new Command(mainClass,Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,sys.env, classPathEntries, libraryPathEntries, javaOpts)

        val driverDescription = new DriverDescription(
          driverArgs.jarUrl,driverArgs.memory,driverArgs.cores,
          driverArgs.supervise,command)
        //向Master申请启动Driver,Master中的 receiveAndReply 方法会接收此请求消息
        asyncSendToMasterAndForwardReply[SubmitDriverResponse](
          RequestSubmitDriver(driverDescription))
     ......
  }

asyncSendToMasterAndForwardReply方法继续调用masterEndpoint的ask方法,对应的,master的receiveAndReply方法会有相应处理

  private def asyncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
    for (masterEndpoint <- masterEndpoints) {
      masterEndpoint.ask[T](message).onComplete {
        case Success(v) => self.send(v)
        case Failure(e) =>
          logWarning(s"Error sending messages to master $masterEndpoint", e)
      }(forwardMessageExecutionContext)
    }
  }

Master的receiveAndReply方法中除了创建Driver、回复消息之外,调用了一个核心的方法----------schedule()

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RequestSubmitDriver(description) =>
      //判断Master状态
      if (state != RecoveryState.ALIVE) {
        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
          "Can only accept driver submissions in ALIVE state."
        context.reply(SubmitDriverResponse(self, false, None, msg))
      } else {
        logInfo("Driver submitted " + description.command.mainClass)
        val driver = createDriver(description)
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        drivers.add(driver)
        schedule()

        // TODO: It might be good to instead have the submission client poll the master to determine
        //       the current status of the driver. For now it's simply "fire and forget".

        context.reply(SubmitDriverResponse(self, true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}"))
      }

再次进入schedule()方法,这个方法很重要,注释我都贴下来了,这个方法会将会筛选可用workers中满足资源条件的worker,做两件牛X的事情,Driver调度、Executor资源分配

//schedule() 方法是通用的方法
//这个方法中当申请启动Driver的时候也会执行,但是最后一行的startExecutorsOnWorkers 方法中 //waitingApp是空的,只是启动Driver。
//在提交application时也会执行到这个scheduler方法,这个时候就是要启动的Driver是空的,但是会直接//运行startExecutorsOnWorkers 方法给当前的application分配资源

  private def schedule(): Unit = {
    //判断Master状态
    if (state != RecoveryState.ALIVE) {return}
    // Drivers take strict precedence over executors 这里是打散worker
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    //可用的worker数量
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        //拿到curPos位置的worker
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          //这里是启动Driver,启动Driver之后会为当前的application 申请资源
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        //curPos 就是一直加一的往后取 Worker  ,一直找到满足资源的worker
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    startExecutorsOnWorkers()
  }

接下来就是worker该工作了,满足条件的worker会执行launchDriver()方法,也就是给worker发送消息启动driver

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    worker.addDriver(driver)
    driver.worker = Some(worker)
    //给Worker发送消息启动Driver,这里在Worker中会有receive方法一直匹配LaunchDriver
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
    driver.state = DriverState.RUNNING
}

所以最后到Worker的receive()方法,Driver在worker节点中启动

override def receive: PartialFunction[Any, Unit] = synchronized {
........
//启动Driver,这里说启动的Driver就是刚才说的 val //mainClass="org.apache.spark.deploy.worker.DriverWrapper"
// Driver启动就是DriverWrapper类启动,DriverWrapper的启动就是在Worker中创建一个Driver 进程,
//之后就是启动DriverWrapper的main方法

    case LaunchDriver(driverId, driverDesc) =>
      logInfo(s"Asked to launch driver $driverId")
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
      drivers(driverId) = driver
      //启动Driver,会初始化 org.apache.spark.deploy.worker.DriverWrapper ,运行main方法
      driver.start()

      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
.........

三、总结

这样直接看源码没思路看完文章没毛用,所以简单整理一下思路,大致流程是:客户端在spark集群中的一个节点提交任务(spark-submit),并在参数注明deployMode类型(client,cluster),ClientApp和master通信,封装driver对象发送个master,master挑选一台满足资源的worker,然后Worker在本机fork()一个DriverWrapper对象,DriverWrapper对象会执行客户端的逻辑代码

具体步骤整理:

1、客户端提交任务到集群,客户端spark-submit任务执行SparkSubmit类的main方法,main方法中调用submit()

2、集群根据参数判断客户端提交任务模式,doPrepareSubmitEnvironment()方法中判断deployMode值来决定提交任务的环境(本文只分析cluster模式任务)

3、SparkSubmit对象在runMain()中加载并利用反射实例化childMainClass的引用类(ClientApp)

4、执行ClientApp的start()方法,该方法主要完成三件事:a、创建rpc环境;  b、获取所有master引用;  c、注册ClientEndpoint

5、ClientEndpoint被注册后会执行onstart()方法,该方法会封装一个DriverWrapper对象到Command中,然后将Command的实例化对象封装到DriverDescription中,然后向Master申请启动Driver,Master中的receiveAndReply 方法会接收此请求消息

6、Master匹配消息类型(RequestSubmitDriver)然后处理,创建driver对象,回复消息,重要的是执行schedule()方法调度资源

7、Master的schedule()方法内部会删选满足条件(内存、核心数量)的worker随机挑选一节点发送启动Driver命令

8、Worker收到消息匹配类型(LaunchDriver)启动Driver,运行DriverMapper的main()方法

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

版权声明:本文为u012989317原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/u012989317/article/details/92673836

智能推荐

RIP/DHCP/ACL综合实验

组播: 加入组的组成员才会接受到消息,只需要将流量发送一次到组播地址 减少控制面流量,减少头部复制, RIP1  广播   有类  不支持认证 RIP2  组播   无类  (支持VLAN)、支持认证 所有距离矢量路由协议:具有距离矢量特征的协议,都会在边界自动汇总 控制平面  路由的产生是控制平面的流量 数据平面  ...

【Sublime】使用 Sublime 工具时运行python文件

使用 Sublime 工具时报Decode error - output not utf-8解决办法   在菜单中tools中第四项编译系统 内最后一项增添新的编译系统 自动新建 Python.sublime-build文件,并添加"encoding":"cp936"这一行,保存即可 使用python2 则注释encoding改为utf-8 ctr...

java乐观锁和悲观锁最底层的实现

1. CAS实现的乐观锁 CAS(Compare And Swap 比较并且替换)是乐观锁的一种实现方式,是一种轻量级锁,JUC 中很多工具类的实现就是基于 CAS 的,也可以理解为自旋锁 JUC是指import java.util.concurrent下面的包, 比如:import java.util.concurrent.atomic.AtomicInteger; 最终实现是汇编指令:lock...

Python 中各种imread函数的区别与联系

  原博客:https://blog.csdn.net/renelian1572/article/details/78761278 最近一直在用python做图像处理相关的东西,被各种imread函数搞得很头疼,因此今天决定将这些imread总结一下,以免以后因此犯些愚蠢的错误。如果你正好也对此感到困惑可以看下这篇总结。当然,要了解具体的细节,还是应该 read the fuc...

用栈判断一个字符串是否平衡

注: (1)本文定义:左符号:‘(’、‘[’、‘{’…… 右符号:‘)’、‘]’、‘}’……. (2)所谓的字符串的符号平衡,是指字符串中的左符号与右符号对应且相等,如字符串中的如‘(&r...

猜你喜欢

JAVA环境变量配置

位置 计算机->属性->高级系统设置->环境变量 方式一 用户变量新建path 系统变量新建classpath 方式二 系统变量 新建JAVA_HOME,值为JDK路径 编辑path,前加 方式三 用户变量新建JAVA_HOME 此路径含lib、bin、jre等文件夹。后运行tomcat,eclipse等需此变量,故最好设。 用户变量编辑Path,前加 系统可在任何路径识别jav...

常用的伪类选择器

CSS选择器众多 CSS选择器及权重计算 最常用的莫过于类选择器,其它的相对用的就不会那么多了,当然属性选择器和为类选择器用的也会比较多,这里我们就常用的伪类选择器来讲一讲。 什么是伪类选择器? CSS伪类是用来添加一些选择器的特殊效果。 常用的为类选择器 状态伪类 我们中最常见的为类选择器就是a标签(链接)上的为类选择器。 当我们使用它们的时候,需要遵循一定的顺序问题,否则将可能出现bug 注意...

ButterKnife的使用介绍及原理探究(六)

前面分析了ButterKnife的源码,了解其实现原理,那么就将原理运用于实践吧。 github地址:       点击打开链接 一、自定义注解 这里为了便于理解,只提供BindView注解。 二、添加注解处理器 添加ViewInjectProcessor注解处理器,看代码, 这里分别实现了init、getSupportedAnnotationTypes、g...

1.写一个程序,提示输入两个字符串,然后进行比较,输出较小的字符串。考试复习题库1|要求:只能使用单字符比较操作。

1.写一个程序,提示输入两个字符串,然后进行比较,输出较小的字符串。 要求只能使用单字符比较操作。 参考代码: 实验结果截图:...

小demo:slideDown()实现二级菜单栏下拉效果

效果如下,鼠标经过显示隐藏的二级菜单栏 但是这样的时候会存在一个问题,就是鼠标快速不停移入移出会导致二级菜单栏闪屏现象,一般需要使用stop()来清除事件  ...