在这篇博文中,我会介绍一种流处理解决计划,用于全面且动态地了解维基百科的修改状况。大家也能够用于其他渠道~
- 从维基百科 API 中提取数据,并将其传输到部署在 Instaclustr 云中的 Kafka Topic。
- 在 RisingWave 创立了 Source 以摄取 Kafka 数据,并创立物化视图进行处理剖析。
- 运用 Superset 对成果进行可视化,生成各类图表和综合看板。
RisingWave 是一个与 PostgreSQL 兼容的流数据库,具有真实的云原生架构,拥有低成本高效益、可扩展等特点。根据 RisingWave,用户仅运用 SQL 就能从流数据中获取方针信息。
Instaclustr 是一个集成了很多盛行开源东西(如 Kafka、PostgreSQL、Cassandra 和 Redis)的完全托管渠道。它供给了便利的 Kafka Connect 集成,包含专用的 ZooKeeper 服务。经过这种 100% 开源计划,Instaclustr 供给了无缝运用 Kafka 的体会。
1. 技术栈
咱们将从 Wikipedia API 获取实时数据,捕获维基百科文章的修改和奉献者信息,然后将它们发布到 Kafka Topic。
随后,数据将被导入 RisingWave。根据这些数据,咱们将创立物化视图来履行一系列操作,如聚合、时刻窗口操作、数据转换等,以从数据中提取有价值的信息。
最终,咱们会把处理后的数据从 RisingWave 导出到 Apache Superset,从而将数据可视化,用更详细直观的方式检查奉献者们对维基百科的实时修改。
2. 在 Instaclustr 云上部署 Kafka
为开始生成事情,咱们需求一个 Kafka 集群。在本文的演示中,咱们将用 Instaclustr 云创立一个 Kafka 集群。
2.1 创立 Kafka 集群
首先,请注册免费的 Instaclustr 账号以获得拜访 Kafka 服务的权限。您能够经过拜访 Instaclustr 云渠道来创立账户。
接着,请参阅 Instaclustr 云供给的Apache Kafka 快速上手攻略,在 Instaclustr 云上创立一个 Kafka 集群。
成功创立 Kafka 集群后,请增加您核算机的 IP 地址到集群中,以便发生和运用数据。
2.2 将维基百科修改数据传输到 Kafka
咱们将首先运用维基百科PythonAPI 来获取各种信息,如用户奉献、用户详细信息和最近更改。
随后,咱们会把这些数据传输到 Instaclustr 云上的 Kafka 集群中,以便后续将数据导入 RisingWave。
咱们的 JSON 音讯将遵循以下 schema:
"contributor":维基百科奉献者的姓名。
"title":所修改的维基百科文章的标题。
"edit_timestamp":修改的时刻戳。
"registration":该维基百科用户的注册日期。
"gender":该维基百科用户的性别。
"edit_count":该维基百科用户的修改次数。
以下是一个发送到 Kafka Topic 的音讯样本:
{
"contributor":"Teatreez",
"title":"SupremeCourtofSouthAfrica",
"edit_timestamp":"2023-12-0318:23:02",
"registration":"2006-12-3018:42:21",
"gender":"unknown",
"edit_count":"10752"
}
3. 连接 RisingWave 与 Kafka Topic
要运用 RisingWave,请参阅快速上手攻略创立一个 RisingWave 集群。
随后,为了 RisingWave 和 Instaclustr 能成功连接,请先前往 Instaclustr,将您的 RisingWave 集群的 NAT 网关 IP 地址增加到 Instaclustr 云中 Kafka 集群的防火墙规矩(Firewall Rules)中。这一步有利于避免潜在的连接错误。
成功创立 RisingWave 集群后,咱们在 RisingWave 中创立一个 Source,用于从 Instaclustr 云中的 Kafka Topic 导入数据到RisingWave。
请运用以下查询创立一个连接到 Instaclustr 云中 Kafka Topic 的 Source,注意,各认证参数需求精确填写对应的值。
CREATESOURCEwiki_source(
contributorVARCHAR,
titleVARCHAR,
edit_timestampTIMESTAMPTZ,
registrationTIMESTAMPTZ,
genderVARCHAR,
edit_countVARCHAR
)WITH(
connector='kafka',
topic='Insta-topic',
properties.bootstrap.server='x.x.x.x:9092',
scan.startup.mode='earliest',
properties.sasl.mechanism='SCRAM-SHA-256',
properties.security.protocol='SASL_PLAINTEXT',
properties.sasl.username='ickafka',
properties.sasl.password='xxxxxx'
)FORMATPLAINENCODEJSON;
然后,咱们根据 Source wiki_source
创立一个名为wiki_mv
的物化视图。注意,以下代码中,咱们过滤掉了带有空值的行。
CREATEMATERIALIZEDVIEWwiki_mvAS
SELECT
contributor,
title,
CAST(edit_timestampASTIMESTAMP)ASedit_timestamp,
CAST(registrationASTIMESTAMP)ASregistration,
gender,
CAST(edit_countASINT)ASedit_count
FROMwiki_source
WHEREtimestampISNOTNULL
ANDregistrationISNOTNULL
ANDedit_countISNOTNULL;
现在,咱们能够查询物化视图,获取 Source 中的最新数据:
SELECT*FROMwiki_mvLIMIT5;
回来成果将相似如下:
contributor|title|edit_timestamp|registration|gender|edit_count
--------------- ----------------------------- --------------------------- --------------------------- --------- -----------
Omnipaedista|Template:Goodandevil|2023-12-0310:22:02 00:00|2008-12-1406:02:32 00:00|male|222563
PepeBonus|MoshimoInochigaEgaketara|2023-12-0310:22:16 00:00|2012-06-0213:39:53 00:00|unknown|20731
Koulog|IonikosF.C.|2023-12-0310:23:00 00:00|2023-10-2805:52:35 00:00|unknown|691
FauTzy|2023Liga3Maluku|2023-12-0310:23:17 00:00|2022-07-2309:53:11 00:00|unknown|4697
Cavarrone|Cheers(season8)|2023-12-0310:23:40 00:00|2008-08-2311:13:14 00:00|male|83643
(5rows)
接下来,咱们再创立几个查询:
以下查询创立了一个新的物化视图gender_mv
,将物化视图wiki_mv
中的奉献按一分钟距离进行聚合。该物化视图核算了多个数据,包含每个时刻窗口内的总奉献数、不知道性别者的奉献数,以及已知性别者的奉献数。根据此物化视图,咱们能够更便利地根据性别对奉献模式进行剖析和监控。
CREATEMATERIALIZEDVIEWgender_mvAS
SELECTCOUNT(*)AStotal_contributions,
COUNT(CASEWHENgender='unknown'THEN1END)AScontributions_by_unknown,
COUNT(CASEWHENgender!='unknown'THEN1END)AScontributions_by_male_or_female,
window_start,window_end
FROMTUMBLE(wiki_mv,edit_timestamp,INTERVAL'1MINUTES')
GROUPBYwindow_start,window_end;
以下查询创立了物化视图registration_mv
,它相同将物化视图wiki_mv
中的奉献按一分钟距离进行聚合,核算的信息包含:总奉献数、2020年1月1日之前注册账户的奉献数,以及2020年1月1日之后注册账户的奉献数。
CREATEMATERIALIZEDVIEWregistration_mvAS
SELECTCOUNT(*)AStotal_contributions,
COUNT(CASEWHENregistration<'2020-01-0101:00:00'::timestampTHEN1END)AScontributions_by_someone_registered_before_2020,
COUNT(CASEWHENregistration>'2020-01-0101:00:00'::timestampTHEN1END)AScontributions_by_someone_registered_after_2020,
window_start,window_end
FROMTUMBLE(wiki_mv,edit_timestamp,INTERVAL'1MINUTES')
GROUPBYwindow_start,window_end;
以下查询创立了物化视图count_mv
,将wiki_mv
物化视图中的奉献按一分钟距离进行聚合,然后核算:总奉献数、修改次数少于 1000 次的奉献者的奉献数,以及修改次数大于等于 1000 次的奉献者的奉献数。
CREATEMATERIALIZEDVIEWcount_mvAS
SELECT
COUNT(*)AStotal_contributions,
COUNT(CASEWHENedit_count<1000THEN1END)AScontributions_less_than_1000,
COUNT(CASEWHENedit_count>=1000THEN1END)AScontributions_1000_or_more,
window_start,window_end
FROMTUMBLE(wiki_mv,edit_timestamp,INTERVAL'1MINUTES')
GROUPBYwindow_start,window_end;
4. 将数据从 RisingWave 导出到 Apache Superset 进行可视化
Superset 是一个用于创立看板和可视化内容的开源东西。接下来咱们将装备 Superset,从 RisingWave 读取数据并导出到 Superset 进行可视化。
4.1 连接 RisingWave 和 Superset
请依照 RisingWave 文档中的攻略,装备 Superset 从 RisingWave 读取数据。在此过程中,咱们会把 RisingWave 作为数据源增加到 Apache Superset 中,并运用其间的表和物化视图创立可视化和看板。
成功将 RisingWave 连接到 Apache Superset 后,您能够依照该攻略剩下部分的指导,将 RisingWave 中的物化视图作为数据集进行增加,创立表、图表和整合后的看板。
4.2 可视化成果展现:表、图表和看板
下表运用wiki_mv
数据集生成,显现了 Wikipedia 奉献者的姓名、注册日期、性别、修改次数以及奉献者修改过的 Wikipedia 文章等信息。
以下面积图运用count_mv
数据集创立,展现了指定时刻窗口内的:总奉献数、修改次数少于 1000 次的奉献者的奉献数,以及修改次数大于等于 1000 次的奉献者的奉献数。
以下折线图根据gender_mv数据集生成,展现了指定时刻窗口内的:总奉献数、不知道性别者的奉献,以及已知性别者的奉献。
下图则运用registration_mv
数据集创立,在 1 分钟的时刻窗口内可视化了各种类型的奉献计数,包含:总奉献数、2020年1月1日之前注册的用户的奉献数,以及2020年1月1日之后注册的用户的奉献数。
最终,以下是一个整合了以上各项图表的看板,让您能够全面、实时地监控 Wikipedia 修改信息,以全面地发掘奉献者及其所修改文章相关的重要信息。
5. 关于 RisingWave
RisingWave 是一款分布式 SQL 流处理数据库,旨在协助用户降低实时应用的的开发成本。作为专为云上分布式流处理而设计的系统,RisingWave 为用户供给了与 PostgreSQL 相似的运用体会,而且具有比 Flink 高出 10 倍的性能以及更低的成本。
假如你还不知道如何上手 RisingWave,请体会中文入门教程:www.risingwavetutorial.com/
想要更深化地理解并运用 RisingWave,请阅览中文用户文档:zh-cn.risingwave.com/docs