[Spark Streaming] Inside the Source Code: How DStreams Generate RDDs and Execute Spark Jobs

Tags: spark

This post covers the following:

  • A hands-on demonstration of a DStream producing RDDs
  • The mechanism by which a DStream acts as a template for RDDs
  • A source-level walkthrough of how common DStreams generate RDDs

These common DStreams come in three flavors: input-level DStreams (InputDStream), transformation-level DStreams, and output-level DStreams (ForeachDStream).

The main code for this post is as follows:

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 2-minute (120 second) batch interval
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(120))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream("master",9999)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println

Running the program against the cluster produces output like the following:

16/09/08 09:18:00 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 4.0 (TID 2) in 51 ms on localhost (1/1)
16/09/08 09:18:00 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool 
16/09/08 09:18:00 INFO scheduler.JobScheduler: Finished job streaming job 1473297480000 ms.0 from job set of time 1473297480000 ms
16/09/08 09:18:00 INFO scheduler.JobScheduler: Total delay: 0.927 s for time 1473297480000 ms (execution: 0.670 s)
16/09/08 09:18:00 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer()
16/09/08 09:18:00 INFO scheduler.InputInfoTracker: remove old batch metadata: 
16/09/08 09:18:15 INFO storage.MemoryStore: Block input-0-1473297495000 stored as bytes in memory (estimated size 16.0 B, free 89.8 KB)
16/09/08 09:18:15 INFO storage.BlockManagerInfo: Added input-0-1473297495000 in memory on localhost:53535 (size: 16.0 B, free: 511.1 MB)
16/09/08 09:18:15 WARN storage.BlockManager: Block input-0-1473297495000 replicated to only 0 peer(s) instead of 1 peers
16/09/08 09:18:15 INFO receiver.BlockGenerator: Pushed block input-0-1473297495000
16/09/08 09:20:00 INFO scheduler.JobScheduler: Starting job streaming job 1473297600000 ms.0 from job set of time 1473297600000 ms
16/09/08 09:20:00 INFO scheduler.JobScheduler: Added jobs for time 1473297600000 ms
16/09/08 09:20:00 INFO spark.SparkContext: Starting job: print at NetWorkWordCount.scala:24
16/09/08 09:20:00 INFO scheduler.DAGScheduler: Registering RDD 7 (map at NetWorkWordCount.scala:23)
16/09/08 09:20:00 INFO scheduler.DAGScheduler: Got job 3 (print at NetWorkWordCount.scala:24) with 1 output partitions
16/09/08 09:20:00 INFO scheduler.DAGScheduler: Final stage: ResultStage 6 (print at NetWorkWordCount.scala:24)
16/09/08 09:20:00 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 5)
16/09/08 09:20:00 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 5)
16/09/08 09:20:00 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 5 (MapPartitionsRDD[7] at map at NetWorkWordCount.scala:23), which has no missing parents
16/09/08 09:20:00 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 2.6 KB, free 92.4 KB)
16/09/08 09:20:00 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 1604.0 B, free 94.0 KB)
16/09/08 09:20:00 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:53535 (size: 1604.0 B, free: 511.1 MB)
16/09/08 09:20:00 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
16/09/08 09:20:00 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 5 (MapPartitionsRDD[7] at map at NetWorkWordCount.scala:23)
...


16/09/08 09:20:00 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/09/08 09:20:00 INFO executor.Executor: Finished task 0.0 in stage 8.0 (TID 5). 1307 bytes result sent to driver
16/09/08 09:20:00 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 8.0 (TID 5) in 43 ms on localhost (1/1)
16/09/08 09:20:00 INFO scheduler.DAGScheduler: ResultStage 8 (print at NetWorkWordCount.scala:24) finished in 0.028 s
16/09/08 09:20:00 INFO scheduler.DAGScheduler: Job 4 finished: print at NetWorkWordCount.scala:24, took 0.119611 s
-------------------------------------------
Time: 1473297600000 ms
-------------------------------------------
(love,1)
(ge,1)
(i,1)

From the logs of this run on the cluster we can draw the following conclusions:
(1) A streaming program is first and foremost a Spark application, and only then a stream-processing program.
(2) The logs show that when the program starts, the Spark Core scheduling system first launches the relevant components and pre-allocates resources. In other words, before Spark Streaming starts running, the maximum usable resources have already been allocated; dynamic resource allocation is also possible, and multiple threads can submit and execute jobs concurrently.
(3) Spark Streaming depends on Spark Core, and if DStreams did not produce RDDs, how could it run on Spark Core at all? (The reason DStreams wrap RDDs is that Spark wants to be a unified, multi-paradigm big data platform. A unified platform needs a unified API: one API to drive everything. With a single interface such as Dataset backed by one underlying engine, that engine can serve every computation paradigm, which greatly reduces development cost.)

Let's continue with the logs:

16/09/08 09:16:18 INFO dstream.ForEachDStream: metadataCleanupDelay = -1
16/09/08 09:16:18 INFO dstream.ShuffledDStream: metadataCleanupDelay = -1
16/09/08 09:16:18 INFO dstream.MappedDStream: metadataCleanupDelay = -1
16/09/08 09:16:18 INFO dstream.FlatMappedDStream: metadataCleanupDelay = -1
16/09/08 09:16:18 INFO dstream.SocketInputDStream: metadataCleanupDelay = -1
16/09/08 09:16:18 INFO dstream.SocketInputDStream: Slide time = 120000 ms
16/09/08 09:16:18 INFO dstream.SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/09/08 09:16:18 INFO dstream.SocketInputDStream: Checkpoint interval = null
16/09/08 09:16:18 INFO dstream.SocketInputDStream: Remember duration = 120000 ms
16/09/08 09:16:18 INFO dstream.SocketInputDStream: Initialized and validated [email protected]
16/09/08 09:16:18 INFO dstream.FlatMappedDStream: Slide time = 120000 ms
16/09/08 09:16:18 INFO dstream.FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/09/08 09:16:18 INFO dstream.FlatMappedDStream: Checkpoint interval = null
16/09/08 09:16:18 INFO dstream.FlatMappedDStream: Remember duration = 120000 ms
16/09/08 09:16:18 INFO dstream.FlatMappedDStream: Initialized and validated [email protected]
16/09/08 09:16:18 INFO dstream.MappedDStream: Slide time = 120000 ms
16/09/08 09:16:18 INFO dstream.MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/09/08 09:16:18 INFO dstream.MappedDStream: Checkpoint interval = null
16/09/08 09:16:18 INFO dstream.MappedDStream: Remember duration = 120000 ms
16/09/08 09:16:18 INFO dstream.MappedDStream: Initialized and validated [email protected]
16/09/08 09:16:18 INFO dstream.ShuffledDStream: Slide time = 120000 ms
16/09/08 09:16:18 INFO dstream.ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/09/08 09:16:18 INFO dstream.ShuffledDStream: Checkpoint interval = null
16/09/08 09:16:18 INFO dstream.ShuffledDStream: Remember duration = 120000 ms
16/09/08 09:16:18 INFO dstream.ShuffledDStream: Initialized and validated [email protected]
16/09/08 09:16:18 INFO dstream.ForEachDStream: Slide time = 120000 ms
16/09/08 09:16:18 INFO dstream.ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/09/08 09:16:18 INFO dstream.ForEachDStream: Checkpoint interval = null

From these logs we can draw the following conclusion:
The DStream lineage is built by tracing dependencies backwards, from the last DStream to the first, while the job that actually runs executes forwards, from the first operation to the last.
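
To make this concrete, here is a tiny self-contained sketch in plain Scala rather than Spark code (every name in it is made up) of that same idea: building the chain only records each node's parent, from the last operation back to the source, while computing a batch walks back to the source and then applies the functions forwards over the actual data.

object DStreamChainSketch {
  // Hypothetical, simplified stand-ins: `compute(time)` plays the role of
  // DStream.compute(validTime); keeping a reference to the parent plays the
  // role of `dependencies`.
  trait MiniStream[T] {
    def compute(time: Long): Seq[T]
    def map[U](f: T => U): MiniStream[U] = {
      val self = this // the new (child) stream keeps a reference to its parent
      new MiniStream[U] {
        def compute(time: Long): Seq[U] = self.compute(time).map(f)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // the "input stream": pretend each batch time yields one line of text
    val source = new MiniStream[String] {
      def compute(time: Long): Seq[String] = Seq(s"hello spark streaming at $time")
    }
    // building the chain (backward dependencies) computes nothing yet
    val wordCounts = source.map(_.split(" ").length)
    // "running a batch": walk back to the source, then apply map forwards
    println(wordCounts.compute(120000L)) // List(5)
  }
}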

Next we need to ask: why is a DStream a template for RDDs?

**The backward-traced chain of DStreams has a name: the DStreamGraph, and it is the template for the RDD DAG.
At this point we would like the logs to show us the RDDs themselves, but they do not appear there, so we turn to the web UI. An RDD is bound to a time: only with a batch time does the RDD exist.**

(Web UI screenshots: the RDDs generated at different batch times.)

Different batch times mean different points in "time-space": each batch gets its own set of RDD instances.

object DStream {

  // `toPairDStreamFunctions` was in SparkContext before 1.3 and users had to
  // `import StreamingContext._` to enable it. Now we move it here to make the compiler find
  // it automatically. However, we still keep the old function in StreamingContext for backward
  // compatibility and forward to the following function directly.

  // This is the implicit conversion on DStream: a DStream[(K, V)] is wrapped into
  // PairDStreamFunctions, which is what provides reduceByKey and the other key-value operations.
  implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
    PairDStreamFunctions[K, V] = {
    new PairDStreamFunctions[K, V](stream)
  }
  // ...
}
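
This implicit conversion is exactly what lets reduceByKey be called on words.map(x => (x, 1)) in the word-count example. As a sketch, and assuming a StreamingContext named ssc is already available (the helper method name below is made up), the conversion can be spelled out by hand:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Sketch only: `ssc` is assumed to be an already-constructed StreamingContext.
def wordCountsWithExplicitConversion(ssc: StreamingContext): Unit = {
  val pairs: DStream[(String, Int)] =
    ssc.socketTextStream("master", 9999)
       .flatMap(_.split(" "))
       .map(word => (word, 1))

  // Normally the compiler inserts toPairDStreamFunctions for us; writing it out
  // by hand shows where reduceByKey really comes from.
  val wordCounts = DStream.toPairDStreamFunctions(pairs).reduceByKey(_ + _)
  wordCounts.print()
}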

Now let's come back to the claim that a DStream is a template for RDDs:

1. So far the argument has rested on observed behavior; next we back it up with the source code.
2. Up to now we have seen two phenomena: (1) DStreams form a graph of backward-pointing dependencies; (2) each job run materializes an RDD DAG, and that DAG looks like a copy of the DStream graph. The next step is to observe how socketTextStream, i.e. the InputDStream, produces RDDs. SocketInputDStream itself shows no trace of an RDD, so we look at its parent class, which defines compute(validTime): a method that takes a time and returns an RDD, which means RDDs are produced only for specific batch times.

Let's look at the source of compute in ReceiverInputDStream, the parent class of SocketInputDStream:

 /**
   * Generates RDDs with blocks received by the receiver of this stream. */

// Note: compute returns an RDD, and it produces that RDD for a specific batch time.

  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
      // Why does this still produce an RDD? The driver may have failed and restarted:
      // other initialization may not be finished yet, but the timer is already running.
      // Time cannot be rolled back -- when a batch time arrives something must be
      // returned, so an empty RDD is generated.
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        // This code runs on the driver. Where do the blockInfos come from? The driver
        // holds the receiverTracker; the actual data live on the executors across the
        // cluster, while the driver keeps only the metadata. Using validTime (the
        // current batch), the driver looks up which blocks were received during that
        // BatchDuration. So the RDD is assembled from metadata -- pointers to the real
        // data -- because the driver works at the scheduling (abstract) level and
        // leaves the concrete data to the executors it controls.
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }
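
The contract is easiest to see in a toy input DStream of our own. The sketch below is not how SocketInputDStream works (that one relies on a receiver plus the block metadata tracked on the driver, as shown above); it is just a minimal, hypothetical InputDStream whose compute(validTime) hands back one fresh RDD per batch:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// A hypothetical, deliberately tiny input DStream: no receiver, no blocks, it simply
// answers every call to compute(validTime) with a freshly parallelized RDD.
class ConstantNumbersDStream(ssc_ : StreamingContext, upTo: Int)
  extends InputDStream[Int](ssc_) {

  override def start(): Unit = {}   // nothing to start -- there is no receiver here
  override def stop(): Unit = {}

  // The same contract as ReceiverInputDStream.compute above: for every batch time
  // the framework asks about, return Some(RDD) (or None for an empty batch).
  override def compute(validTime: Time): Option[RDD[Int]] = {
    Some(ssc_.sparkContext.parallelize(1 to upTo))
  }
}

Wiring it in should be as simple as new ConstantNumbersDStream(ssc, 10).print(): constructing an InputDStream registers it with the DStreamGraph, and print registers it as an output, so each batch should print the numbers 1 to 10.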

To complete the picture of how input DStreams produce RDDs, let's also look at FileInputDStream, another input-level DStream.

private[streaming]
class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
    ssc_ : StreamingContext,
// A directory to monitor: at each batch time we check whether new files have arrived since the previous batch
    directory: String,
    filter: Path => Boolean = FileInputDStream.defaultFilter,
    newFilesOnly: Boolean = true,
    conf: Option[Configuration] = None)
    (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
  extends InputDStream[(K, V)](ssc_) {

  private val serializableConfOpt = conf.map(new SerializableConfiguration(_))

Next, FileInputDStream's compute method:

override def compute(validTime: Time): Option[RDD[(K, V)]] = {
    // Find new files: at the current batch time this may yield several new paths
    val newFiles = findNewFiles(validTime.milliseconds)
    logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
    batchTimeToSelectedFiles += ((validTime, newFiles))
    recentlySelectedFiles ++= newFiles
    val rdds = Some(filesToRDD(newFiles))
    // Copy newFiles to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "files" -> newFiles.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    val inputInfo = StreamInputInfo(id, 0, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
    // Finally an Option[RDD] is returned -- which is exactly what makes this an input-level DStream
    rdds
  }

Here is FileInputDStream's filesToRDD method, which turns those files into a single RDD:

/** Generate one RDD from an array of files */
  private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
    val fileRDDs = files.map { file =>
      val rdd = serializableConfOpt.map(_.value) match {
        case Some(config) => context.sparkContext.newAPIHadoopFile(
          file,
          fm.runtimeClass.asInstanceOf[Class[F]],
          km.runtimeClass.asInstanceOf[Class[K]],
          vm.runtimeClass.asInstanceOf[Class[V]],
          config)
        case None => context.sparkContext.newAPIHadoopFile[K, V, F](file)
      }
      if (rdd.partitions.size == 0) {
        logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
          "files that have been \"moved\" to the directory assigned to the file stream. " +
          "Refer to the streaming programming guide for more details.")
      }
      rdd
    }
    // From the InputDStream's point of view only one RDD per batch is needed, but here
    // there is one RDD per file, so they are merged into a single RDD with UnionRDD
    new UnionRDD(context.sparkContext, fileRDDs)
  }
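
For completeness, here is a hedged usage sketch that would cause a FileInputDStream to be created under the hood; the HDFS directory below is made up for illustration:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical example: the directory path is invented for illustration only.
object FileWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FileWordCount")
    val ssc  = new StreamingContext(conf, Seconds(120))

    // textFileStream creates a FileInputDStream; every batch, compute(validTime)
    // scans the directory for files that appeared since the previous batch and
    // unions them into one RDD.
    val lines = ssc.textFileStream("hdfs://master:9000/streaming/input")
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}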

Next, let's look at a transformation-level DStream:

class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration
  // compute again returns an RDD for the given batch time
  override def compute(validTime: Time): Option[RDD[U]] = {
    // Only the InputDStream at the head of the chain actually produces an RDD; every
    // DStream after it depends on its parent. Here we fetch the parent's RDD for this
    // batch time and apply map to it: an operation on a DStream is in essence an
    // operation on the RDDs it generates.
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}

This further shows that a DStream is just a higher-level abstraction over Spark Core: operations on a DStream are only templates, while the operations on the generated RDDs are what actually run.
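
The transform operation makes this point explicit, because it exposes the per-batch RDD directly. In the sketch below the method name is ours and lines is assumed to be any DStream[String]; the two resulting streams are equivalent:

import org.apache.spark.streaming.dstream.DStream

// Two equivalent ways of expressing the same per-batch RDD operation.
def twoWaysToMap(lines: DStream[String]): Unit = {
  // High-level DStream API: builds a MappedDStream template.
  val upper1 = lines.map(_.toUpperCase)

  // transform() hands us the RDD generated for each batch time and lets us
  // return a new RDD -- the per-batch RDD is no longer hidden.
  val upper2 = lines.transform(rdd => rdd.map(_.toUpperCase))

  upper1.print()
  upper2.print()
}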

Next, let's look at a special, action-level (output) operation:

/**
   * Print the first num elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and there materialized.
   */
  def print(num: Int): Unit = ssc.withScope {
  // an operation applied to the RDD of a specific batch time
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
      // take is an RDD action, but nothing executes at this point: we are only defining
      // a function here, not calling it. Spark Streaming declares what should be done;
      // definition and execution are kept separate.
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println("Time: " + time)
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }
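
Since print is just foreachRDD with a particular function, we can write a hand-rolled cousin of it against the public foreachRDD API. In this sketch the method name is ours and wordCounts is assumed to be the DStream[(String, Int)] from the word-count example:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// A simplified, print-like output operation built on foreachRDD.
def printTopOfEachBatch(wordCounts: DStream[(String, Int)]): Unit = {
  wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
    // Nothing runs at definition time; only when the JobGenerator fires for this
    // batch does rdd.take (an RDD action) trigger a real Spark job.
    val firstFew = rdd.take(3)
    println(s"----- Batch $time -----")
    firstFew.foreach(println)
  }
}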

Next, let's look at the ForEachDStream that foreachRDD creates:

class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

// ForEachDStream's compute produces no RDD of its own -- it is an output operation, so
// instead it overrides generateJob. generateJob, in turn, is driven by the timer inside
// the JobGenerator.

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}

Next, let's look at how the jobs are generated. The snippet below stitches together three pieces: the recurring timer inside JobGenerator, which posts a GenerateJobs event once per batch interval; DStreamGraph.generateJobs, which asks every registered output stream for a job; and JobGenerator.generateJobs, which runs when the timer event fires:

 private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

 def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      // the output streams are the ForEachDStreams registered with the graph
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }

 /** Generate jobs and perform checkpoint for the given `time`.  */
  private def generateJobs(time: Time) {
    // Set the SparkEnv in this thread, so that job generation code can access the environment
    // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
    // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
    SparkEnv.set(ssc.env)
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch

      // the ForEachDStreams were registered with the graph, so the graph generates one job per output stream
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }
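
The timer itself is nothing exotic. The toy below (plain Scala/Java, no Spark; all names made up) mimics the RecurringTimer-plus-event-loop pairing: once per "batch interval" an event carrying the aligned batch time is handed to a callback, just as GenerateJobs(new Time(longTime)) is posted to the JobGenerator's event loop.

object RecurringTimerSketch {
  def main(args: Array[String]): Unit = {
    val period = 2000L // pretend this is the batch interval, 2 seconds
    val timer  = new java.util.Timer("toy-job-generator", true)
    timer.scheduleAtFixedRate(new java.util.TimerTask {
      def run(): Unit = {
        // align the timestamp to a multiple of the period, like Streaming batch times
        val batchTime = System.currentTimeMillis() / period * period
        println(s"GenerateJobs($batchTime)") // stands in for eventLoop.post(...)
      }
    }, 0L, period)
    Thread.sleep(10000) // let a few "batches" fire
    timer.cancel()
  }
}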

Below we can see the whole generateJob flow in DStream's own default implementation:

 /**
   * Generate a SparkStreaming job for the given time. This is an internal method that
   * should not be called directly. This default implementation creates a job
   * that materializes the corresponding RDD. Subclasses of DStream may override this
   * to generate their own jobs.
   */
  private[streaming] def generateJob(time: Time): Option[Job] = {
    getOrCompute(time) match {
      case Some(rdd) => {
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      }
      case None => None
    }
  }

Next, the getOrCompute method that ForEachDStream.generateJob calls on its parent; it is defined (as a final method) in DStream itself:

  /**
   * Get the RDD corresponding to the given time; either retrieve it from cache
   * or compute-and-cache it.
   */
  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD

    // Every DStream has a generatedRDDs member; if an RDD already exists for this time
    // it is returned directly, otherwise it is computed

    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.

      if (isTimeValid(time)) {

        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
          // compute() might cause Spark jobs to be launched.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            compute(time)
          }
        }

        rddOption.foreach { case newRDD =>
          // Register the generated RDD for caching and checkpointing
          if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
          }
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
          }
          generatedRDDs.put(time, newRDD)
        }
        rddOption
      } else {
        None
      }
    }
  }

RDDs are keyed by time: with our 2-minute batch interval, every DStream gets one RDD every 2 minutes. Even ForEachDStream works with RDDs internally, because every DStream carries this member:

 private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
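
The getOrCompute pattern built on top of this member is ordinary memoization keyed by batch time. A toy illustration (plain Scala, not Spark code; the class name is made up):

import scala.collection.mutable

// At most one cached value per batch time, computed lazily the first time that
// batch time is requested -- the same shape as generatedRDDs above.
class PerBatchCache[T](computeForTime: Long => T) {
  private val generated = mutable.HashMap.empty[Long, T]
  def getOrCompute(time: Long): T =
    generated.getOrElseUpdate(time, computeForTime(time))
}

Spark's real version additionally validates the time, persists or checkpoints the newly created RDD when configured to, and stores it back into generatedRDDs, as the code above shows.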

At this point we have seen clearly how a DStream produces RDDs in practice, the mechanism by which a DStream acts as a template for RDDs ((1) DStreams form a graph of backward dependencies; (2) each job run materializes an RDD DAG that mirrors the DStream graph), and the source code behind how common DStreams generate RDDs.

Copyright notice: this is an original article by qq_35394891, licensed under CC 4.0 BY-SA; please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/qq_35394891/article/details/82153091
