摘要:本文整理自阿里云资深技能专家,阿里云Hologres负责人姜伟华,在FFA实时湖仓专场的共享。点击查看>>本篇内容首要分为四个部分:
一、实时数仓分层的技能需求
二、阿里云一站式实时数仓Hologres介绍
三、Flink x Hologres:天作之合
四、依据Flink Catalog的Streaming Warehouse实践
点击查看视频回放
一、实时数仓分层的技能需求
首要,咱们讲一讲数仓的分层技能以及分层技能的现状。
1、实时数仓分层技能现状
大数据现在越来越考究实时化,在各种场景下都需求实时,例如春晚实时直播大屏,双11 GMV实时大屏、实时个性化引荐等场景,都对数据的实时性有着十分高的要求。为了满意事务的实时性需求,大数据技能也开端逐渐发展出实时数仓。
但怎么构建实时数仓呢?
比较离线数仓,实时数仓没有清晰的办法论系统。因而在实践中,有各种各样的办法,但没有一个办法是万能。最近行业内提出了Streaming Warehouse的概念。Streaming Warehouse的实质是分层之间能够做到实时数据的活动,然后处理实时数仓分层的问题。
下面,咱们先来了解下实时数仓的干流分层计划。
2、实时数仓干流分层计划
实时数仓的干流分层计划首要有4个。
计划1:流式ETL
ETL(Extract- Transform-Load)是比较传统的数据仓库建设办法,而流式ETL便是指:实时数据经过Flink实时ETL处理之后,将成果写入到KV引擎中,供运用查询。而为了处理中心层不便利排查的问题,也需求将中心层数据同步到实时数仓中供剖析之用。最常见的做法便是数据经过Flink清洗后,写到Kafka构成ODS层。再从Kafka消费,经过加工构成DWD层。然后Flink加工成DWS层,最终经过加工构成ADS层的数据写到KV引擎并对接上层运用。由于直接运用Kafka数据进行剖析和探查很费事,所以也会同步一份Kafka数据到实时数仓,经过实时数仓进行剖析和探查。
这个计划的优势是层次清晰,分工清晰。但下风是需求有大量的同步使命、数据资源耗费很大、数据有许多冗余、处理链路较杂乱需求许多的组件。除此之外,这个计划构建的实时数仓分层,尤其是Kafka分层,复用性十分差,也没办法响应schema的动态改变。
计划2:流式ELT
而流式ELT则是将核算后置,直接将明细数据写进实时数仓(EL),不需求严厉的数仓分层,整个架构只需求一层,上层运用查询的时分进行数据的改换(T)或者分层。常见的做法便是把数据加工清洗后,写到实时数仓里,构成DWD层,所有的查询都依据DWD层的明细数据进行。
这个计划的长处在于,没有ETL,只要一层;数据修订很便利。但它的弊端有两个方面:
-
在查询功能方面,由于是明细数据查询,所以在某些场景下不能满意QPS或推迟的要求。
-
由于没有严厉的数仓分层,所以数据复用很困难,很难统筹各方面的诉求。
计划3:守时调度
已然实时流式无法完结数据的实时数仓分层,咱们能够将数据实时写入实时数仓的DWD层。DWS层、ADS层用离线的高频调度办法,完结分钟级的调度,然后借用离线数仓,进行分层构造。这个也便是业界常用的计划3。
这个计划的长处在于能够复用许多离线经历,计划成本低且老练。但计划也存在如下缺陷:
-
推迟大:每一层的推迟都跟调度相关,随着层次越多,调度推迟越大,实时数仓也变成了准实时数仓。
-
不能彻底复用离线计划:离线调度一般是小时级或天级,咱们能够运用全量核算。但在分钟级调度时,有必要做增量核算,不然无法及时调度。
计划4:实时物化视图
第4种计划便是经过实时数仓的物化视图才能完结数仓分层。常见的做法便是Flink实时加工后,将数据写到实时数仓构成DWD层,DWS层或ADS层的构造依赖于实时数仓的实时物化视图才能。
现在干流实时数仓都开端供给物化视图的才能,但实质上都是供给了一些简略的聚合类物化视图。假如物化视图的需求比较简略,能够运用实时数仓里的实时物化视图才能,将DWS层到ADS层的构建主动化,然后让物化视图的查询确保较高的QPS。但这个计划最大的缺陷在于,现在的实时物化视图技能都还不老练,才能有限,支撑的场景也比较有限。
二、阿里云一站式实时数仓Hologres介绍
接下来,先介绍一下阿里云一站式实时数仓Hologres产品。Hologres是阿里云自研的一站式实时数仓,它一同包含三种才能:
-
OLAP才能:同传统的实时数仓一样,能够支撑数据的实时写入、以及杂乱OLAP实时多维剖析快速响应,满意事务的极致数据探索才能。
-
在线服务Serving(KV):能够支撑KV查询场景,供给十分高的QPS和毫秒级的低推迟。
-
湖仓一体:能够直接查询数据湖的数据,以及能够加速阿里云离线数仓MaxCompute,助力事务更低成本完结湖仓一体。
下面为具体介绍:
-
首要,大家能够把Hologres作为一个常见的实时数仓。它的特色在于写入侧支撑百万RPS的实时写入,写入即可查,没有推迟。一同也支撑高功能的实时整行更新和部分更新。其间,整行更新是把整行替换掉,部分更新能够更新一行中的部分字段,二者都是实时更新。
-
在查询侧,一方面支撑杂乱的OLAP多维剖析,能够十分好的支撑实时大屏、实时报表等场景。近期Hologres拿到了TPC-H 30TB的功能世界第一的TPC官方认证成果,见>>阿里云 ODPS-Hologres改写世界纪录,领先第二名23%。其次,Hologres也支撑在线服务查询,不只支撑百万QPS KV点查,而且也支撑阿里云达摩院的Proxima向量检索引擎,能够支撑十分高效的向量检索才能。一同这些才能在Hologres中是全用SQL表达,对用户运用十分友好。此外,为了统筹数据服务和实时数仓的需求,Hologres在行存、列存的数据格式基础上,也支撑队伍共存,即队伍共存的表即一份行存,又有一份列存,而且系统确保这两份数据是强共同的,关于OLAP剖析,优化器会主动挑选列存,关于线上服务,会主动挑选行存,经过队伍共存能够十分友好的完结一份数据支撑多个运用场景。
-
由于Hologres一同支撑OLAP剖析和线上服务,其间线上服务要求十分高的稳定性和SLA。为了确保OLAP剖析和线上服务时不会发生冲突,咱们支撑了读写分离,然后完结OLAP与数据服务的强阻隔。
-
最终,在湖仓数据交互式剖析方面,Hologres对阿里云MaxCompute离线数仓里的数据,数据湖中的数据都能够秒级交互式剖析,且不需求做任何的数据搬家。
-
除此之外,Hologres的定位是一站式的企业级实时数仓,所以除了上述才能,咱们还有许多其他才能。包含数据的管理、成本管理、数据血缘、数据脱敏、数据加密、IP白名单、数据的备份和恢复等等。
三、Flink x Hologres:天作之合
1、Hologres与Flink深度集成
Flink关于实时数仓能够供给十分丰富的数据处理、数据入湖仓的才能。Hologres与Flink有些十分深度的整合才能,具体包含:
-
Hologres能够作为Flink的维表:在实时核算的场景下,Flink对维表的需求很强,Hologres支撑百万级至千万级RPS的KV点查才能,能够直接作为Flink维表运用,且能够做到实时更新,关于像实时特征存储等维表相关场景就也能够十分高效的支撑。
-
Hologres能够作为Flink的成果表:Hologres支撑高功能的实时写入和整行实时更新的才能,能够结合Flink,输出需求强壮的Update才能,满意数仓场景下的实时更新、掩盖等需求。与此一同,Hologres还有很强的部分更新才能。部分更新才能在许多场景下,能够替代Flink的多流Join,为客户节约成本。
-
Hologres能够作为Flink的源表:Hologres支撑Binlog才能,一张表的任何改变,比方insert、update、delete等等,都会发生Binlog事情。Flink能够订阅Hologres Binlog,进行驱动核算。由于Flink支撑Hologres的整表读取,二者结合构成了Flink全增量一体化的读取才能。而且,Hologres也对了接Flink CDC,它能够驱动Flink CDC的核算。
-
支撑Hologres Catalog:经过Hologres Catalog的任何操作,都会直接实时反映到Hologres里,用户也不需求在Flink建Hologres表,这样就使得Flink+Hologres就具备了整库同步、Schema Evolution的才能。
2、依据Flink+Hologres的Streaming Warehouse计划
那Flink和Hologres怎么构建Streaming Warehouse?
Streaming Warehouse:数据能在数仓之间实时的活动,实质上便是处理实时数仓分层的问题
最开端咱们介绍了常见的数仓分层计划,Flink+Hologres的Streaming Warehouse计划则是能够彻底将Flink+Kafka替换。具体做法如下:
- 将Flink写到Hologres里,构成ODS层。Flink订阅ODS层的Hologres Binlog进行加工,将Flink从DWD层再次写入Hologres里。
- Flink再订阅DWD层的Hologres Binlog,经过核算构成DWS层,将其再次写入Hologres里。
- 最终,由Hologres对外供给运用查询。
该计划比较Kafka有如下长处:
-
处理了传统中心层Kafka数据不易查、不易更新、不易修正的问题。Hologres的每一层都可查、可更新、可修正。
-
Hologres的每一层都能够独自对外供给服务。由于每一层的数据都是可查的,所以数据的复用会更好,真实完结数仓分层复用的方针。
-
Hologres支撑数据复用,模型共同,架构简化。经过Flink+Hologres,就能完结实时数仓分层,简化架构和降低成本。
3、Flink+Hologres中心才能:Binlog、队伍共存、资源阻隔
上面讲的Flink+Hologres的Streaming Warehouse计划,其强依赖于以下三个Hologres中心才能:
- Binlog:由于实时数仓一般没有Binlog,但Hologres供给了Binlog才能,用来驱动Flink做实时核算,正由于有了Binlog,Hologres才能作为流式核算的上游。
- 队伍共存。一张表既有行存数据,又有列存数据。这两份数据是强共同的。队伍共存的特性让中心层的每张表,不光能够给Flink运用,而且能够给其他运用(比方OLAP、或者线上服务)运用。
- 资源强阻隔。实时数仓一般是弱阻隔或软阻隔,经过资源组、资源队伍的办法完结资源阻隔。假如Flink的资源耗费很大,可能影响中心层的点查功能。但在Hologres强阻隔的才能下,Flink对Hologres Binlog的数据拉取,不会影响线上服务。
经过Binlog、队伍共存、资源强阻隔的三个特色,不只能让Flink+Hologres构成Streaming Warehouse,而且能够使中心的每层数据复用,被其他运用或线上服务运用,助力企业构建最简略最完好的实时数仓。
4、依据Flink+Hologres的多流兼并
接下来,讲一讲依据Flink+Hologres的多流兼并。
由于Hologres有特别强壮的部分更新才能,依据此咱们能够简化Flink的多流Join。比方在风控场景下,咱们需求依据用户ID构建用户的多旁边面画像,用户画像来自许多数据源,比方客户的阅读行为、成交行为、履约行为等等。把数据源的数据依照用户ID,把每个用户放到一行里,构成不同的字段,构成用户的完好画像。
传统的方法需求用Flink多流Join完结,Flink把上游的多个数据源相关到一同,Join后写到Kafka里,然后驱动下流的Flink,加工这行完好的数据。这就使得多流Join十分耗资源。
所以在Flink+Hologres的Streaming Warehouse计划中,能够运用Hologres的部分更新才能,把一张表定为界说成Hologres的行存表或队伍共存表。此时,整个计划就简化成上游每个数据源,同步数据到Hologres表的若干个字段里,若干个使命一同写入这张表,然后运用Hologres的部分更新才能,把数据汇总在一同。
假如翻开这张Hologres表的 Binlog,上游任何数据源的改变都会更新这张表,使这张表的Binlog中生成行数据的最新状况,然后驱动下流的Flink持续核算,然后完美匹配常见的风控场景。这种用法下,资源耗费、运维都得到了极大的简化。
四、依据Flink Catalog的Streaming Warehouse实践
Flink+Hologres的Streaming Warehouse计划现已十分老练,但仅有的缺陷在于,用户需求在两个系统之间切换,进程比较繁琐。为了让用户操作更简略,咱们依据Flink Catalog供给了愈加简略的运用体会。
下面咱们来看看怎么样依据Flink Catalog去构建依据Flink+Hologres的Streaming Warehouse。咱们会发现,有了Flink Catalog后,整个运用体会会很简略,并能充分发挥Flink和Hologres两个产品的强壮才能。
下图是一个典型的Flink+Hologres实时ETL链路:
-
ODS层、DWD层、ODS层的数据都存在Hologres中。
-
链路中所有的数据加工都是经过Flink SQL完结。在整个ETL链路中,用户不需求任何Hologres SQL,直接写Flink SQL即可。
-
Flink用户能够经过Flink SQL对每层中的Hologres数据进行数据探查(流形式和批形式都能够)。比方:当咱们发现DWS层的数据成果出现问题,需求查看哪层的成果有问题或逻辑有错误。此时,咱们能够复用本来的Flink SQL来进行探查、定位或者数据从头消费。
-
Hologres中的每层数据都能够对外供给查询和服务(经过Hologres SQL)。
接下来,以某个电商场景为例,演示一下依据Flink Catalog的Streaming Warehouse。如下图所示,有一个MySQL数据库作为订单库,里边有订单表orders、订单付出表orders_pay、以及产品品类表product_catalog。
- 第一步,咱们经过Flink的实时数仓,把数据实时同步到Hologres里,构成ODS层。
- 第二步,加工DWD层。将DWD层的数据写到Hologres里。在这个进程中,咱们需求把订单表和订单付出表,兼并成一张表,完结多路兼并。与此一同,咱们希望orders表相关产品品类表product_catalog。
- 第三步,驱动下流核算,构建DWS层。以用户维度和商铺维度,搜集计算数据。比方用户每天的订单金额和商铺每天的订单金额,然后构成一条完好的链路。
- 第四步,将DWS层的表引荐给系统运用。作为用户和商铺的特征,用做引荐用途。
- 第五步,DWD层的表能够直接用来做实时计算剖析、计算产品、实时大屏、实时报表。
上图中的绿色链路,全部运用Flink SQL完结。橙色链路对外供给服务,由Hologres SQL完结。
接下来,讲一讲每个步骤是怎么运转的。
第一步,在Flink实时数仓,构成ODS层。首要,创立一个Hologres的Catalog。MySQL中存储订单、付出以及产品信息3张表,经过Flink Catalog功能,将MySQL整库的数据实时同步至Hologres,构成ODS。相关代码如下所示。咱们能够看到,MySQL整库同步到Hologres,经过Flink SQL来表达是十分简略的。
-- 创立Hologres Catalog
CREATE CATALOG holo WITH ( ‘type’ = ‘hologres’ … );
-- MySQL整库同步到Hologres
CREATE DATABASE IF NOT EXISTS holo.order_dw
AS DATABASE mysql.sw INCLUDING all tables;
第二步,DWD实时构建。数据实时写入ODS层后,Flink读取Hologres Binlog,并用多流兼并、维表相关将订单、交易、产品3个表打成一个大宽表,实时写入至Hologres的订单汇总表中,构成DWD层。
如下SQL是DWD层表的建表句子。这张方针表包含了来自orders、orders_pay、product_catalog的字段,相关了相关的用户信息、商户信息、订单信息、产品品类信息等等,构成了一张宽表。
CREATE TABLE holo.order_dw.dwd_orders (
order_id bigint not null primary key,
--字段来自order 表
order_user_id bigint,
order_shop_id bigint,
order_product_id string,
order_fee numeric(20,2),
order_create_time timestamp_ltz,
order_update_time timestamp_ltz,
order_state int,
--字段来自product_catalog表
order_product_catalog_name string,
--字段来自orders_pay表
pay_id bigint,
pay_platfrom int,
pay_create_time timestamp_ltz
) ;
下面的SQL是真实的核算逻辑,这儿包含两个INSERT句子:
- 第一个INSERT句子是从orders表实时打宽后写入。这儿用到了Hologres的维表相关才能。实时打宽后,写入方针表的部分字段。
- 第二个INSERT句子是从orders_pay表实时同步到同一张方针表,更新别的一些字段。
这两个INSERT句子最大的相关在于,它们写的是同一张表,会主动运用方针表的主键ID进行相关。每个INSERT都是做了方针表的部分更新,两者的合力成果是实时更新的方针宽表。
BEGIN STATEMENT SET;
-- 从orders表实时打宽后写入
INSERT INTO holo.order_dw.dwd_orders (
order_id,
order_user_id,
order_shop_id,
order_product_id,
order_fee,
order_create_time,
order_update_time,
order_state,
order_product_catalog_name
)
SELECT
o.*,
dim.catalog_name
FROM
holo.order_dw.orders o
LEFT JOIN holo.order_dw.product_catalog
FOR SYSTEM_TIME AS OF proctime () AS dim
ON
o.product_id = dim.product_id;
-- 从order_pays表实时写入
INSERT INTO holo.order_dw.dwd_orders (
pay_id,
order_id,
pay_platform,
pay_create_time
)
SELECT *
FROM
holo.order_dw.orders_pay;
END;
第三步,DWS层的实时聚合。在DWD的基础上,经过Flink读取Hologres DWD的Binlog数据,进行实时目标聚合核算,比方依照用户维度聚合,依照商户维度聚合等,然后实时写入Hologres,构成DWS层。
- 先是创立对应的聚合目标表,DDL句子如下
-- 用户维度聚合目标表
CREATE TABLE holo.order_dw.dws_users (
user_id bigint not null,
ds string not null,
-- 当日完结付出总金额 payed_buy_fee_sum numeric(20,2) not null,
primary key(user_id,ds) NOT ENFORCED
);
-- 商户维度聚合目标表
CREATE TABLE holo.order_dw.dws_shops (
shop_id bigint not null,
ds string not null,
-- 当日完结付出总金额
payed_buy_fee_sum numeric(20,2) not null,
primary key(shop_id,ds) NOT ENFORCED
);
- 然后将数据写入Hologres中,经过简略的三步后,Flink SQL构建了完好的Streaming Warehouse分层系统。
--数据写入Hologres BEGIN STATEMENT SET;
INSERT INTO holo.order_dw.dws_users
SELECT
order_user_id,
DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
SUM (order_fee)
FROM holo.order_dw.dwd_orders c
WHERE pay_id IS NOT NULL
AND order_fee IS NOT NULL
GROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
INSERT INTO holo.order_dw.dws_shops
SELECT
order_shop_id,
DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
SUM (order_fee)
FROM holo.order_dw.dwd_orders c
WHERE pay_id IS NOT NULL
AND order_fee IS NOT NULL
GROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
END;
第四步,构建运用,依据DWS层,对外供给服务。
数据的分层和加工完结后,事务就能够经过Hologres查询数据并运用。在这个例子里,引荐系统要求十分高的点查功能,所以要求百万级的QPS查看才能。Hologres的行存表或者队伍共存表彻底能够满意。
这个计划和传统的实时数仓最大的差别是:传统的实时数仓只要最终一层的数据,可对外供给服务。而在Hologres里,DWD等中心层数据也能够对外供给服务,进行实时报表计算。用户能够在中心层进行查询操作,对接各种实时运用、实时大屏。比方
- 直接查DWD层的数据,典型的如依据用户ID返回引荐产品(KV场景)
--场景4: 依据用户特征引荐产品
SELECT *
FROM dws_users
WHERE
user_id = ?
AND ds = '2022-11-09’;
--场景4: 依据店铺特征引荐产品
SELECT *
FROM dws_shops
WHERE
shop_id = ?
AND ds = '2022-11-09’;
- 实时报表查看订单量和退单量(OLAP)。
--场景6:依据宽表数据展现实时报表
-- 最近30天,每个品类的订单总量和退单总量
SELECT
TO_CHAR(order_create_time, 'YYYYMMDD'),
order_product_catalog_name,
COUNT(*),
COUNT(CASE WHEN refund_id IS NOT NULL THEN 1 ELSE null END)
FROM
dwd_orders
WHERE
order_create_time > now() - '30 day' :: inteval
GROUP BY
1, 2
ORDER BY
1, 2;
第五步,问题排查:Flink数据探查。假如某个事务目标出现反常,Flink能够直接探查每层表的数据来快速定位。比方用Flink探查Hologres DWD层的orders表。Hologres支撑Flink的流形式和批形式对数据的探查。
由于流形式是Flink的默认形式,因而咱们不需求设置执行形式。它能够直接记录数据改变,然后十分便利的查看数据反常。流形式能够探查获取一段时间范围内的数据及其改变情况。
-- 流形式探查
SELECT
*
FROM holo.order_dw.dwd_orders
/*+ OPTIONS('cdcMode'='false', 'startTime'='2022-11-09 00:00:00') */ c
WHERE
user_id = 0;
与此一同,批形式探查是获取当前时间的最新数据。Hologres也支撑Flink批形式的数据探查。批形式和流形式的区别在于,流形式重视的是改变,批形式重视的是表中的最新状况。
-- 批形式探查
set 'execution.runtime-mode'='batch’;
SELECT
*
FROM
holo.order_dw.dwd_orders
WHERE
user_id = 0
AND order_create_time>'2022-11-09 00:00:00';
五、总结
Hologres跟Flink深度集成。完结完好的Streaming Warehouse计划,该计划有如下显着优势:
- 一站式:全链路都能够用SQL表明,而且只需求用到Flink和Hologres两个组件,操作十分便利。实时ETL链路、数据分层彻底能够用Flink SQL完结,Hologres供给对外供给在线服务和OLAP查询,每层数据可复用、可查,便利构建实时数仓的数据分层和复用系统。
- 高功能:这种计划能够使得使得Hologres发挥极致的实时写入、实时更新才能和多维OLAP、高并发点查才能,Flink发挥实时加工才能。
- 企业级:自带多种企业级才能,不只运维更简略,可观测性更好,安全才能更强,也供给多种高可用才能,然后企业愈加便利的构建企业级的Streaming Warehouse。