I. Overview
Hudi (Hadoop Upserts Deletes and Incrementals) is a streaming data lake platform that supports fast updates over massive datasets. It ships with a built-in table format, a transaction-capable storage layer, a set of table services, data services (out-of-the-box ingestion tools), and full operations and monitoring tooling. It can land data on HDFS or cloud storage (e.g. S3) with very low latency, and its key features are record-level upserts and deletes, plus incremental queries.
GitHub: github.com/apache/hudi
Official docs: hudi.apache.org/cn/docs/ove…
For more background on the Apache Hudi data lake, see my earlier article on the new-generation streaming data lake platform Apache Hudi.
II. Hudi CLI
After building Hudi, start the shell with cd hudi-cli && ./hudi-cli.sh. A Hudi table lives at a DFS location called the basePath, and we need that path to connect to the table. Hudi manages the table internally and tracks all of its metadata in the .hoodie subfolder.
The packages produced by the build are shown below:
# start the CLI
./hudi-cli/hudi-cli.sh
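Once the CLI shell is up, it has to be connected to a concrete table via its basePath before any table commands can run. A minimal sketch, assuming the hudi_trips_cow table and HDFS path created later in this article (command names follow the Hudi CLI as I understand it):
# connect to a table by its basePath
connect --path hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/hudi_trips_cow
# inspect table properties and list its commits
desc
commits show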
III. Using Spark with Hudi
Hudi is a streaming data lake platform that helps manage data: it stores the data on the HDFS file system and uses Spark to operate on it.
For Hadoop installation, see my article on Hadoop fundamentals, installation, and hands-on usage (HDFS + YARN + MapReduce); for Hadoop HA, see my article on Hadoop 3.3.4 HA (QJM); for Spark setup, see my article on the Spark compute engine.
1) Spark smoke test
cd $SPARK_HOME
hdfs dfs -mkdir /tmp/
hdfs dfs -put README.md /tmp/
hdfs dfs -text /tmp/README.md
# start spark-shell
./bin/spark-shell --master local[2]
val datasRDD = sc.textFile("/tmp/README.md")
# count the lines
datasRDD.count()
# read the first line
datasRDD.first()
val dataframe = spark.read.textFile("/tmp/README.md")
dataframe.printSchema
dataframe.show(10,false)
2) Using Spark with Hudi
Official example: hudi.apache.org/docs/quick-… To work with Hudi table data from the spark-shell command line, the relevant dependency packages must be added when launching spark-shell, as shown below:
1. Start spark-shell
[Option 1] Download the required jars online
### start spark-shell and use it to work with the Hudi data lake
### option 1
./bin/spark-shell \
--master local[2] \
--packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
### The command above requires network access: the required jars (three in total) are downloaded locally via Ivy and then added to the CLASSPATH.
[Option 2] Use jars that have already been downloaded, offline
### option 2: use --jars
cd /opt/apache
wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.3.0/spark-avro_2.12-3.3.0.jar
cd $SPARK_HOME
./bin/spark-shell \
--master local[2] \
--jars /opt/apache/hudi-0.12.0/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.0.jar,/opt/apache/hudi-0.12.0/hudi-examples/hudi-examples-spark/target/lib/unused-1.0.0.jar,/opt/apache/spark-avro_2.12-3.3.0.jar \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
2. Import the Spark and Hudi packages
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
3. Define variables
val tableName = "hudi_trips_cow"
# store on HDFS
val basePath = "hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/hudi_trips_cow"
# store locally
# val basePath = "file:///tmp/hudi_trips_cow"
4. Generate mock trip data
## build a DataGenerator object and use it to generate 10 mock trip records
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
DataGenerator produces test data that the following steps build on.
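The same generator can also emit update records for the keys it has already produced, which is what a later upsert test would feed back into the table. A short sketch following the quickstart utilities (generateUpdates belongs to the same DataGenerator; writing the result back with mode(Append) is left out here):
// generate 10 update records against the keys produced by generateInserts above
val updates = convertToStringList(dataGen.generateUpdates(10))
// convert them the same way as the inserts; writing with mode(Append) would upsert them
val updateDf = spark.read.json(spark.sparkContext.parallelize(updates, 2))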
5. Convert the mock data List into a DataFrame
## convert to a DataFrame
val df = spark.read.json(spark.sparkContext.parallelize(inserts,2))
## inspect the schema
df.printSchema()
## view the data
df.show()
# query specific columns
df.select("rider","begin_lat","begin_lon","driver","end_lat","end_lon","fare","partitionpath","ts","uuid").show(10,truncate=false)
6. Write the data to Hudi
# Save the data into a Hudi table. Since Hudi was originally built on Spark, Spark SQL supports Hudi as a data source: specify the source via format("hudi"), set the relevant options, and save. Note that Hudi does not store the data itself; it manages data that lives in the underlying storage.
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
## Key parameters
# getQuickstartWriteConfigs: sets the shuffle parallelism used when writing/updating data to Hudi
# PRECOMBINE_FIELD_OPT_KEY: the field used to pick the winning record when merging records that share the same key
# RECORDKEY_FIELD_OPT_KEY: the unique id of each record; multiple fields are supported
# PARTITIONPATH_FIELD_OPT_KEY: the partition field used to lay out the data
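After the write completes, the table can be read back through the same data source to confirm that the rows landed. A short sketch following the official quickstart (the temp view name hudi_trips_snapshot is just an illustrative choice):
// snapshot query: load the table from its basePath and query it via Spark SQL
val tripsSnapshotDF = spark.read.format("hudi").load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select uuid, rider, fare, partitionpath, ts from hudi_trips_snapshot where fare > 20.0").show()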
(Screenshots: table files under local storage and under HDFS)
IV. Using Flink with Hudi
Official example: hudi.apache.org/docs/flink-…
1) Start a Flink cluster
Download: flink.apache.org/downloads.h…
### 1. Download the package
wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz
tar -xf flink-1.14.6-bin-scala_2.12.tgz
export FLINK_HOME=/opt/apache/flink-1.14.6
### 2. Set HADOOP_CLASSPATH
# HADOOP_HOME is your Hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
export HADOOP_CONF_DIR='/opt/apache/hadoop/etc/hadoop'
### 3. Start a single-node Flink cluster
# Start the Flink standalone cluster. First raise the slot count in conf/flink-conf.yaml (default is 1, change it to 4):
# taskmanager.numberOfTaskSlots: 4
cd $FLINK_HOME
./bin/start-cluster.sh
# sanity-check the cluster
./bin/flink run examples/batch/WordCount.jar
2) Start the Flink SQL client
# [Option 1] specify the Hudi bundle jar explicitly
./bin/sql-client.sh embedded -j ../hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.14-bundle-0.12.0.jar shell
# [Option 2] alternatively, place the jar under $FLINK_HOME/lib
./bin/sql-client.sh embedded shell
3) Insert data
-- sets up the result mode to tableau to show the results directly in the CLI
SET 'sql-client.execution.result-mode' = 'tableau';
CREATE TABLE t1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/flink-hudi-t1',
'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
);
INSERT INTO t1 VALUES ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1');
-- insert data using values
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
The table files can now be seen on HDFS (screenshot).
4) Query data (batch query)
select * from t1;
5) Update data
-- this would update the record with key 'id1'
insert into t1 values
('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
6) Streaming query
First create a table t2 that points at the same path as table t1 above, with options set so that it is read as a stream:
- read.streaming.enabled set to true, meaning the table data is read in streaming mode;
- read.streaming.check-interval: the interval (4s here) at which the source checks for new commits;
- table.type: sets the table type to MERGE_ON_READ.
CREATE TABLE t2(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/flink-hudi-t1',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true', -- this option enable the streaming read
'read.start-commit' = '20210316134557', -- specifies the start commit instant time
'read.streaming.check-interval' = '4' -- specifies the check interval for finding new source commits, default 60s.
);
-- Then query the table in stream mode
select * from t2;
Note: the query may fail with the following error:
[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
[Fix] Add hadoop-mapreduce-client-core-xxx.jar and hive-exec-xxx.jar to the Flink lib directory.
cp /opt/apache/hadoop-3.3.2/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.2.jar $FLINK_HOME/lib
cp ./hudi-0.12.0/hudi-examples/hudi-examples-spark/target/lib/hive-exec-2.3.1-core.jar $FLINK_HOME/lib
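Once the streaming query on t2 runs cleanly, you can watch it pick up new commits by writing another batch into the same table path. A small illustrative example (the id9 row is made up for this test; if you use a second SQL client session, re-create t1 there first, since the default catalog is in-memory and session-scoped):
-- write a new batch while the streaming query on t2 is running;
-- the row should show up in the t2 results within the 4s check interval
INSERT INTO t1 VALUES ('id9','Ada',30,TIMESTAMP '1970-01-01 00:00:09','par5');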
For integrating Hive with Hudi, start with the official docs: hudi.apache.org/docs/syncin…
That wraps up the Spark + Hudi and Flink + Hudi integrations. Walkthroughs of integrating Hudi with many other big data components will follow in later articles, so please stay tuned, and feel free to leave questions in the comments. I will keep publishing articles on big data and cloud native topics.