Clickhouse作为一个OLAP数据库,它对事务的支持非常有限。Clickhouse供给了MUTATION操作(通过ALTER TABLE句子)来完成数据的更新、删去,但这是一种“较重”的操作,它与规范SQL语法中的UPDATE、DELETE不同,是异步执行的,关于批量数据不频频的更新或删去比较有用 。除了MUTATION操作,Clickhouse还能够通过CollapsingMergeTree、VersionedCollapsingMergeTree、ReplacingMergeTree结合具体事务数据结构来完成数据的更新、删去,这三种方式都通过INSERT句子刺进最新的数据,新数据会“抵消”或“替换”掉老数据,可是“抵消”或“替换”都是产生在数据文件后台Merge时,也就是说,在Merge之前,新数据和老数据会同时存在。因而,咱们需求在查询时做一些处理,防止查询到老数据。Clickhouse官方文档供给了运用CollapsingMergeTree、VersionedCollapsingMergeTree的指导 。比较于CollapsingMergeTree、VersionedCollapsingMergeTree需求标记位字段、版别字段,用ReplacingMergeTree来完成数据的更新删去会愈加便利,这儿着重介绍一下如何用ReplacingMergeTree来完成数据的更新删去。

咱们假设一个需求频频数据更新的场景,如某市用户用电量的统计,咱们知道,用户的用电量每分每秒都有或许产生变化,所以会涉及到数据频频的更新。首要,创立一张表来记录某市一切用户的用电量。

CREATE TABLE IF NOT EXISTS default.PowerConsumption_local ON CLUSTER default_cluster
(
    User_ID             UInt64                              COMMENT '用户ID',
    Record_Time         DateTime    DEFAULT toDateTime(0)   COMMENT '电量记录时刻',
    District_Code       UInt8                               COMMENT '用户地点行政区编码',
    Address             String                              COMMENT '用户地址',
    Power               UInt64                              COMMENT '用电量',
    Deleted             BOOLEAN     DEFAULT 0               COMMENT '数据是否被删去'
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/default.PowerConsumption_local/{shard}', '{replica}', Record_Time)
ORDER BY (User_ID, Address)
PARTITION BY District_Code;
CREATE TABLE default.PowerConsumption ON CLUSTER default_cluster AS default.PowerConsumption_local
ENGINE = Distributed(default_cluster, default, PowerConsumption_local, rand());

PowerConsumption_local为本地表,PowerConsumption为对应的分布式表。其中PowerConsumption_local运用ReplicatedReplacingMergeTree表引擎,第三个参数‘Record_Time’表示相同主键的多条数据,只会保存Record_Time最大的一条,咱们正是使用ReplacingMergeTree的这一特性来完成数据的更新删去。因而,在选择主键时,咱们需求保证主键仅有。这儿咱们选择(User_ID, Address)来作为主键,由于用户ID加上用户的地址能够确定仅有的一个电表,不会出现第二个相同的电表,所以关于某个电表多条数据,只会保存电量记录时刻最新的一条。

然后咱们向表中刺进10条数据:

INSERT INTO default.PowerConsumption VALUES (0, '2021-10-30 12:00:00', 3, 'Yanta', rand64() % 1000 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (1, '2021-10-30 12:10:00', 2, 'Beilin', rand64() % 1000 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (2, '2021-10-30 12:15:00', 1, 'Weiyang', rand64() % 1000 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (3, '2021-10-30 12:18:00', 1, 'Gaoxin', rand64() % 1000 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (4, '2021-10-30 12:23:00', 2, 'Qujiang', rand64() % 1000 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (5, '2021-10-30 12:43:00', 3, 'Baqiao', rand64() % 1000 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (6, '2021-10-30 12:45:00', 1, 'Lianhu', rand64() % 1000 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (7, '2021-10-30 12:46:00', 3, 'Changan', rand64() % 1000 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (8, '2021-10-30 12:55:00', 1, 'Qianhan', rand64() % 1000 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (9, '2021-10-30 12:57:00', 4, 'Fengdong', rand64() % 1000 + 1, 0);

表中数据如图所示:

Clickhouse如何实现数据更新

假如现在咱们要行政区编码为1的一切用户数据都需求更新,咱们刺进最新的数据:

INSERT INTO default.PowerConsumption VALUES (2, now(), 1, 'Weiyang', rand64() % 100 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (3, now(), 1, 'Gaoxin', rand64() % 100 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (6, now(), 1, 'Lianhu', rand64() % 100 + 1, 0);
INSERT INTO default.PowerConsumption VALUES (8, now(), 1, 'Qianhan', rand64() % 100 + 1, 0);

刺进最新数据后,表中数据如图所示:

Clickhouse如何实现数据更新

能够看到,此时新刺进的数据与老数据同时存在于表中,由于后台数据文件还没有进行Merge,“替换”还没有产生,这时就需求对查询句子做一些处理来过滤掉老数据,函数argMax(a, b)能够按照b的最大值取a的值,所以通过如下查询句子就能够只获取到最新数据:

SELECT
    User_ID,
    max(Record_Time) AS R_Time,
    District_Code,
    Address,
    argMax(Power, Record_Time) AS Power,
    argMax(Deleted, Record_Time) AS Deleted
FROM default.PowerConsumption
GROUP BY
    User_ID,
    Address,
    District_Code
HAVING Deleted = 0;

查询成果如下图:

Clickhouse如何实现数据更新

为了更便利咱们查询,这儿能够创立一个视图:

CREATE VIEW PowerConsumption_view ON CLUSTER default_cluster AS
SELECT
    User_ID,
    max(Record_Time) AS R_Time,
    District_Code,
    Address,
    argMax(Power, Record_Time) AS Power,
    argMax(Deleted, Record_Time) AS Deleted
FROM default.PowerConsumption
GROUP BY
    User_ID,
    Address,
    District_Code
HAVING Deleted = 0;

通过该视图,能够查询到最新的数据:

Clickhouse如何实现数据更新

假如现在咱们又需求删去用户ID为0的数据,咱们需求刺进一条User_ID字段为0,Deleted字段为1的数据:

INSERT INTO default.PowerConsumption VALUES (0, now(), 3, 'Yanta', null, 1);

查询视图,发现User_ID为0的数据现已查询不到了:

Clickhouse如何实现数据更新

通过如上方法,咱们能够完成Clickhouse数据的更新、删去,就好像在运用OLTP数据库相同,但咱们应该清楚,实际上老数据真正的删去是在数据文件Merge时产生的,只要在Merge后,老数据才会真正物理意义上的删去掉。

本文由华为云发布。