图解Spark排序算子sortBy的核心源码

原创/朱季谦

一、事例说明

曾经刚开始学习Spark的时分,在操练排序算子sortBy的时分,曾发现一个有趣的现象是,在运用排序算子sortBy后直接打印的话,发现打印的成果是乱序的,并没有呈现完好排序。

例如,有一个包含多个(姓名,金额)结构的List数据,将这些数据依照金额降序排序时,代码及打印作用如下:

val money = ss.sparkContext.parallelize(
  List(("Alice", 9973),
    ("Bob", 6084),
    ("Charlie", 3160),
    ("David", 8588),
    ("Emma", 8241),
    ("Frank", 117),
    ("Grace", 5217),
    ("Hannah", 5811),
    ("Ivy", 4355),
    ("Jack", 2106))
)
money.sortBy(x =>x._2, false).foreach(println)
打印成果——
(Ivy,4355)
(Grace,5217)
(Jack,2106)
(Frank,117)
(Emma,8241)
(Alice,9973)
(Charlie,3160)
(Bob,6084)
(Hannah,5811)
(David,8588)

可见,这样的履行成果并没有依照金额进行降序排序。可是,如果运用collect或许从头将分区设置为1以及直接将成果进行save保存时,发现成果都是能够依照金额进行降序排序。(注意一点,依照save保存成果,尽管或许生成很多part-00000 ~part-00005的文件,但从part-00000到part-00005,内部数据其实也依照金额进行了降序排序)。

money.sortBy(x =>x._2, false).collect().foreach(println)
或许
money.repartition(1).sortBy(x =>x._2, false).foreach(println)
或许
money.sortBy(x =>x._2, false).saveAsTextFile("result")
终究成果——
(Alice,9973)
(David,8588)
(Emma,8241)
(Bob,6084)
(Hannah,5811)
(Grace,5217)
(Ivy,4355)
(Charlie,3160)
(Jack,2106)
(Frank,117)

二、sortBy源码剖析

为何独自经过sortBy后对数据打印,是乱序的,而在sortBy之后经过collect、save或许重分区为1个分区repartition(1),数据就是有序的呢?

带着这个疑问,去看一下sortBy底层源码——

def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  this.keyBy[K](f)
      .sortByKey(ascending, numPartitions)
      .values
}

能够看到,中心源码是 this.keyBy[K](f).sortByKey(ascending, numPartitions).values,我会将该源码分红this.keyBy[K](f) , sortByKey(ascending, numPartitions)以及values三部分解说——


2.1、逐节剖析sortBy源码之一:this.keyByK

this.keyBy[K](f)这行代码是根据_.sortBy(x =>x._2, false)传进来的x =>x._2从头生成一个新RDD数据,能够进入到其底层源码看一下——

def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
  val cleanedF = sc.clean(f)
  map(x => (cleanedF(x), x))
}

若履行的是_.sortBy(x =>x._2, false),那么f: T => K匿名函数就是x =>x._2,因此,keyBy函数的里边代码真面目是这样——

map(x => (sc.clean(x =>x._2), x))

sc.clean(x =>x._2)这个clean相当是对传入的函数做序列化,由于终究会将这个函数得到成果当作排序key分发到不同分区节点做排序,故而涉及到网络传输,因此做序列化后就方便在分布式核算中在不同节点之间传递和履行函数,clean终究底层完成是这行代码SparkEnv.get.closureSerializer.newInstance().serialize(func),感兴趣能够深入研究。

keyBy终究会生成一个新的RDD,至于这个结构是怎样的,经过原先的测试数据调用keyBy打印一下就一望而知——

val money = ss.sparkContext.parallelize(
  List(("Alice", 9973),
    ("Bob", 6084),
    ("Charlie", 3160),
    ("David", 8588),
    ("Emma", 8241),
    ("Frank", 117),
    ("Grace", 5217),
    ("Hannah", 5811),
    ("Ivy", 4355),
    ("Jack", 2106))
)
money.keyBy(x =>x._2).foreach(println)
打印成果——
(5217,(Grace,5217))
(5811,(Hannah,5811))
(8588,(David,8588))
(8241,(Emma,8241))
(9973,(Alice,9973))
(3160,(Charlie,3160))
(4355,(Ivy,4355))
(2106,(Jack,2106))
(117,(Frank,117))
(6084,(Bob,6084))

由此可知,原先这样(“Alice”, 9973)结构的RDD,经过keyBy源码里的map(x => (sc.clean(x =>x._2), x))代码,终究会生成这样结构的数据(x._2,x)也就是,(9973,(Alice,9973)), 就是从头将需要排序的字段金额当作了新RDD的key。

图解Spark排序算子sortBy的核心源码


2.2、逐节剖析sortBy源码之二:sortByKey

经过 this.keyBy[K](f)得到结构为(x._2,x)的RDD后,能够看到,尽管咱们前面调用money.sortBy(x =>x._2, false)来排序,但底层本质仍是调用了另一个排序算子sortByKey,它有两个参数,一个是布尔值的ascending,true表明按升序排序,false表明按降序排序,咱们这里传进来的是false。另一个参数numPartitions,表明分区数,能够经过定义的rdd.partitions.size知道所在环境分区数。

进入到sortByKey源码里,经过以下函数注释,就能够知道sortByKey函数都做了什么事情——

/**
 * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
 * `collect` or `save` on the resulting RDD will return or output an ordered list of records
 * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
 * order of the keys).
 *
 *按键对RDD进行排序,以便每个分区包含一个已排序的元素规模。
 在成果RDD上调用collect或save将回来或输出一个有序的记录列表
 (在save情况下,它们将依照键的次序写入文件体系中的多个part-X文件)。
 */
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

到这里,根据注解就能够知道sortByKey做了什么事情了——

第一步,每个分区按键对RDD进行shuffle洗牌后将相同Key划分到同一个分区,进行排序。

第二步,在调用collect或save后,会对各个已经排序好的各个分区进行合并,终究得到一个完好的排序成果。

这就意味着,若没有调用collect或save将各个分区成果进行汇总回来给master驱动进程话,尽管分区内的数据是排序的,但分区间就不一定是有序的。这时若直接foreach打印,由于打印是并行履行的,即便分区内有序,但并行一块打印就乱七八糟了。

能够写段代码验证一下,各个分区内是不是有序的——

money.sortBy(x => x._2, false).foreachPartition(x => {
  val partitionId = TaskContext.get.partitionId
  //val index = UUID.randomUUID()
  x.foreach(x => {
    println("分区号" + partitionId + ":   " + x)
  })
})
打印成果——
分区号2:   (Ivy,4355)
分区号2:   (Charlie,3160)
分区号2:   (Jack,2106)
分区号2:   (Frank,117)
分区号1:   (Bob,6084)
分区号1:   (Hannah,5811)
分区号1:   (Grace,5217)
分区号0:   (Alice,9973)
分区号0:   (David,8588)
分区号0:   (Emma,8241)

设置环境为3个分区,可见每个分区里的数据已经是降序排序了。

若是只要一个分区的话,该分区里的数据也会变成降序排序,这就是为何money.repartition(1).sortBy(x =>x._2, false).foreach(println)得到的数据也是排序成果。

sortBy主要流程如下,假设运转环境有3个分区,读取的数据去创建一个RDD的时分,会依照默许Hash分区器将数据分到3个分区里。

在调用sortBy后,RDD会经过 this.keyBy[K](f)从头生成一个新的RDD,例如结构如(8588, (David,8588)),接着进行shuffle操作,把RDD数据洗牌打散,将相应规模的key从头分到同一个分区里,意味着,同key值的数据就会分发到了同一个分区,例如下图的(2106, (Jack,2106)),(999, (Alice,999)),(999, (Frank,999)),(999, (Hannah,999))含同一个Key都在一起了。

shuffleRDD中,运用mapPartitions会对每个分区的数据依照key进行相应的升序或许降序排序,得到分区内有序的成果集。

图解Spark排序算子sortBy的核心源码


2.3、逐节剖析sortBy源码之三:.values

sortBy底层源码里 this.keyBy[K](f).sortByKey(ascending, numPartitions).values,在sortByKey之后,终究调用了.values。源码.values里边是def values: RDD[V] = self.map(_._2),就意味着,排序完成后,只回来x._2的数据,用于排序生成的RDD。类似排序过程中RDD是(5217,(Grace,5217))这样结构,排序后,若只回来x._2,就只回来(Grace,5217)这样结构的RDD即可。

图解Spark排序算子sortBy的核心源码
能够看到,shuffleRDD将相应规模的key从头分到同一个分区里,例如,0~100划到分区0,101~200划分到分区1,201~300划分到分区2,这样还有一个好处——当0,1,2分区内部的数据已经有序时,这时从全体依照0,1,2分区大局来看,其实就已经是大局有序了,当然,若要完成大局有序,还需要将其合并回来给驱动程序。


三、合并各个分区的排序,回来大局排序

调用collect或save就是把各个分区成果进行汇总,相当做了一个归并排序操作——

图解Spark排序算子sortBy的核心源码

以上,就是关于sortBy中心源码的解说。