大数据 Shuffle 原理与实践 | 青训营笔记

这是我参与「第四届青训营 」笔记创作活动的第5天

要点内容:

一、Shuffle概述

1、MapReduce概述

大数据 Shuffle 原理与实践

  • Map阶段,是在单机上进行的针对一小块数据的核算进程。

  • Shuffle 阶段,在map阶段的基础上,进行数据移动,将同种类型的数据归到一起,为后续的reduce阶段做准备

  • Reduce阶段,对移动后的数据进行处理,依然是在单机上处理一小份数据

Shuffle对功能非常重要体现在以下几个方面:

  • M*R次网络连接
  • 很多的数据移动
  • 数据丢掉危险
  • 或许存在很多的排序操作
  • 很多的数据序列化、反序列化操作
  • 数据压缩

在大数据场景下,数据shuffle表明了不同分区数据交换的进程,不同的shuffle战略功能差异较大。 现在在各个引擎中shuffle都是优化的要点,在spark框架中,shuffle 是支撑spark进行大规模复杂 数据处理的柱石。

二、Shuffle算子

1、Shuffle算子分类

Spark 中会产生shuffle的算子大概能够分为4类

  • repartition

    • coalesce、repartition
  • ByKey

    • groupByKey、reduceByKey、aggregateByKey、combineByKey、sortByKeysortBy
  • Join

    • cogroup、join
  • Distinct

    • distinct

运用:

val text = sc.textFile("mytextfile.txt") val counts = text .flatMap(line => line.split(" ")) .map(word => (word,1)) .reduceByKey(_+_) counts.collect

2、Spark中对shuffle的抽象 – 宽依靠、窄依靠

大数据 Shuffle 原理与实践

  • 窄依靠

父RDD的每个partition至多对应一个子RDD分区。

  • 宽依靠

父RDD的每个partition都或许对应多个子RDD分区。

3、Shuffle Dependency主要结构变量

  • A single key-value pair RDD, i.e.RDD[Product2[K, V]],

  • Partitioner(available aspartitionerproperty)–给定key产生分区,

有两个接口:

1、numberPartitions

2、getPartition

  • Serializer,

  • Optional key ordering (of Scala’sscala.math.Orderingtype),

  • OptionalAggregator,

  • mapSideCombineflag which is disabled (i.e.false) by default.

Aggregator

在进行Shuffle时是一个重要的功能优化器。

  • createCombiner:只要一个value的时分初始化的方法
  • mergeValue:兼并一个value到Aggregator中
  • mergeCombiners:兼并两个Aggregator

三、Shuffle进程

Shuffle完成的发展进程

  • Spark 0.8及曾经Hash Based Shuffle
  • Spark 0.8.1 为Hash Based Shuffle引进File Consolidation机制
  • Spark 0.9 引进ExternalAppendOnlyMap
  • Spark 1.1 引进Sort Based Shuffle,但默许仍为Hash Based Shufle
  • Spark 1.2 默许的Shuffle方式改为Sort Based Shuffle
  • Spark 1.4 引进Tungsten-Sort Based Shuffle
  • Spark 1.6 Tungsten-Sort Based Shuffle并入Sort Based Shuffle
  • Spark 2.0 Hash Based Shuffle退出历史舞台

Hash Shuffle -写数据

每个partition会映射到一个独立的文件

大数据 Shuffle 原理与实践

Hash Shuffle – 写数据优化

每个partition会映射到一个文件片段

大数据 Shuffle 原理与实践

Sort shuffle:写数据

每个task生成一个包括一切partiton数据的文件

大数据 Shuffle 原理与实践

Shuffle – 读数据

每个reduce task分别获取一切map task生成的属于自己的片段

大数据 Shuffle 原理与实践

Shuffle进程的触发流程

val text = sc.textFile("mytextfile.txt") val counts = text .flatMap(line => line.split("")) .map(word => (word,1)) .reduceByKey(_+_) counts.collect

Shuffle优化运用的技术: Netty Zero Copy

  • 可堆外内存,防止JVM堆内存到堆外内存的数据复制。

  • CompositeByteBuf 、Unpooled. wrappedBuffer、ByteBuf.slice ,能够兼并、包装、切分数组,防止发生内存复制

  • Netty 运用FileRegion完成文件传输,FileRegion 底层封装了FileChannel#transferTo() 方法,能够将文件缓冲区的数据直接传输到目标Channel,防止内核缓冲区和用户态缓冲区之间的数据复制

常见问题

  • 数据存储在本地磁盘,没有备份
  • IO并发:很多RPC恳求(M*R)
  • IO吞吐:随机读、写放大(3X)
  • GC频繁,影响NodeManager

Shuffle参数优化

  • spark.default.parallelism &&spark.sql.shuffle.partitions

  • spark.hadoopRDD.ignoreEmptySplits

  • spark.hadoop.mapreduce.input.fileinputformat.split.minsize

  • spark.sql.file.maxPartitionBytes

  • spark.sql.adaptive.enabled&&spark.sql.adaptive.shuffle.targetPostShufflelnputSize

  • spark.reducer.maxSizelnFlight

  • spark.reducer.maxReqsInFlight

  • spark.reducer.maxBlockslnFlightPerAddress

四、Push Shuffle

为什么需要Push Shuffle ?

  • Avg IO size太小,造成了很多的随机IO,严重影响磁盘的吞吐

  • M*R次读恳求,造成很多的网络连接,影响稳定性

Push Shuffle的完成

  • Facebook:cosco
  • Linkdin:magnet
  • Uber:Zeus
  • Alibaba:RSS
  • Tencent:FireStorm
  • Bytedance:CSS
  • Spark3.2:push based shuffle

Magnet完成原理

  • Spark driver组件,协调整体的shuffle操作
  • map使命的shuffle writer进程完成后,增加了-个额外的操作push. merge,将数据复制一份推到长途shuffle服务上
  • magnet shuffle service是一个强化版的ESS。将隶属于同一个shuffle partition的block,会在长途传输到magnet后被merge到一个文件中
  • reduce使命从magnet shuffle service接纳兼并好的shuffle数据
  • bitmap:存储已merge的mapper id,防止重复merge
  • position offset:如果本次block没有正常merge,能够康复到上一个block的位置
  • currentMapld:标识当时正在append的block,保证不同mapper 的block能顺次append

Magnet可靠性

  • 如果Map task输出的Block没有成功Push到magnet上,并且重复重试仍然失利,则reduce task直接从ESS上拉取原始block数据
  • 如果magnet上的block由于重复或者冲突等原因,没有正常完成merge的进程,则reducetask直接拉取未完成merge的block
  • 如果reduce拉取现已merge好的block失利,则会直接拉取merge前的原始block本质上,magnet中维护了两份shuffle数据的副本