一、大数据处理引擎Spark介绍
这是我参与「第四届青训营」笔记创作活动的的第1天
1.大数据处理技能栈
大数据的特性,数据是海量的,数据源是丰厚多样的,有消息,图片,音视频流,数据发生的非常快,需求快速处理,进步数据价值。 数据生成后,需求存储元数据信息,挑选合适的存储格局,像Parquet、ORC是两种高功能的列式存储,Hudi数据存储的中间件,优化存储的读写,也能够存储到分布式文件存储体系HDFS,分布式消息体系kafka,keyvalue分布式存储的nosql引擎数据库HBase,依据列式存储的分布式数据库Kudu,字节供给的TOS,S3目标存储。 存储的数据需求核算才干运用,大数据的核算结构,Spark批式核算,Flink流式核算,Presto等处理在线分布式查询场景的,是可交互式的OLAP引擎,核算结构凭借资源办理的编排调度东西YARN,K8S,来运转在分布式集群中处理存储的数据。 核算处理存储的数据后供给给上层运用,有BI报表,广告,引荐,金融风控等。
2.常见大数据处理链路
数据库中搜集到的数据称为数据源,存储到分布式存储体系中HDFS等,进行一系列的数据处理,会有屡次数据读取写入,能够是各种存储体系,也能够是各种数据库,讲处理的成果进行核算,再做各种运用。
3.开源大数据处理引擎
批式核算:Hadoop、Hive、Spark 流式核算:Flink OLAP:presto,ClickHouse,Impala,DORIS。MapReduce处理了Hadoop诞生,数据大规模处理数据,主流Spark是依据内存处理,对MapReduce进行了优化。
4.什么是Spark?
Spark是用于大规模数据处理的一致剖析引擎,是一种多语言引擎,能够用于单机节点或集群上来履行数据工程,数据科学和机器学习。
feature:
1. 多语言挑选,用一致的办法处理流批的数据
2. 能够用为仪表盘履行快速的sql查询剖析,
3. 适用于大规模的数据科学,对PB等级的数据来履行探索性的数据剖析,对数据进行练习建模猜测。
4. 机器学习,在单机上练习机器学习的算法,能够很方便的拓宽到大规模集群上
5.Spark版别演进
Spark生态&特色 -SparkCore 中心块 基本功能:使命调度,内存办理 -SparkSQL,操作结构化数据的中心组件,直接查询Hbase等各式各样的数据源,能够进行交互式的查询 -StruturedStreaming 流式核算结构,支撑高吞吐的,可容错处理的,实时流式数据 -MLlib,机器学习算法库,分类,聚类,回归等,模型评估, -GraphX分布式图处理结构,供给图核算,图发掘的一些API
一致引擎,支撑各式各样分布式场景
多语言支撑
支撑丰厚的数据源:内置DataSource,Text;Parquet/ORC;JSON/CSV;JSBC 自定义DataSource:Hbase/Mongo等
丰厚灵活的API/算子:RDD
支撑K8S/YARN/Meso资源调度
6.Spark运转架构&部署办法
集群办理器,担任办理整个集群,担任资源办理和调度,监控Woker节点, Worker从节点,担任操控核算节点 Deiver Program,是一个APP整个运用的办理者,担任作业的调度,是一个JVM进程,创立一个SparkContext上下文,操控整个运用的生命周期
Spark Local Mode 本地测试/单进程多线程形式
Spark Srandalone Mode 不需求凭借外部的资源调度办理, 需求发动Spark的Sandalone集群的Master/Worker
依靠 YARN/K8S 依靠外部资源调度器 用master完结不同的部署办法,委托给谁资源办理
二、SparkCore原理解析
1.SparkCore
用户挑选了集群提交到外部资源办理器,比方说提交到YARN,在mode management创立一个appmaster,用来办理资源,是否有资源创立executor,APPmaster在YARN形式下相当于Driver也会经过DAG,Task Scheduler办理和分配Task。
Spark的数据输入到输出一切的数据结构都是依据RDD的,接下来从RDD开端说。
2.什么是RDD
RDD是一个能够容错的,并行履行的分布式数据集,最基本的数据处理模型。
- 分区列表,每一个RDD都有多个分区,这些分区运转在集群不同节点上,每个分区都会被一个核算使命Task处理,分区决定了并行核算的数量。创立RDD能够指定分区个数,从集群创立的话,默许分区数是CPU核数;从HDFS文件存储创立的话,Partition个数便是文件的blog数。
- 都有一个核算函数,RDD以Partition为基本单位,每个RDD完成一个compute函数,对详细的RDD进行核算
- 有依靠,每一个RDD都会依靠于RDD,每次转化都会生成新的RDD,RDD会形成像Pipeline形成前后依靠关系。部分分区数据丢掉时,Spark能够经过依靠关系从头核算分区数据,而不是对一切的RDD都进行从头核算
- 完成了两种类型分区函数,一个依据哈希的Hash partitioner,依据规模的 range partitioner,两种分区器。关于只有key-value的RDD,才会有分区partitioner,非key-value的RDD,partitioner的值是空的。Partitioner不但决定了RDD自身的分区数量,也决定了parent RDD shuffle时的shuffle分区数量。
- 每个分区有一个优先的方位列表,他会存储每个partition的优先方位,例如HDFS的文件会存储每个partition块的数据,移动数据不如移动核算,进行使命调度的时分,尽或许将核算分配到需求处理的使命块的方位。
供给了各式各样的算子,便是成员函数,map,filter回来新的RDD;count回来新的数据类型。cache,persist(缓存)当一个RDD被屡次运用,这个RDD核算链路非常长,那么核算成果就会非常宝贵。能够中间进行缓存,保存核算成果,这也是Spark速度快的原因,能够在内存中持久化缓存数据集。
怎么创立RDD?
- 有许多内置RDD
- 自定义RDD
- 有许多内置RDD
- 自定义RDD
两类RDD算子
- Transform算子:生成一个新的RDD
比方map,filter,flatMap,groupByKey,reduceByKey…..
本来是一个Parallel的RDD,经过一个Map的操作后,变成了一个MapPartitionsRDD。
- Action算子:触发Job提交
比方collect、count、take、saveAsTextFile…..
本来是一个RDD,做了一个count,触发了Job提交,回来了Long类型。take便是取前几个元素,触发了一个Job,回来了一个Array。
RDD依靠
- 窄依靠:父RDD的partition至多对应一个子RDD的分区,但是子RDD或许有多个父RDD的分区。窄依靠分别有三种
OneToOneDependency,是1对1的,回来对应Partition的list
RangeDependency,inStart是父RDD开始的方位,outStart是子RDD开始的方位,length是range的长度,如果子RDD的partition的index在父RDD的range内,回来父RDD,回来父RDD的partition是子RDD的partition的index减去父RDD分区的range的开始,再加上子RDD分区range的开始
PruneDependency
- 宽依靠(会发生Shuffle):父RDD的每个partition都或许对应多个子RDD分区,都会运用一切父RDD的多个分区,就相当于是one to many,当RDD做groupBy或Join操作时会发生宽依靠。
ShuffleDependency,Shuffle的发生是因为有宽依靠,宽依靠对应一个Shuffle的操作,运转进程中父RDD的分区,会传入不同的子RDD分区中,中间就或许触及到多个节点的数据传输。
如果子RDD毛病,有或许一部分父RDD就能够覆盖子RDD的核算,有时需求一切的父RDD进行重算,价值比较高。能够设置一个检查点checkpoint,然后触及容错文件的体系工作,HDFS的检查点,把这些数据写入到检查点上,做高可用的数据存储,后面有节点宕机,数据丢掉,能够从检查点的RDD核算,不需求从头到尾。
RDD履行流程
Job:RDD action算子触发 Stage:依据宽依靠区分 Task:Stage内履行单个partition使命 从后往前区分,遇到一个宽依靠,区分一个stage,遇到窄依靠就加入到这个stage,DAG最终一个阶段生成的partition生成的task叫Result Task,其他的satge叫ShuffleMapTask因为都有一个Shuffle操作。最终一个RDD partition的数量就决定了每个stage task的数量。
调度器
- 当一个RDD的算子创立之后,Sparkcontext就会依据RDD目标创立一个DAG有向无环图,触发job之后,将DAG供给给调度器,然后调度器依据ShuffleDependency分为不同的Stage,然后按照依靠顺序调度Stage,为每个Stage生成TaskSet集兼并分发到TaskScheduler。
- 经过集群中的资源办理器,在K8S形式下是master,在YARN形式下是result manager依据调度算法(FIFO/FAIR)对多个TaskSet进行调度,关于调度到的TaskSet,会将Task调度(locality)到相关Executor上面履行。
3.内存办理
Executor内存首要有两类:Storage、Execution
发动Spark时,会设置一个spark.executor.memory参数,JVM的内存,堆内内存。 缓存RDD数据或者播送数据,占用内存叫做存储内存Storage Memory。在处理shuffle时占用的内存叫履行内存Execution Memory,剩下的用户自定义数据结构,还有一些Spark 内部的元数据,定义为User Memory。
前两种内存能够相互借用,能够削减spill的操作,履行内存是不能够被存储内存所驱赶的,但履行内存需求内存时,能够驱赶被Storage借用的内存,直到达到一个规矩的存储内存的边界。当双方空间都不足时,都需求存储到硬盘上。
为了进一步优化内存的运用,进步Shuffle的排序功率,Spark引入了堆外内存(。能够直接操作操作体系堆外内存,削减不必要的内存开支,扫描回收等,办理难度低,差错比较小。
多使命间内存分配
在一个Executor内,一切的task是共享内存的,UnifiedMemoryManager一致办理多个并发Task的内存分配,Executor下能够运转多个task,每个task至少要获取1/2n的空间,如果不能满意,使命就会被堵塞,直到有足够的空间,使命才会被唤醒,n为当时Executor中正在并发运转的task数量。
Shuffle
map跟reduce之间数据处理从头分发的进程称之为Shuffle,创立一个ShuffleManger,在shuffleRDD compute逻辑履行的时分会从,env里面get完成详细函数。
SortShuffleManger
SortShuffleManger是Manager的一个完成办法,这个是两个数据量表大的时分常用的办法。对数据进行排序,Spill,发生多个零时的Spill磁盘文件,SpillFile,在最终,零时文件会兼并成一个磁盘文件,为这些磁盘文件加一个索引,会发生两个文件,一个数据文件,一个索引文件。在下一个stage来的时分,在索引文件中找到partition所在的数据文件的方位,再在数据文件中找到数据。
External Shuffle Service
External Shuffle Service运转在主机上,办理这台主机Executor节点发生的shuffle数据,在YARN上便是NodeManager办理,只处理来自于map节点和reduce节点的恳求,map节点会将Shuffle的文件途径告诉shuffle service,reduce端去读取数据的时分就会发送一个恳求获取stream id,再去获取数据。
三、SparkSQL原理解析
1.SparkSQL履行进程
- SQL Parse: 将SparkSQL字符串或DataFrame解析为一个笼统语法树/AST,即Unresolved Logical Plan
- Analysis:遍历整个AST,并对AST上的每个节点进行数据类型的绑定以及函数绑定,然后依据元数据信息Catalog对数据表中的字段进行解析。 使用Catalog信息将Unresolved Logical Plan解析成Analyzed Logical plan
- Logical Optimization:该模块是Catalyst的中心,首要分为RBO和CBO两种优化策略,其间RBO是依据规矩优化,CBO是依据价值优化。 使用一些规矩将Analyzed Logical plan解析成Optimized Logic plan
- Physical Planning: Logical plan是不能被spark履行的,这个进程是把Logic plan转化为多个Physical plans物理履行方案
- CostModel: 首要依据曩昔的功能计算数据,挑选最佳的物理履行方案(Selected Physical Plan)。
- Code Generation: sql逻辑生成Java字节码
影响SparkSQL功能两大技能:
- Optimizer:履行方案的优化,目标是找出最优的履行方案
- Runtime:运转时优化,目标是在既定的履行方案下尽或许快的履行结束。
2. Catalyst优化
- Rule Based Optimizer(RBO): 依据规矩优化,对语法树进行一次遍历,形式匹配能够满意特定规矩的节点,再进行相应的等价转化。
- Cost Based Optimizer(CBO): 依据价值优化,依据优化规矩对关系表达式进行转化,生成多个履行方案,然后CBO会经过依据计算信息(Statistics)和价值模型(Cost Model)核算各种或许履行方案的价值,从中选用COST最低的履行方案,作为实际运转方案。CBO依靠数据库目标的计算信息,计算信息的准确与否会影响CBO做出最优的挑选。
3.AQE
AQE关于全体的Spark SQL的履行进程做了相应的调整和优化,它最大的亮点是能够依据现已完结的方案结点真实且准确的履行计算成果来不停的反馈并从头优化剩下的履行方案。
AQE结构三种优化场景(3.0版别):
- 动态兼并shuffle分区(Dynamically coalescing shuffle partitions)
- 动态调整Join策略(Dynamically switching join strategies)
- 动态优化数据歪斜Join(Dynamically optimizing skew joins)
处理的数据是比较大的,shuffle比较影响功能,这个算子比较费时,partition的个数非常关键,很难确认partition的数目应该是多少
4.RuntimeFilter
在join中形成filter算子进行优化。完成在Catalyst中。动态获取Filter内容做相关优化,当咱们将一张大表和一张小表等值连接时,咱们能够从小表侧搜集一些计算信息,并在履行join前将其用于大表的扫描,进行分区修剪或数据过滤。
Runtime优化分两类:
- 大局优化:从提升大局资源使用率、消除数据歪斜、下降IO等视点做优化。包括AQE。
- 局部优化:进步某个task的履行功率,首要从进步CPU与内存使用率的视点进行优化。依靠Codegen技能。
5.Codegen
前面是优化履行方案,找出最优的履行方案。另一种是在运转时优化,怎么尽快的完结优化,进步大局使用率,消除数据歪斜,进步数据使用率。另一方面便是Codegen,局部优化,进步task的功率,从进步cpu与内存的使用率的视点来进行runtime优化。
- Expression等级
表达式惯例递归求值语法树。需求做许多类型匹配、虚函数调用、目标创立等额外逻辑,这些overhead远超对表达式求值自身,为了消除这些overhead,Spark Codegen直接拼成求值表达式的java代码并进行即时编译
- WholeStage等级
传统的火山模型:SQL经过解析会生成一颗查询树,查询树的每个节点为Operator,火山模型把operator看成迭代器,每个迭代器供给一个next()接口。经过自顶向下的调用 next 接口,数据则自底向上的被拉取处理,火山模型的这种处理办法也称为拉取履行模型,每个Operator 只需关怀自己的处理逻辑即可,耦合性低。
火山模型问题:数据以行为单位进行处理,不利于CPU cache 发挥作用;每处理一行需求调用屡次next() 函数,而next()为虚函数调用。会有许多类型转化和虚函数调用。虚函数调用会导致CPU分支猜测失利,然后导致严重的功能回退
Spark WholestageCodegen:为了消除这些overhead,会为物理方案生成类型确认的java代码。并进行即时编译和履行。
Codegen打破了Stage内部算子间的界限,拼出来跟原来的逻辑保持一致的裸的代码(通常是一个大循环)然后把拼成的代码编译成可履行文件。
四、业界应战与实践
1.Shuffle稳定问题
在大规模作业下,存在本地磁盘上,没有备份,有许多的恳求,数量级很大,有热门数据反复读,spill数据会带来写放大,reduce高并发读取小数据块会带来磁盘随机拜访的问题,也是低功率的问题,NodeManager的模块也会经常JC。 处理方案:
- 数据远端存储
- 在partition端shuffle的时分进行聚合,少了许多排序的操作
2.SQL履行功能问题
CPU流水线/分支猜测/乱序履行/CPU缓存友好/SIMD/… 向量法/Codegen/两者结合,例如Intel:OAP
3.参数引荐/作业确诊
Spark参数许多,参数不合理的作业,对资源使用率/Shuffle稳定性/功能有非常大影响。 自动化参数引荐/作业确诊——自动化