这篇主要根据官网对Shuffle的介绍做了梳理和分析,并参考下面资料中的部分内容加以理解,对英文官网上的每一句话应该细细体味,目前的能力还有欠缺,以后慢慢补。

1、Shuffle operations

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions(以不同的分区分组). This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

2、Background

To understand what happens during the shuffle we can consider the example of the reduceByKey operation. The reduceByKey operation generates a new RDD where all values for a single key are combined into a tuple - the key and the result of executing a reduce function against all values associated with that key.(reduceByKey操作会生成一个新的RDD,其中将单个键的所有值组合成一个元组-该键以及针对与该键关联的所有值执行reduce函数的结果) The challenge is that not all values for a single key necessarily reside on the same partition, or even the same machine, but they must be co-located to compute the result.(挑战在于,并非单个键的所有值都必须位于同一分区,甚至同一台机器上,但是必须将它们放在同一位置才能计算结果)

In Spark, data is generally not distributed across partitions to be in the necessary place for a specific operation. During computations, a single task will operate on a single partition - thus, to organize all the data for a single reduceByKey reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key - this is called the shuffle.(在Spark中,数据通常不会跨分区分布到特定操作所需的位置。在计算期间,单个任务将在单个分区上操作——因此,要组织单个reduceByKey reduce任务执行的所有数据,Spark需要执行all-to-all操作。它必须从所有分区中读取所有键的值,然后将各个分区的值放在一起,以计算每个键的最终结果——这称为shuffle

Although the set of elements in each partition of newly shuffled data will be deterministic, and so is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably ordered data following shuffle then it’s possible to use:(尽管新打乱的数据的每个分区中的元素集是确定的,分区本身的排序也是确定的,但是这些元素的排序是不确定的。如果一个人希望在洗牌后得到可预测的有序数据,那么就可以使用它)

  • mapPartitions to sort each partition using, for example, .sorted
  • repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning
  • sortBy to make a globally ordered RDD

导致shuffle的操作主要是重新分区和合并的等算子:

Operations which can cause a shuffle include repartition operations like repartition and coalesce, (可能导致shuffle的操作包括重新分区操作,如重新分区和合并)‘ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

3、Performance Impact

The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O.(shuffle是一项昂贵的操作,因为它涉及磁盘输入/输出、数据序列化和网络输入/输出) To organize data for the shuffle, Spark generates sets of tasks - map tasks to organize the data, and a set of reduce tasks to aggregate it.(要为shuffle组织数据,Spark生成任务集——map任务用于组织数据,而reduce任务集用于聚合数据) This nomenclature comes from MapReduce and does not directly relate to Spark’s map and reduce operations.(这个术语来自MapReduce,但是与Spark的map和reduce操作没有直接关系)

Internally, results from individual map tasks are kept in memory until they can’t fit.(在内部,来自单个map任务的结果被保存在内存中,直到它们不能匹配为止) Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.(然后,根据目标分区对这些分区进行排序并写入到单个文件中。在reduce端,任务读取相关的已排序块。)

Certain shuffle operations can consume significant amounts of heap memory since they employ in-memory data structures to organize records before or after transferring them.(某些shuffle操作会消耗大量堆内存,因为它们使用内存中的数据结构来组织传输之前或之后的记录) Specifically, reduceByKey and aggregateByKey create these structures on the map side, and ‘ByKey operations generate these on the reduce side(具体来说,reduceByKey和aggregateByKey在map端创建这些结构,而ByKey操作在reduce端生成这些结构。). When data does not fit in memory Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection.(当数据不适合内存时,Spark会将这些表输出到磁盘上,从而产生磁盘I / O的额外开销并增加垃圾回收。)

Shuffle also generates a large number of intermediate files on disk.(Shuffle还会在磁盘上生成大量的中间文件) As of Spark 1.3, these files are preserved until the corresponding RDDs are no longer used and are garbage collected.(从Spark 1.3开始,这些文件将一直保留到不再使用相应的RDDs并进行垃圾收集) This is done so the shuffle files don’t need to be re-created if the lineage is re-computed.(这样做是为了在重新计算沿袭时不需要重新创建shuffle文件) Garbage collection may happen only after a long period of time, if the application retains references to these RDDs or if GC does not kick in frequently.(如果应用程序保留了对这些RDD的引用,或者如果GC不经常启动,则垃圾收集可能仅在很长一段时间后才会发生。) This means that long-running Spark jobs may consume a large amount of disk space.(这意味着长时间运行的Spark作业可能会占用大量磁盘空间) The temporary storage directory is specified by the spark.local.dir configuration parameter when configuring the Spark context.(临时存储目录由spark.local指定。配置SparkContext的dir配置参数)

Shuffle behavior can be tuned by adjusting a variety of configuration parameters.(可以通过调整各种配置参数来调整shuffle行为) See the ‘Shuffle Behavior’ section within the Spark Configuration Guide.

4、Shuffle简介

Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做map工作,下游Stage做reduce工作,其本质上还是MapReduce计算框架。Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,这期间涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等,所以说Shuffle是整个应用程序运行过程中非常昂贵的一个阶段,理解Spark Shuffle原理有助于优化Spark应用程序。

5、Spark Shuffle的基本原理与特性

与MapReduce计算框架一样,Spark的Shuffle实现大致如下图所示,在DAG阶段以shuffle为界,划分stage,上游stage做map task,每个map task将计算结果数据分成多份,每一份对应到下游stage的每个partition中,并将其临时写到磁盘,该过程叫做shuffle write;下游stage做reduce task,每个reduce task通过网络拉取(fetch)上游stage中所有map task的指定分区结果数据,该过程叫做shuffle read,最后完成reduce的业务逻辑。举个栗子:

  • 假如上游stage有100个map task,下游stage有1000个reduce task,那么这100个map task中每个map task都会得到1000份数据,而1000个reduce task中的每个reduce task都会拉取上游100个map task对应的那份数据,即第一个reduce task会拉取所有map task结果数据的第一份,以此类推。

shuffle的基本原理

在map阶段,除了map的业务逻辑外,还有shuffle write的过程,这个过程涉及到序列化、磁盘IO等耗时操作;在reduce阶段,除了reduce的业务逻辑外,还有前面shuffle read过程,这个过程涉及到网络IO、反序列化等耗时操作。所以整个shuffle过程是极其昂贵的,spark在shuffle的实现上也做了很多优化改进,随着版本的迭代发布,spark shuffle的实现也逐步得到改进。

6、Shuffle原理具体解释

6.1、shuffle原理

概述:Shuffle描述着数据从map task输出到reduce task输入的这段过程。在分布式情况下,reduce task需要跨节点去拉取其它节点上的map task结果。这一过程将会产生网络资源消耗和内存,磁盘IO的消耗。

6.2、mapreduce的shuffle原理

  • map task端操作

  每个map task都有一个内存缓冲区(默认是100MB),存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。

   Spill过程:这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。整个缓冲区有个溢写的比例spill.percent(默认是0.8),当达到阀值时map task 可以继续往剩余的memory写,同时溢写线程锁定已用memory,先对key(序列化的字节)做排序,如果client程序设置了Combiner,那么在溢写的过程中就会进行局部聚合。

  Merge过程:每次溢写都会生成一个临时文件,在map task真正完成时会将这些文件归并成一个文件,这个过程叫做Merge。

  • reduce task端操作

  当某台TaskTracker上的所有map task执行完成,对应节点的reduce task开始启动,简单地说,此阶段就是不断地拉取(Fetcher)每个map task所在节点的最终结果,然后不断地做merge形成reduce task的输入文件。

  Copy过程:Reduce进程启动一些数据copy线程(Fetcher)通过HTTP协议拉取TaskTracker的map阶段输出文件

  Merge过程:Copy过来的数据会先放入内存缓冲区(基于JVM的heap size设置),如果内存缓冲区不足也会发生map task的spill(sort 默认,combine 可选),多个溢写文件时会发生map task的merge。

下面总结下mapreduce的关键词:

  存储相关的有:内存缓冲区默认大小溢写阀值

  主要过程:溢写(spill),排序,合并(combine)归并(Merge)CopyFetch

  相关参数:内存缓冲区默认大小JVM heap sizespill.percent

  • 关于排序方法:

  在Map阶段,k-v溢写时,采用的正是快排;而溢出文件的合并使用的则是归并;在Reduce阶段,通过shuffle从Map获取的文件进行合并的时候采用的也是归并;最后阶段则使用了堆排作最后的合并过程。

7、Shuffle类型的算子

7.1、去重:

1
2
def distinct()
def distinct(numPartitions: Int)

7.2、 聚合

1
2
3
4
5
6
7
8
9
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

7.3、排序

1
2
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

7.4、重分区

1
2
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null)

7.5、集合或者表操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

8、参考资料

https://spark.apache.org/docs/latest/rdd-programming-guide.html

https://www.slideshare.net/colorant/spark-shuffle-introduction?from_action=save

https://www.cnblogs.com/arachis/p/Spark_Shuffle.html

http://sharkdtu.com/posts/spark-shuffle.html