导读: 跟着叮咚买菜事务的开展,不同的事务场景对数据剖析提出了不同的需求,他们期望引进一款实时 OLAP 数据库,构建一个灵敏的多维实时查询和剖析的渠道,一致数据的接入和查询方案,处理各事务线对数据高效实时查询和精细化运营的需求。经过调研选型,终究引进 Apache Doris 作为终究的 OLAP 剖析引擎,Doris 作为中心的 OLAP 引擎支撑杂乱地剖析操作、供给多维的数据视图,在叮咚买菜数十个事务场景中广泛运用。
作者|叮咚买菜资深数据工程师 韩青
叮咚买菜创立于 2017 年 5 月,是一家专注夸姣食物的创业公司。叮咚买菜专注吃的工作,为满意更多人“想吃什么”而努力,经过夸姣食材的供应、夸姣味道的开发以及美食品牌的孵化,不断为人们供给夸姣生活的处理方案,致力让更多人吃得新鲜、吃得省心、吃得丰厚、吃得健康……以更夸姣的舌尖体会,为现代家庭发明美味与幸福感。
事务需求
跟着叮咚买菜事务的开展,不同的事务场景对数据剖析提出了不同的需求,这些需求终究被数据看板、实时 Ad-Hoc、行为剖析、B/C 端事务渠道和标签渠道等系统运用所承载,为完结这些系统运用,叮咚大数据期望引进一款实时 OLAP 数据库,旨在供给一个灵敏的多维实时查询和剖析的渠道,一致数据的接入和查询方案,处理各事务线对数据高效实时查询和精细化运营的需求。根据上述诉求,咱们期望所引进的数据库具备以下才干:
- 可以实时高效地剖析和运用数据;
- 可以支撑明细数据、汇总数据两种不同的数据查询方法;
- 可以对入库后的数据即席挑选维度和条件查询,实时/近实时回来成果
选型和对比
咱们首要对比了 Apache Doris 和 ClickHouse 两款市面上最常见的开源 OLAP 引擎,在选型过程中咱们首要考虑以下几个方面:
- 支撑标准 SQL,无需投入额定的时刻适应和学习新的 SQL 方言、直接用标准 SQL 即可直接查询,最大化下降运用门槛;
- 支撑 Join 操作,便利事实表与维度表进行相关查询,在应对维度更新时具有更高的灵敏性、无需对处理后的宽表进行重刷;
- 支撑高并发查询,系统面对多条事务线的一起运用,因而需求有比较强的并行查询才干,以更好满意事务需求;
- 支撑离线和实时导入,可与已有技能栈轻松对接,支撑多个数据源或大数据组件的离线和实时导入,以更好适配不同运用场景;
- 支撑大数据量的明细数据查询,以满意不同事务用户灵敏多变的剖析需求;
经过翔实的技能调研,Apache Doris 各项才干都比较优异,在咱们的大多数事务场景中都需求明细数据等级的查询、高并发的点查和大数据量的 Join,而这几个方面 Apache Doris 相较于 ClickHouse 均更胜一筹,因而咱们决议运用 Apache Doris 来建立新的架构系统。
架构系统
在整体架构中,各个组件承载着特定的功能,Elasticsearch 首要负责存储标签系统的数据,HBase 是实时数仓的维表层,MySQL 用于存储事务系统的数据存储,Kafka 首要存储实时数据,Spark 首要供给 Ad-Hoc 查询的核算集群服务,而 Apache Doris 作为中心的 OLAP 引擎支撑杂乱地剖析操作、供给多维的数据视图。
- 离线部分: 数据从事务库经过 DataX 导入到数据库房 ODS 层,经过层层处理输出到 Doris 中,上层 BI 系统链接到 Doris 进行报表展示。
- 实时部分: 数据经过 Flink 消费 Kafka 中的数据,进行相应的逻辑处理后,输出到 Doris 或许 HDFS 中,供运用层运用。
在数据运用的 OLAP 层中,Doris 运用方案如下图所示:
-
模型创立规范化: 选用流程批阅的方法进行数据建模,依据详细的事务场景来建立 Duplicate,Unique Key 和 Aggregate 模型,并按照用户供给的数据量设置合适的 Bucket 数目,做好模型归属关系。
-
数据入口的一致: 数据的流入首要有实时和离线两种,实时数据用 Flink 使命从 Kafka 消费数据,逻辑处理流入 Doris ;离线数据经过 Broker Load 方法从 Hive 中将数据灌入 Doris 中。
-
服务对外出口的一致: 对外服务首要经过两种方法露出接口,一种是运用 JDBC 直连,上层系统装备 Doris 集群的 FE 的衔接信息直连 Doris;另一种是事务经过内部的 One API 服务装备事务接口运用 Doris。
-
事务 SQL 的优化治理: 经过采集 Doris FE 的审计日志,以此来对 SQL 的功能进行剖析优化,以及对 Doris 服务进行治理。
运用实践
叮咚买菜现在已经将 OLAP 引擎一致为 Apache Doris 广泛运用于事务场景中,咱们将 Doris 集群拆分红了四个集群,分别支撑中心报表、行为剖析与算法运用、B/C 端事务和实时数据,依据不同事务场景的事务量及数据规划,集群的资源装备也不尽相同。现在总的机器规划达数十台,以行为剖析场景为例,单个集群近 20 个节点、 存储数据量超越百 TB,每日新增数据量达数十亿条 。
接下来共享 Apache Doris 在叮咚买菜常见事务场景中的运用实践及运用经验。
实时数据剖析
从下方数仓模型图可知,数据经过 Flink 作业进行逻辑处理,在不同层级 Kafka 中进行流通加工,经过数据汇总层之后,运用层需求一个组件来存储成果数据,该组件一般是从 MySQL 数据库、KV 存储和 OLAP 引擎三者中挑选其一。
考虑到咱们的成果数据大多以核算目标数据居多,缺少维度数据,因而运用层的组件需求具备高效、低推迟的数据 Join 才干。根据以上要素,咱们终究挑选 Apache Doris 作为实时数仓和实时事务的数据运用层,Doris 可以有效下降数据处理延时,进步查询功率。
比方在销量方案项目中,该项目需求每日实时写入大量猜测数据,而且这些数据需求较低时延供给给剖析师进行及时对比剖析、修改该猜测值,并供给到供应链端。因修改猜测值会影响到系统调拨,所以选用的存储有必要是要具有高吞吐低推迟特性,Doris 完全符合该要求。从销量方案的整个数据出产及处理链路来看,运用 Doris 后,最慢 2 秒内可以看到最新的数据。
当时公司已经有数十个实时事务需求接入到 Doris 中,跟着事务的不断开展,这个数字也在慢慢添加。
B 端事务查询取数
在实际的运用中,一般会有 B 端事务或系统需求从数据库房中取数的需求,比方自研 Pylon 系统(首要用来根据用户偏好的数据查询)会依据 UID 查询用户画像信息。在这种场景下,一般需求进行杂乱的模型相关,一起要求在秒级或许毫秒级回来查询成果。
- 运用前:咱们开始运用 Spark 的 JDBC 方法来直接查询数据库房 Hive 表数据,因为存放用户标签数据的 Hive 表的数据量有几千万体量,经过 Spark JDBC 方法要消耗几分钟才干查出成果,一起还存在 Yarn 调度耗时较高的问题,有时会因为队列资源严重发生推迟,导致一个一般的查询乃至需求十几分钟才干跑出成果,用户的体会度十分欠好。
- 运用后:经过咱们对数据链路的改造,将 Hive 的用户标签数据离线灌入 Doris 中,再用相同的 SQL 查询,Doris 的功能在绝大多数场景下比 Spark 要好许多,可以在秒等级得到回来成果。
标签系统
开始咱们的标签数据存放在 ES 中,但是跟着事务的扩展、下流事务越来越多,标签数据规划急速膨胀,策略规矩不断添加改变,标签系统遭受严重的功能瓶颈。
- 聚合和 Join 查询的功能低
- 人群圈选花费时刻近 20 分钟
- ES 导入慢、查询功能低
为处理以上问题,咱们现在正在测验运用 Doris 来替换 ES,期望经过 Doris 处理上述问题,挑选 Doris 首要考虑以下三点:
1、散布式 Join 大大提高查询功率
原有产品 ID 和库房 ID 经过嵌套类型存储在 ES 中,替换为 Doris 存储之后,需求将杂乱的嵌套类型拆解为两张表来做表级相关,一起可以试用 Doris 的多种散布式的 Join 进步查询得功能。Doris 的散布式 Join 首要有 Shuffle Join、Broadcast Join 和 Colocate Join。
其中 Colocate Join 查询功能是最好的,旨在为某些 Join 查询供给本地性优化,来减少数据在节点间的传输耗时、加速查询,另外咱们在该场景下根本均为千万级的表。综合来看,Colocate Join 比较符合场景与需求,终究决议运用 Colocate Join方法提高 Join 功能。
如何运用: 标签数据的运用首要涉及到两张大表的 Join,建表时需求设置相同的 Distributed Key、相同的 Bucket数、相同的副本数,还要将两个表经过 colocate_with
属性划分到一个组 Colocation Group(CG)。
CREATE TABLE `profile_table` (
`pdate` date NULL COMMENT "null",
`product_mongo_id` varchar(4000) NULL COMMENT "产品ID",
`station_id` varchar(4000) NULL COMMENT "仓id",
......
) ENGINE=OLAP
UNIQUE KEY(`pdate`,
`product_mongo_id`, `station_id`)
COMMENT "OLAP"
PARTITION BY RANGE(`pdate`)()
DISTRIBUTED BY
HASH(`product_mongo_id`) BUCKETS 7
PROPERTIES ("colocate_with" = "profile_table","in_memory" = "false","storage_format" = "V2")
CREATE TABLE
`station_info_table` ( `product_mongo_id` varchar(4000) NULL COMMENT "产品id", `station_id` varchar(4000)NULL
COMMENT "站点id",
`snapshot` date NULL COMMENT "日期",
`product_id` bigint(20) NULL COMMENT "产品id", ......)
ENGINE=OLAPUNIQUE KEY(`product_mongo_id`, `station_id`, `snapshot`)
COMMENT "OLAP"
PARTITION BY RANGE(`snapshot`)()
DISTRIBUTED BY
HASH(`product_mongo_id`) BUCKETS 7
PROPERTIES ("colocate_with" = "profile_table","in_memory" = "false","storage_format" = "V2")
比方咱们有这样一条查询句子:
select count(psp.product_mongo_id) from profile_table psp
left join station_info_table psi on psp.product_mongo_id=psi.product_mongo_id and psp.station_id=psi.station_id
where psp.pdate='2023-03-16' and psp.four_category='特征醋' and psp.brand_name='宝鼎天鱼' and psp.weight_unit='ml' and psp.pmg_name='粮油调味组';
经过运用 Colocate Join 方法优化后,可以到达毫秒级的查询作用。接下来咱们介绍一下 Colocate Join 的查询功能高的原因有哪些呢?
A. 数据导入时确保数据本地性
Doris 的分区方法如下所示,先依据分区字段 Range 分区,再依据指定的 Distributed Key Hash 分桶。
所以咱们在数据导入时确保本地性的中心思想便是两次映射,关于 Colocate Tables,咱们确保相同 Distributed Key 的数据映射到相同的 Bucket Seq,再确保相同 Bucket Seq 的 Buckets 映射到相同的 BE。可以同查看执行方案查看是否运用了Colocate Join:
关于 HashJoinFragment,因为 Join 的多张表有了数据本地性确保,所以可以去掉 Exchange Node,防止网络传输,将 ScanNode 直接设置为 Hash Join Node 的 Child。
B. 查询调度时确保数据本地性
-
查询调度的目标: 一个 Colocate Join 中所有 ScanNode 中所有 Bucket Seq 相同的 Buckets 被调度到同一个 BE。
-
查询调度的策略:第一个 ScanNode 的 Buckets 随机挑选 BE,其他的 ScanNode 和第一个 ScanNode 保持一致。
C. 数据 Balance 后确保数据本地性
新增一个 Daemon 线程专门处理 Colocate Table 的 Balance,并让正常的 Balance 线程不处理 Colocate Table 的 Balance。正常 Balance 的粒度是 Bucket,但是关于 Colocate Table,有必要确保同一个 Colocate Group 下所有 Bucket 的数据本地性,所以 Balance 的单位是 Colocate Group。
2、高效简易的array_contains
函数
在做人群圈选时,有以下相似的 Json 结构[{"K1":"V1","K2":200},{"k1":"v2","k2":300}]
,当装备 k1=v1,k2=200
,只要该 Value 里的 Json 项有一项满意全部条件就会被圈出来,咱们可以凭借 Doris 1.2 版本中的 array_contains
数组函数处理,将 Json 转化为 Array 数组处理。
3、Broker Load 加速数据导入功率
Doris Broker Load 是一种高效、安稳的数据导入方法,它可以将数据分红多个分片,然后将每个分片分配给不同的 Broker 节点进行处理,咱们运用 Broker Load 将标签数据从 Hive 导入 Doris 中,有效进步了数据导入的功率和安稳性。
BI 数据看板
咱们商业智能剖析运用的 BI 剖析渠道首要是帆柔和自研的阿尔法 BI,底层运用 Doris 来存储数据,现在用来做报表的 Doris 表数量已到达了 3000 多张,四个 Doris 集群的日 UV 1000+ ,PV 到达十几万,因为 Doris 可以到达毫秒级呼应速度、支撑高并发查询,因而单集群的 QPS 可以到达到达 120次/秒,符合咱们的要求。
OLAP 多维剖析
跟着事务的增长,咱们在运营的过程中咱们常常有一些疑问:最近三个月哪个品类的下单量最高?改变趋势如何?各个时段人均下单是多少?某个区域,发生购买行为的年龄段散布是怎样的?……而想要取得成果,有必要依据用户行为日志进行事情剖析。
现在咱们的用户行为数据日均增量为 20亿+,高峰期 100亿+,为了更好的进行事情剖析,咱们需求保存半年的数据,也便是几千亿的数据量。 咱们运用 Doris 来存储如此巨大的数据量,在应对杂乱的剖析场景时可以到达分钟级的呼应。在多维剖析的过程中, 往往也伴跟着大数据量的杂乱查询,接下来共享如何运用 Doris 应对:
1、 Bitmap 去重
事务运用过程中需求剖析用户参加情况以及活跃程度,考查进行初始行为后的用户中,有多少人会进行后续行为,这时候一般都是运用留存剖析模型完结此类需求。该模型运用中有去重操作,核算周期有某天/某周/某月/最近三个月等,因为每天的埋点数据量都能到达几十亿,高峰期 100 亿,在这个情况下,运用 count(distinct)
功能太差、乃至查询超时(或超越设置的时刻),而如果运用 Bitmap 来可以成倍的缩短查询时刻。
select
event_id,
date,
count(distinct uid) as count
from event
where
dt>='2022-06-01' and dt<'2022-06-06' and event_id in (......) group by event_id, str_to_date(dt,'%Y-%m-%d');
运用 Bitmap 优化 SQL 后
select
event_id,
date,
bitmap_count(uid) as count
from event
where
dt>='2022-06-01' and dt<'2022-06-06' and event_id in (......) group by event_id, str_to_date(dt,'%Y-%m-%d');
运用中需求注意 Bitmap 函数在 Apache Doris 中依然需求先把数据会聚到一个 FE 节点才干执行核算,并不能充分发挥散布式核算的优势,在数据量大到一定的情况下, Bitmap 函数并不能取得比 COUNT(DISTINCT)
更好的功能,上述实例之所以能到达预期成果是因为做了分组核算。
如果处理大数据量的全量去重,在建表时将 Bitmap 列的值按照 Range 划分,不同 Range 的值存储在不同的分桶中,确保了不同分桶的 Bitmap 值是正交的。当查询时,先分别对不同分桶中的正交 Bitmap 进行聚合核算,然后顶层节点直接将聚合核算后的值兼并汇总并输出,然后处理顶层单节点核算瓶颈问题。
2、前缀索引和 Bloom Filter 索引
Doris 首要支撑两类索引:内建的智能索引(包括前缀索引)和创立的二级索引(包括 Bloom Filter 索引和 Bitmap 倒排索引)。实际运用时咱们会用到前缀索引和 Bloom Filter 索引来进步查询功率。
前缀索引
Aggregate、Unique 和 Duplicate 三种数据模型中,底层的数据存储是按照各自建表句子中 AGGREGATE KEY、UNIQUE KEY 和 DUPLICATE KEY 指定的列进行排序存储的。前缀索引即在排序的基础上完结的一种依据给定前缀列、快速查询数据的索引方法,完结方法是将一行数据的前 36 个字节作为这行数据的前缀索引,当遇到 VARCHAR 类型时,前缀索引会直接截断。
比方咱们要查询按照日期和 event_id
分组的去重人数,建表句子如下:
CREATE TABLE ubs_event_log_small_event (
event_id int(11) NULL COMMENT "事情id",
dt datetime NOT NULL COMMENT "事情时刻",
uid char(128) NULL COMMENT "用户id",
dict_id int(11) NULL COMMENT "用户id字典值",
os varchar(24) NULL COMMENT "操作系统",
......
dict_id_bitmap bitmap BITMAP_UNION NULL COMMENT "bitmap用户id"
) ENGINE=OLAP
AGGREGATE KEY(event_id, dt, uid, dict_id, os, ......)
COMMENT "用户行为事情表"
PARTITION BY RANGE(dt)
()
DISTRIBUTED BY HASH(dt, event_id, uid) BUCKETS 64
SQL 查询的 Where 条件一般遵从建表的 AGGREGATE 模型的 KEY 的次序,这样可以射中 Doris 内置的前缀索引。
SELECT
CONCAT(
TO_DATE(dt),
' 00:00:00'
) AS tm,
event_id,
BITMAP_UNION_COUNT(dict_id_bitmap) AS UNIQ_1908_1
FROM
kepler.ubs_event_log_small_event
WHERE event_id = 1908 AND
dt >= '2023-03-26'
AND dt < '2023-04-05'
AND
os IN (1, 2)
GROUP BY
1,
2;
Bloom Filter 索引
针对大数据量的事情表进行查询时咱们会设置 bloom_filter_columns
,加速查询功率:
alter table datasets set("bloom_filter_columns" = "area_name, is_booking, user_source, source_first_order......");
查询句子中 where
条件有以上设置的字段就会射中该索引。
SELECT * FROM datasets WHERE area_name="****" AND is_booking=0
3、物化视图
为了取得更粗粒度的聚合数据,Doris 允许在建表句子创立出来的 Base 表的基础上,创立若干 Rollup 表。
例如上表 ubs_event_log_small_event
,咱们可以对 dt
,event_id
,dict_id_bitmap
建立 Rollup 物化视图,这样 Rollup 只包括三列: dt
,event_id
,dict_id_bitmap
。
这时再进行上述查询就会射中这个 Rollup,然后只扫描极少的数据量就可以完结此次聚合查询。
优化经验
Broker Load 导数使命流程化
为了 Doris 运用更加快捷,我司在内部自研的叮咚大数据渠道上对整个过程进行流程化;从建模到装备 Broker Load 导数使命再到导数使命调度均进行了调整,详细优化如下所述:
建模过程: 需求用户建议建模流程申请,填写需求内容、详细建模句子、预估数据量巨细、数据保存时长、所需相关权限账号等信息,足够完整的信息可以在批阅时取得建模过程中的元数据信息以及挑选更合适的数据模型。
Broker Load 导数使命装备: 为了进步用户运用功率、下降运用门槛,咱们经过 Mapping 映射和主动化装备方法,主动生成导数使命。
导数使命调度: 装备完 Broker Load 导数使命,就可以由用户依据需求装备小时或许天等级的调度,这样整个 Doris 数据导入流程,就可以由用户装备主动完结。
总结与展望
Apache Doris 作为叮咚买菜整体架构系统中的中心 OLAP 剖析引擎,不论是作为数仓数据看板类的运用层、还是作为实时数据汇总成果接入层、或是作为 B/C 端服务数据供给方,均可以很好的满意事务需求。除此之外,Doris 使得咱们无需在存储选型上消耗过多时刻,大大缩短了开发的周期;一起,Doris 支撑 MySQL 协议和标准 SQL ,大大下降内部人员的运用本钱和门槛。未来,咱们期望运用 Doris 支撑内部更多的事务场景,更大范围了提高工作功率。咱们也会紧跟社区步伐,积极运用推出的新版本特性,以更好处理场景需求,提高事务作用。
最后,十分感谢 SelectDB 团队对咱们在 Doris 运用上的技能支撑,祝愿 SelectDB 和 Apache Doris 开展越来越好!