一:JobSheduler的源码解析 

1. JobScheduler是 Streaming整个调度的核心,相当于Spark Core上的DAGScheduler. 
2. Spark Streaming为啥要设置两条线程? 
setMaster指定的两条线程是指程序运行的时候至少需要两条线程。一条线程用于接收数据,需要不断的循环。而我们指定的线程数是用于作业处理的。 
3. JobSheduler的启动是在StreamContext的start方法被调用的时候启动的。

def start(): Unit = synchronized {  state match {    case INITIALIZED =>      startSite.set(DStream.getCreationSite())      StreamingContext.ACTIVATION_LOCK.synchronized {        StreamingContext.assertNoOtherContextIsActive()        try {          validate()//而这里面启动的新线程是调度方面的,因此和我们设置的线程数没有关系。          // Start the streaming scheduler in a new thread, so that thread local properties          // like call sites and job groups can be reset without affecting those of the          // current thread.          ThreadUtils.runInNewThread("streaming-start") {            sparkContext.setCallSite(startSite.get)            sparkContext.clearJobGroup()            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")            scheduler.start()          }1234567891011121314151617181912345678910111213141516171819
4.  jobScheduler会负责逻辑层面的Job,并将其物理级别的运行在Spark之上.
/** * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate * the jobs and runs them using a thread pool. */private[streaming]class JobScheduler(val ssc: StreamingContext) extends Logging {12345671234567
5.  jobScheduler的start方法源码如下:
def start(): Unit = synchronized {  if (eventLoop != null) return // scheduler has already been started  logDebug("Starting JobScheduler")  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)  }  eventLoop.start()  // attach rate controllers of input streams to receive batch completion updates  for {    inputDStream <- ssc.graph.getInputStreams    rateController <- inputDStream.rateController  } ssc.addStreamingListener(rateController)  listenerBus.start(ssc.sparkContext)  receiverTracker = new ReceiverTracker(ssc)  inputInfoTracker = new InputInfoTracker(ssc)  receiverTracker.start()  jobGenerator.start()  logInfo("Started JobScheduler")}1234567891011121314151617181920212223242512345678910111213141516171819202122232425
6.  其中processEvent的源码如下:
private def processEvent(event: JobSchedulerEvent) {  try {    event match {      case JobStarted(job, startTime) => handleJobStart(job, startTime)      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)      case ErrorReported(m, e) => handleError(m, e)    }  } catch {    case e: Throwable =>      reportError("Error in job scheduler", e)  }}1234567891011121312345678910111213
7.  handleJobStart的源码如下:
private def handleJobStart(job: Job, startTime: Long) {  val jobSet = jobSets.get(job.time)  val isFirstJobOfJobSet = !jobSet.hasStarted  jobSet.handleJobStart(job)  if (isFirstJobOfJobSet) {    // "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the    // correct "jobSet.processingStartTime".    listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))  }  job.setStartTime(startTime)  listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))  logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)}12345678910111213141234567891011121314
8.  JobScheduler初始化的时候干了那些事?

此时为啥要设置并行度呢? 

1) 如果Batch Duractions中有多个Output操作的话,提高并行度可以极大的提高性能。 
2) 不同的Batch,线程池中有很多的线程,也可以并发运行。 
将逻辑级别的Job转化为物理级别的job就是通过newDaemonFixedThreadPool线程实现的。

// Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff// https://gist.github.com/AlainODea/1375759b8720a3f9f094private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]//可以手动设置并行度private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)// numConcurrentJobs 默认是1private val jobExecutor =  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")//初始化JoGeneratorprivate val jobGenerator = new JobGenerator(this)val clock = jobGenerator.clock//val listenerBus = new StreamingListenerBus()// These two are created only when scheduler starts.// eventLoop not being null means the scheduler has been started and not stoppedvar receiverTracker: ReceiverTracker = null123456789101112131415161718123456789101112131415161718

print的函数源码如下: 

1. DStream中的print源码如下:

/** * Print the first ten elements of each RDD generated in this DStream. This is an output * operator, so this DStream will be registered as an output stream and there materialized. */def print(): Unit = ssc.withScope {  print(10)}1234567812345678
2.  实际调用的时候还是对RDD进行操作。
/** * Print the first num elements of each RDD generated in this DStream. This is an output * operator, so this DStream will be registered as an output stream and there materialized. */def print(num: Int): Unit = ssc.withScope {  def foreachFunc: (RDD[T], Time) => Unit = {    (rdd: RDD[T], time: Time) => {      val firstNum = rdd.take(num + 1)      // scalastyle:off println      println("-------------------------------------------")      println("Time: " + time)      println("-------------------------------------------")      firstNum.take(num).foreach(println)      if (firstNum.length > num) println("...")      println()      // scalastyle:on println    }  }  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)}123456789101112131415161718192021123456789101112131415161718192021
3.  foreachFunc封装了RDD的操作。
/** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. * @param foreachFunc foreachRDD function * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated *                           in the `foreachFunc` to be displayed in the UI. If `false`, then *                           only the scopes and callsites of `foreachRDD` will override those *                           of the RDDs on the display. */private def foreachRDD(    foreachFunc: (RDD[T], Time) => Unit,    displayInnerRDDOps: Boolean): Unit = {  new ForEachDStream(this,    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()}1234567891011121314151612345678910111213141516
4.  每个BatchDuractions都会根据generateJob生成作业。
/** * An internal DStream used to represent output operations like DStream.foreachRDD. * @param parent        Parent DStream * @param foreachFunc   Function to apply on each RDD generated by the parent DStream * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated *                           by `foreachFunc` will be displayed in the UI; only the scope and *                           callsite of `DStream.foreachRDD` will be displayed. */private[streaming]class ForEachDStream[T: ClassTag] (    parent: DStream[T],    foreachFunc: (RDD[T], Time) => Unit,    displayInnerRDDOps: Boolean  ) extends DStream[Unit](parent.ssc) {  override def dependencies: List[DStream[_]] = List(parent)  override def slideDuration: Duration = parent.slideDuration  override def compute(validTime: Time): Option[RDD[Unit]] = None//每个Batch Duractions都根据generateJob生成Job  override def generateJob(time: Time): Option[Job] = {    parent.getOrCompute(time) match {      case Some(rdd) =>        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {//foreachFunc基于rdd和time封装为func了,此时的foreachFunc就被job.run//的时候调用了。//此时的RDD就是基于时间生成的RDD,这个RDD就是DStreamGraph中的最后一个DStream决定的。然后          foreachFunc(rdd, time)        }        Some(new Job(time, jobFunc))      case None => None    }  }}1234567891011121314151617181920212223242526272829303132333435363712345678910111213141516171819202122232425262728293031323334353637
5.  此时的foreachFunc是从哪里来的?
private[streaming]//参数传递过来的,这个时候就要去找forEachDStream在哪里被调用。 class ForEachDStream[T: ClassTag] (    parent: DStream[T],    foreachFunc: (RDD[T], Time) => Unit,    displayInnerRDDOps: Boolean  ) extends DStream[Unit](parent.ssc) {1234567812345678
6.   由此可以知道真正Job的生成是通过ForeachDStream通generateJob来生成的,此时是逻辑级别的,但是真正被物理级别的调用是在JobGenerator中generateJobs被调用的。
def generateJobs(time: Time): Seq[Job] = {  logDebug("Generating jobs for time " + time)  val jobs = this.synchronized {//此时的outputStream就是forEachDStream    outputStreams.flatMap { outputStream =>      val jobOption = outputStream.generateJob(time)      jobOption.foreach(_.setCallSite(outputStream.creationSite))      jobOption    }  }  logDebug("Generated " + jobs.length + " jobs for time " + time)  jobs}12345678910111213141234567891011121314
6.  由此可以知道真正Job的生成是通过ForeachDStream通过generateJob来生成的,此时是逻辑级别的,但是真正被物理级别的调用是在JobGenerator中generateJobs被调用的。
def generateJobs(time: Time): Seq[Job] = {  logDebug("Generating jobs for time " + time)  val jobs = this.synchronized {//此时的outputStream就是forEachDStream    outputStreams.flatMap { outputStream =>      val jobOption = outputStream.generateJob(time)      jobOption.foreach(_.setCallSite(outputStream.creationSite))      jobOption    }  }  logDebug("Generated " + jobs.length + " jobs for time " + time)  jobs}

备注:

1、DT大数据梦工厂微信公众号DT_Spark 

2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains

本文转自http://blog.csdn.net/snail_gesture/article/details/51448104