摘要:本文整理自阿里云资深技能专家,阿里云Hologres负责人姜伟华,在FFA实时湖仓专场的共享。点击查看>>本篇内容首要分为四个部分:

一、实时数仓分层的技能需求

二、阿里云一站式实时数仓Hologres介绍

三、Flink x Hologres:天作之合

四、依据Flink Catalog的Streaming Warehouse实践

点击查看视频回放

Flink X Hologres构建企业级Streaming Warehouse

一、实时数仓分层的技能需求

首要,咱们讲一讲数仓的分层技能以及分层技能的现状。

1、实时数仓分层技能现状

大数据现在越来越考究实时化,在各种场景下都需求实时,例如春晚实时直播大屏,双11 GMV实时大屏、实时个性化引荐等场景,都对数据的实时性有着十分高的要求。为了满意事务的实时性需求,大数据技能也开端逐渐发展出实时数仓。

Flink X Hologres构建企业级Streaming Warehouse

但怎么构建实时数仓呢?

比较离线数仓,实时数仓没有清晰的办法论系统。因而在实践中,有各种各样的办法,但没有一个办法是万能。最近行业内提出了Streaming Warehouse的概念。Streaming Warehouse的实质是分层之间能够做到实时数据的活动,然后处理实时数仓分层的问题。

Flink X Hologres构建企业级Streaming Warehouse

下面,咱们先来了解下实时数仓的干流分层计划。

2、实时数仓干流分层计划

实时数仓的干流分层计划首要有4个。

计划1:流式ETL

ETL(Extract- Transform-Load)是比较传统的数据仓库建设办法,而流式ETL便是指:实时数据经过Flink实时ETL处理之后,将成果写入到KV引擎中,供运用查询。而为了处理中心层不便利排查的问题,也需求将中心层数据同步到实时数仓中供剖析之用。最常见的做法便是数据经过Flink清洗后,写到Kafka构成ODS层。再从Kafka消费,经过加工构成DWD层。然后Flink加工成DWS层,最终经过加工构成ADS层的数据写到KV引擎并对接上层运用。由于直接运用Kafka数据进行剖析和探查很费事,所以也会同步一份Kafka数据到实时数仓,经过实时数仓进行剖析和探查。

这个计划的优势是层次清晰,分工清晰。但下风是需求有大量的同步使命、数据资源耗费很大、数据有许多冗余、处理链路较杂乱需求许多的组件。除此之外,这个计划构建的实时数仓分层,尤其是Kafka分层,复用性十分差,也没办法响应schema的动态改变。

Flink X Hologres构建企业级Streaming Warehouse

计划2:流式ELT

而流式ELT则是将核算后置,直接将明细数据写进实时数仓(EL),不需求严厉的数仓分层,整个架构只需求一层,上层运用查询的时分进行数据的改换(T)或者分层。常见的做法便是把数据加工清洗后,写到实时数仓里,构成DWD层,所有的查询都依据DWD层的明细数据进行。

这个计划的长处在于,没有ETL,只要一层;数据修订很便利。但它的弊端有两个方面:

  • 在查询功能方面,由于是明细数据查询,所以在某些场景下不能满意QPS或推迟的要求。

  • 由于没有严厉的数仓分层,所以数据复用很困难,很难统筹各方面的诉求。

Flink X Hologres构建企业级Streaming Warehouse

计划3:守时调度

已然实时流式无法完结数据的实时数仓分层,咱们能够将数据实时写入实时数仓的DWD层。DWS层、ADS层用离线的高频调度办法,完结分钟级的调度,然后借用离线数仓,进行分层构造。这个也便是业界常用的计划3。

这个计划的长处在于能够复用许多离线经历,计划成本低且老练。但计划也存在如下缺陷:

  • 推迟大:每一层的推迟都跟调度相关,随着层次越多,调度推迟越大,实时数仓也变成了准实时数仓。

  • 不能彻底复用离线计划:离线调度一般是小时级或天级,咱们能够运用全量核算。但在分钟级调度时,有必要做增量核算,不然无法及时调度。

Flink X Hologres构建企业级Streaming Warehouse

计划4:实时物化视图

第4种计划便是经过实时数仓的物化视图才能完结数仓分层。常见的做法便是Flink实时加工后,将数据写到实时数仓构成DWD层,DWS层或ADS层的构造依赖于实时数仓的实时物化视图才能。

现在干流实时数仓都开端供给物化视图的才能,但实质上都是供给了一些简略的聚合类物化视图。假如物化视图的需求比较简略,能够运用实时数仓里的实时物化视图才能,将DWS层到ADS层的构建主动化,然后让物化视图的查询确保较高的QPS。但这个计划最大的缺陷在于,现在的实时物化视图技能都还不老练,才能有限,支撑的场景也比较有限。

Flink X Hologres构建企业级Streaming Warehouse

二、阿里云一站式实时数仓Hologres介绍

接下来,先介绍一下阿里云一站式实时数仓Hologres产品。Hologres是阿里云自研的一站式实时数仓,它一同包含三种才能:

  • OLAP才能:同传统的实时数仓一样,能够支撑数据的实时写入、以及杂乱OLAP实时多维剖析快速响应,满意事务的极致数据探索才能。

  • 在线服务Serving(KV):能够支撑KV查询场景,供给十分高的QPS和毫秒级的低推迟。

  • 湖仓一体:能够直接查询数据湖的数据,以及能够加速阿里云离线数仓MaxCompute,助力事务更低成本完结湖仓一体。

Flink X Hologres构建企业级Streaming Warehouse

下面为具体介绍:

  • 首要,大家能够把Hologres作为一个常见的实时数仓。它的特色在于写入侧支撑百万RPS的实时写入,写入即可查,没有推迟。一同也支撑高功能的实时整行更新和部分更新。其间,整行更新是把整行替换掉,部分更新能够更新一行中的部分字段,二者都是实时更新。

  • 在查询侧,一方面支撑杂乱的OLAP多维剖析,能够十分好的支撑实时大屏、实时报表等场景。近期Hologres拿到了TPC-H 30TB的功能世界第一的TPC官方认证成果,见>>阿里云 ODPS-Hologres改写世界纪录,领先第二名23%。其次,Hologres也支撑在线服务查询,不只支撑百万QPS KV点查,而且也支撑阿里云达摩院的Proxima向量检索引擎,能够支撑十分高效的向量检索才能。一同这些才能在Hologres中是全用SQL表达,对用户运用十分友好。此外,为了统筹数据服务和实时数仓的需求,Hologres在行存、列存的数据格式基础上,也支撑队伍共存,即队伍共存的表即一份行存,又有一份列存,而且系统确保这两份数据是强共同的,关于OLAP剖析,优化器会主动挑选列存,关于线上服务,会主动挑选行存,经过队伍共存能够十分友好的完结一份数据支撑多个运用场景。

  • 由于Hologres一同支撑OLAP剖析和线上服务,其间线上服务要求十分高的稳定性和SLA。为了确保OLAP剖析和线上服务时不会发生冲突,咱们支撑了读写分离,然后完结OLAP与数据服务的强阻隔。

  • 最终,在湖仓数据交互式剖析方面,Hologres对阿里云MaxCompute离线数仓里的数据,数据湖中的数据都能够秒级交互式剖析,且不需求做任何的数据搬家。

  • 除此之外,Hologres的定位是一站式的企业级实时数仓,所以除了上述才能,咱们还有许多其他才能。包含数据的管理、成本管理、数据血缘、数据脱敏、数据加密、IP白名单、数据的备份和恢复等等。

Flink X Hologres构建企业级Streaming Warehouse

三、Flink x Hologres:天作之合

1、Hologres与Flink深度集成

Flink关于实时数仓能够供给十分丰富的数据处理、数据入湖仓的才能。Hologres与Flink有些十分深度的整合才能,具体包含:

  • Hologres能够作为Flink的维表:在实时核算的场景下,Flink对维表的需求很强,Hologres支撑百万级至千万级RPS的KV点查才能,能够直接作为Flink维表运用,且能够做到实时更新,关于像实时特征存储等维表相关场景就也能够十分高效的支撑。

  • Hologres能够作为Flink的成果表:Hologres支撑高功能的实时写入和整行实时更新的才能,能够结合Flink,输出需求强壮的Update才能,满意数仓场景下的实时更新、掩盖等需求。与此一同,Hologres还有很强的部分更新才能。部分更新才能在许多场景下,能够替代Flink的多流Join,为客户节约成本。

  • Hologres能够作为Flink的源表:Hologres支撑Binlog才能,一张表的任何改变,比方insert、update、delete等等,都会发生Binlog事情。Flink能够订阅Hologres Binlog,进行驱动核算。由于Flink支撑Hologres的整表读取,二者结合构成了Flink全增量一体化的读取才能。而且,Hologres也对了接Flink CDC,它能够驱动Flink CDC的核算。

  • 支撑Hologres Catalog:经过Hologres Catalog的任何操作,都会直接实时反映到Hologres里,用户也不需求在Flink建Hologres表,这样就使得Flink+Hologres就具备了整库同步、Schema Evolution的才能。

Flink X Hologres构建企业级Streaming Warehouse

2、依据Flink+Hologres的Streaming Warehouse计划

那Flink和Hologres怎么构建Streaming Warehouse?

Streaming Warehouse:数据能在数仓之间实时的活动,实质上便是处理实时数仓分层的问题

最开端咱们介绍了常见的数仓分层计划,Flink+Hologres的Streaming Warehouse计划则是能够彻底将Flink+Kafka替换。具体做法如下:

  1. 将Flink写到Hologres里,构成ODS层。Flink订阅ODS层的Hologres Binlog进行加工,将Flink从DWD层再次写入Hologres里。
  2. Flink再订阅DWD层的Hologres Binlog,经过核算构成DWS层,将其再次写入Hologres里。
  3. 最终,由Hologres对外供给运用查询。

该计划比较Kafka有如下长处:

  • 处理了传统中心层Kafka数据不易查、不易更新、不易修正的问题。Hologres的每一层都可查、可更新、可修正。

  • Hologres的每一层都能够独自对外供给服务。由于每一层的数据都是可查的,所以数据的复用会更好,真实完结数仓分层复用的方针。

  • Hologres支撑数据复用,模型共同,架构简化。经过Flink+Hologres,就能完结实时数仓分层,简化架构和降低成本。

Flink X Hologres构建企业级Streaming Warehouse

3、Flink+Hologres中心才能:Binlog、队伍共存、资源阻隔

上面讲的Flink+Hologres的Streaming Warehouse计划,其强依赖于以下三个Hologres中心才能:

  1. Binlog:由于实时数仓一般没有Binlog,但Hologres供给了Binlog才能,用来驱动Flink做实时核算,正由于有了Binlog,Hologres才能作为流式核算的上游。
  2. 队伍共存。一张表既有行存数据,又有列存数据。这两份数据是强共同的。队伍共存的特性让中心层的每张表,不光能够给Flink运用,而且能够给其他运用(比方OLAP、或者线上服务)运用。
  3. 资源强阻隔。实时数仓一般是弱阻隔或软阻隔,经过资源组、资源队伍的办法完结资源阻隔。假如Flink的资源耗费很大,可能影响中心层的点查功能。但在Hologres强阻隔的才能下,Flink对Hologres Binlog的数据拉取,不会影响线上服务。

经过Binlog、队伍共存、资源强阻隔的三个特色,不只能让Flink+Hologres构成Streaming Warehouse,而且能够使中心的每层数据复用,被其他运用或线上服务运用,助力企业构建最简略最完好的实时数仓。

Flink X Hologres构建企业级Streaming Warehouse

4、依据Flink+Hologres的多流兼并

接下来,讲一讲依据Flink+Hologres的多流兼并。

由于Hologres有特别强壮的部分更新才能,依据此咱们能够简化Flink的多流Join。比方在风控场景下,咱们需求依据用户ID构建用户的多旁边面画像,用户画像来自许多数据源,比方客户的阅读行为、成交行为、履约行为等等。把数据源的数据依照用户ID,把每个用户放到一行里,构成不同的字段,构成用户的完好画像。

传统的方法需求用Flink多流Join完结,Flink把上游的多个数据源相关到一同,Join后写到Kafka里,然后驱动下流的Flink,加工这行完好的数据。这就使得多流Join十分耗资源。

所以在Flink+Hologres的Streaming Warehouse计划中,能够运用Hologres的部分更新才能,把一张表定为界说成Hologres的行存表或队伍共存表。此时,整个计划就简化成上游每个数据源,同步数据到Hologres表的若干个字段里,若干个使命一同写入这张表,然后运用Hologres的部分更新才能,把数据汇总在一同。

假如翻开这张Hologres表的 Binlog,上游任何数据源的改变都会更新这张表,使这张表的Binlog中生成行数据的最新状况,然后驱动下流的Flink持续核算,然后完美匹配常见的风控场景。这种用法下,资源耗费、运维都得到了极大的简化。

Flink X Hologres构建企业级Streaming Warehouse

四、依据Flink Catalog的Streaming Warehouse实践

Flink+Hologres的Streaming Warehouse计划现已十分老练,但仅有的缺陷在于,用户需求在两个系统之间切换,进程比较繁琐。为了让用户操作更简略,咱们依据Flink Catalog供给了愈加简略的运用体会。

下面咱们来看看怎么样依据Flink Catalog去构建依据Flink+Hologres的Streaming Warehouse。咱们会发现,有了Flink Catalog后,整个运用体会会很简略,并能充分发挥Flink和Hologres两个产品的强壮才能。

下图是一个典型的Flink+Hologres实时ETL链路:

  1. ODS层、DWD层、ODS层的数据都存在Hologres中。

  2. 链路中所有的数据加工都是经过Flink SQL完结。在整个ETL链路中,用户不需求任何Hologres SQL,直接写Flink SQL即可。

  3. Flink用户能够经过Flink SQL对每层中的Hologres数据进行数据探查(流形式和批形式都能够)。比方:当咱们发现DWS层的数据成果出现问题,需求查看哪层的成果有问题或逻辑有错误。此时,咱们能够复用本来的Flink SQL来进行探查、定位或者数据从头消费。

  4. Hologres中的每层数据都能够对外供给查询和服务(经过Hologres SQL)。

Flink X Hologres构建企业级Streaming Warehouse

接下来,以某个电商场景为例,演示一下依据Flink Catalog的Streaming Warehouse。如下图所示,有一个MySQL数据库作为订单库,里边有订单表orders、订单付出表orders_pay、以及产品品类表product_catalog。

Flink X Hologres构建企业级Streaming Warehouse

  1. 第一步,咱们经过Flink的实时数仓,把数据实时同步到Hologres里,构成ODS层。
  2. 第二步,加工DWD层。将DWD层的数据写到Hologres里。在这个进程中,咱们需求把订单表和订单付出表,兼并成一张表,完结多路兼并。与此一同,咱们希望orders表相关产品品类表product_catalog。
  3. 第三步,驱动下流核算,构建DWS层。以用户维度和商铺维度,搜集计算数据。比方用户每天的订单金额和商铺每天的订单金额,然后构成一条完好的链路。
  4. 第四步,将DWS层的表引荐给系统运用。作为用户和商铺的特征,用做引荐用途。
  5. 第五步,DWD层的表能够直接用来做实时计算剖析、计算产品、实时大屏、实时报表。

上图中的绿色链路,全部运用Flink SQL完结。橙色链路对外供给服务,由Hologres SQL完结。

接下来,讲一讲每个步骤是怎么运转的。

第一步,在Flink实时数仓,构成ODS层。首要,创立一个Hologres的Catalog。MySQL中存储订单、付出以及产品信息3张表,经过Flink Catalog功能,将MySQL整库的数据实时同步至Hologres,构成ODS。相关代码如下所示。咱们能够看到,MySQL整库同步到Hologres,经过Flink SQL来表达是十分简略的。

-- 创立Hologres Catalog
CREATE CATALOG holo WITH ( ‘type’ = ‘hologres’ … );
 -- MySQL整库同步到Hologres
CREATE DATABASE IF NOT EXISTS holo.order_dw 
AS DATABASE mysql.sw INCLUDING all tables;

第二步,DWD实时构建。数据实时写入ODS层后,Flink读取Hologres Binlog,并用多流兼并、维表相关将订单、交易、产品3个表打成一个大宽表,实时写入至Hologres的订单汇总表中,构成DWD层。

如下SQL是DWD层表的建表句子。这张方针表包含了来自orders、orders_pay、product_catalog的字段,相关了相关的用户信息、商户信息、订单信息、产品品类信息等等,构成了一张宽表。

CREATE TABLE holo.order_dw.dwd_orders (
  order_id bigint not null primary key,
  --字段来自order 表
  order_user_id bigint,
  order_shop_id bigint,
  order_product_id string,
  order_fee numeric(20,2),
  order_create_time timestamp_ltz,
  order_update_time timestamp_ltz,
  order_state int,
  --字段来自product_catalog表
  order_product_catalog_name string,
  --字段来自orders_pay表
  pay_id bigint,
  pay_platfrom int,
  pay_create_time timestamp_ltz
) ;

下面的SQL是真实的核算逻辑,这儿包含两个INSERT句子:

  • 第一个INSERT句子是从orders表实时打宽后写入。这儿用到了Hologres的维表相关才能。实时打宽后,写入方针表的部分字段。
  • 第二个INSERT句子是从orders_pay表实时同步到同一张方针表,更新别的一些字段。

这两个INSERT句子最大的相关在于,它们写的是同一张表,会主动运用方针表的主键ID进行相关。每个INSERT都是做了方针表的部分更新,两者的合力成果是实时更新的方针宽表。

BEGIN STATEMENT SET;
-- 从orders表实时打宽后写入
INSERT INTO holo.order_dw.dwd_orders (
order_id,
order_user_id,
order_shop_id,
order_product_id,
order_fee,
order_create_time,
order_update_time,
order_state,
order_product_catalog_name
)
SELECT 
o.*,
dim.catalog_name
FROM
holo.order_dw.orders o 
LEFT JOIN holo.order_dw.product_catalog 
FOR SYSTEM_TIME AS OF proctime () AS dim
ON
o.product_id = dim.product_id;
-- 从order_pays表实时写入
INSERT INTO holo.order_dw.dwd_orders (
pay_id,
order_id,
pay_platform,
pay_create_time
)
SELECT *
FROM
holo.order_dw.orders_pay;
END;

第三步,DWS层的实时聚合。在DWD的基础上,经过Flink读取Hologres DWD的Binlog数据,进行实时目标聚合核算,比方依照用户维度聚合,依照商户维度聚合等,然后实时写入Hologres,构成DWS层。

  • 先是创立对应的聚合目标表,DDL句子如下
-- 用户维度聚合目标表
CREATE TABLE holo.order_dw.dws_users (
  user_id bigint not null,
  ds string not null,
  -- 当日完结付出总金额     payed_buy_fee_sum numeric(20,2) not null,      
  primary key(user_id,ds) NOT ENFORCED
);
 -- 商户维度聚合目标表
CREATE TABLE holo.order_dw.dws_shops (
  shop_id bigint not null,
  ds string not null,
  -- 当日完结付出总金额
  payed_buy_fee_sum numeric(20,2) not null,     
  primary key(shop_id,ds) NOT ENFORCED
);
  • 然后将数据写入Hologres中,经过简略的三步后,Flink SQL构建了完好的Streaming Warehouse分层系统。
--数据写入Hologres BEGIN STATEMENT SET;
INSERT INTO holo.order_dw.dws_users
SELECT 
  order_user_id,
  DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
  SUM (order_fee)
FROM holo.order_dw.dwd_orders c
WHERE pay_id IS NOT NULL 
AND order_fee IS NOT NULL
GROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
 INSERT INTO holo.order_dw.dws_shops
SELECT 
  order_shop_id,
  DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
  SUM (order_fee)
FROM holo.order_dw.dwd_orders c
WHERE pay_id IS NOT NULL 
AND order_fee IS NOT NULL
GROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
END;

第四步,构建运用,依据DWS层,对外供给服务。

数据的分层和加工完结后,事务就能够经过Hologres查询数据并运用。在这个例子里,引荐系统要求十分高的点查功能,所以要求百万级的QPS查看才能。Hologres的行存表或者队伍共存表彻底能够满意。

这个计划和传统的实时数仓最大的差别是:传统的实时数仓只要最终一层的数据,可对外供给服务。而在Hologres里,DWD等中心层数据也能够对外供给服务,进行实时报表计算。用户能够在中心层进行查询操作,对接各种实时运用、实时大屏。比方

  • 直接查DWD层的数据,典型的如依据用户ID返回引荐产品(KV场景)
--场景4: 依据用户特征引荐产品
SELECT * 
FROM dws_users 
WHERE 
user_id = ? 
AND ds = '2022-11-09’;
--场景4: 依据店铺特征引荐产品
SELECT * 
FROM dws_shops 
WHERE 
shop_id = ? 
AND ds = '2022-11-09’;
  • 实时报表查看订单量和退单量(OLAP)。
--场景6:依据宽表数据展现实时报表
-- 最近30天,每个品类的订单总量和退单总量
SELECT
TO_CHAR(order_create_time, 'YYYYMMDD'),
order_product_catalog_name,
COUNT(*),
COUNT(CASE WHEN refund_id IS NOT NULL THEN 1 ELSE null END)
FROM
dwd_orders
WHERE
order_create_time > now() - '30 day' :: inteval
GROUP BY
1, 2
ORDER BY
1, 2;

第五步,问题排查:Flink数据探查。假如某个事务目标出现反常,Flink能够直接探查每层表的数据来快速定位。比方用Flink探查Hologres DWD层的orders表。Hologres支撑Flink的流形式和批形式对数据的探查。

由于流形式是Flink的默认形式,因而咱们不需求设置执行形式。它能够直接记录数据改变,然后十分便利的查看数据反常。流形式能够探查获取一段时间范围内的数据及其改变情况。

-- 流形式探查
SELECT 
  * 
FROM holo.order_dw.dwd_orders 
/*+ OPTIONS('cdcMode'='false', 'startTime'='2022-11-09 00:00:00') */ c 
WHERE 
  user_id = 0;

与此一同,批形式探查是获取当前时间的最新数据。Hologres也支撑Flink批形式的数据探查。批形式和流形式的区别在于,流形式重视的是改变,批形式重视的是表中的最新状况。

-- 批形式探查
set 'execution.runtime-mode'='batch’;
SELECT 
  * 
FROM 
  holo.order_dw.dwd_orders 
WHERE 
  user_id = 0
  AND order_create_time>'2022-11-09 00:00:00';

五、总结

Hologres跟Flink深度集成。完结完好的Streaming Warehouse计划,该计划有如下显着优势:

  1. 一站式:全链路都能够用SQL表明,而且只需求用到Flink和Hologres两个组件,操作十分便利。实时ETL链路、数据分层彻底能够用Flink SQL完结,Hologres供给对外供给在线服务和OLAP查询,每层数据可复用、可查,便利构建实时数仓的数据分层和复用系统。
  2. 高功能:这种计划能够使得使得Hologres发挥极致的实时写入、实时更新才能和多维OLAP、高并发点查才能,Flink发挥实时加工才能。
  3. 企业级:自带多种企业级才能,不只运维更简略,可观测性更好,安全才能更强,也供给多种高可用才能,然后企业愈加便利的构建企业级的Streaming Warehouse。

Flink X Hologres构建企业级Streaming Warehouse