作者:尚卓燃(PsiACE)
澳门科技大学在读硕士,Databend 研发工程师实习生
Apache OpenDAL(Incubating)
Committerhttps://github.com/PsiACE
云核算为以数据为中心的运用供给了廉价、弹性、共享的存储服务,这为现代数据处理作业流供给了显而易见的好处:海量数据、高并发拜访、大吞吐量,越来越多的事例开端将旧有的技能栈向数据湖架构进行迁移。
当咱们将数据湖置于云端之后,新的问题随之而来:
- 旧有的数据仓库/大数据剖析技能或许并不是专为云和目标存储规划的,功能和兼容性或许不太理想,需求投入很多的资源进行维护,如何为数据湖供给实在现代的、低本钱、高功能、高质量的剖析服务?
- 对数据办理的需求仅一步加强,对剖析成果的可复现性、数据源的可共享性供给了更高的要求,如何为数据供给弹性和可办理性,让数据科学家、数据剖析师和数据工程师在逻辑共同的视图下紧密协作?
有问题,就会有答案。
Databend 基于云上的目标存储打造了实在跨云且原生的数据仓库,选用 serverless 理念规划,供给分布式、弹性可拓展、运维便利的高功能查询引擎,支撑常见的结构化与半结构化数据,能够与现代数据技能栈进行紧密集成。
lakeFS 致力于为共享和协作处理数据供给处理计划,用相似 Git 的操作逻辑赋能目标存储,选用版别化计划为数据供给逻辑共同的视图,为现代化数据作业流嵌入有意义的分支名和提交信息,并且为数据、文档的一体化供给处理计划。
在这篇文章中,咱们将会结合二者,供给一个简略明晰的 workshop,协助你快速建立现代化的数据作业流。
为什么你需求 Databend
跟着数据量的激增,传统的数据仓库面临着巨大的应战。它们无法有效地存储和处理海量数据,也难以根据作业负载弹性调整核算和存储资源,导致运用本钱昂扬。此外,数据处理杂乱,需求投入很多资源进行 ETL,数据回退和版别操控也非常困难。
Databend 致力于处理这些痛点。它是一款运用 Rust 开发的开源、弹性、负载感知的云数据仓库,能够为超大规模数据集供给经济高效的杂乱剖析才能。
- 云友好:无缝集成各种云存储,如 AWS S3、Azure Blob、CloudFlare R2 等。
- 高功能:运用 Rust 开发,运用 SIMD 和向量化处理实现极速剖析。
- 经济弹性:立异规划,存储和核算独立弹性,优化本钱和功能。
- 简易数据办理:内置数据预处理才能,无需外部 ETL 东西。
- 数据版别操控:供给相似 Git 的多版别存储,支撑恣意时刻点的数据查询、克隆和回退。
- 丰富的数据支撑:支撑 JSON、CSV、Parquet 等多种数据格局和类型。
- AI 增强剖析:集成 AI 函数,供给由大模型驱动的数据剖析才能。
- 社区驱动:拥有友好、持续增长的社区,供给易用的云上剖析平台。
上图为 Databend 架构图,摘自 datafuselabs/databend 。
为什么你需求 lakeFS
因为目标存储往往缺少原子性、回滚等才能,数据的安全性不能很好地得到确保,质量和可恢复性也随之下降。为了维护出产环境的数据,往往不得不选用阻隔副本的方式进行预演测试,消耗资源不说,还难以实在进行协同作业。
提到协同,你或许会想到 Git ,但 Git 并不是为数据办理而规划的,除了二进制数据办理不方便之外,Git LFS 对单个文件巨细的约束也制约了其适用场景。
lakeFS 应运而生,它供给用于数据湖的开源数据版别操控 — branch, commit, merge, revert,就像运用 Git 办理代码相同。因为支撑零复制开发/测试阻隔环境、接连质量验证、过错数据的原子回滚、可重复性等高级特性,你甚至能够轻松在出产数据上验证 ETL 作业流,而不再担心对事务造成危害。
上图为 lakeFS 引荐的数据作业流,摘自 lakefs.io/ 。
Workshop: 运用 lakeFS 加持你的剖析事务
在这个 workshop 中,咱们将会运用 lakeFS 为存储库创立分支,并运用 Databend 对预置数据进行剖析和转化。
因为试验环境包括一些依赖,第一次发动或许用时较久。咱们更引荐运用 Databend Cloud lakeFS cloud 的组合,这样你就能够跳过耗时颇多的 环境设置 部分,直接上手体验 数据剖析 与转化。
环境设置
本次运用的环境中除了 lakeFS 之外,还将包括作为底层目标存储服务的 MinIO ,以及 Jupyter 、Spark 等常用的数据科学东西。 你能够查看 这个 git 存储库了解更多相关信息。
上图为本次试验环境的示意图,摘自 treeverse/lakeFS-samples 。
克隆存储库
git clone https://github.com/treeverse/lakeFS-samples.git
cd lakeFS-samples
发动全栈试验环境
docker compose --profile local-lakefs up
一旦发动试验环境,你就能够运用默许装备登录 lakeFS 和 MinIO,以在后续的过程中调查数据的变化。
数据调查
在环境设置过程中,会在 lakeFS 中预先预备一个名为 quickstart 的存储库,在这一步中,咱们将会对其进行一些简略的调查。
假如你运用自己布置的 lakeFS MinIO 环境
- 或许需求先自己手动在 MinIO 中创立对应的 bucket 。
- 再在 LakeFS 中创立对应的存储库,并勾选填充示例数据。
lakeFS
在浏览器翻开 lakeFS(http://127.0.0.1:8000),输入 Access Key ID 和 Secret Access Key 就能够登录到 lakeFS 。
接着翻开 quickstart 存储库,能够看到现已存在一些默许的数据,并且还包括一个默许教程。
lakeFS 的数据存储库方式简直与 GitHub 之类的代码存储库对应,简直没有什么学习本钱:其间 lakes.parquet
为预先预备好的数据,data
文件夹中的 lakes.source.md
介绍了数据的来历 ; scripts
文件夹包括用于数据校验的脚本,其完整的作业流能够在_lakefs_actions
目录下找到,编写方式相似 GitHub Actions ;README.md
对应下方教程的 Markdown 源文件,而 images
中包括了运用的全部图画。
MinIO
因为在试验环境中,咱们运用 MinIO 作为底层存储,所以在 MinIO 中也能够找到一个名为 quickstart
的 bucket 。 这是由 lakeFS 创立存储库时的 StorageNamespace
决定的。
其间,dummy
文件是新建 lakeFS 存储库时就会创立的,用来确保咱们有满意的权限写入 bucket 。
而 _lakefs
目录中只包括两个文件,从 S3 等数据源导入数据时创立,用来标识对导入文件原始位置的一些引证。
经过 lakeFS 编写的新目标则会坐落 data
目录下。
数据的对应
翻开 data
目录,咱们能够找到一些文件,但很难与 lakeFS 中的数据对应起来。
让咱们回到 lakeFS ,点击文件右侧的齿轮图标,再选中 Object Info
就能够轻松找出对应联系。
数据剖析 与转化
在这一步中,咱们将会布置 Databend 服务,经过 Stage 挂载 lakeFS 中的数据并进行剖析,并用转化后的成果替换 denmark-lakes
分支中的 lakes.parquet
数据文件。
布置 Databend
Databend 的存储引擎同样支撑 Time Travel、原子回滚等高级特性,无需担心操作失误。
这儿咱们运用单节点 Databend 服务的方式,以 MinIO 作为存储后端。总体上布置过程能够参阅 Databend 官方文档 ,需求留意的一些细节如下:
-
因为咱们现已在上面的过程中布置了 MinIO 服务,这儿只需求翻开
127.0.0.1:9000
创立一个名为databend
的 Bucket 。 -
接着需求为日志和 Meta 数据预备相关的目录
-
sudo mkdir /var/log/databend sudo mkdir /var/lib/databend sudo chown -R $USER /var/log/databend sudo chown -R $USER /var/lib/databend
-
-
其次,因为默许的
http_handler_port
现已被前面的服务占用掉,所以需求修正databend-query.toml
进行一些修正防止抵触:-
http_handler_port = 8088
-
-
另外,咱们还需求根据 Docs | Configuring Admin Users 装备办理员用户,因为只是一个 workshop ,这儿挑选最简略的方式,只是撤销
[[query.users]]
字段以及 root 用户的注释:-
[[query.users]] name = "root" auth_type = "no_password"
-
-
因为咱们运用 MinIO 作为存储后端,所以需求对
[storage]
进行装备。-
[storage] # fs | s3 | azblob | obs | oss type = "s3" # To use S3-compatible object storage, uncomment this block and set your values. [storage.s3] bucket = "databend" endpoint_url = "http://127.0.0.1:9000" access_key_id = "minioadmin" secret_access_key = "minioadmin" enable_virtual_host_style = false
-
接下来就能够正常发动 Databend :
./scripts/start.sh
咱们强烈引荐你运用 BendSQL 作为客户端,因为 http_handler_port
端口改变,需求运用 bendsql -P 8088
衔接 Databend 服务。当然,咱们也支撑像 MySQL Client 和 HTTP API 等多种拜访方式。
创立分支
lakeFS 的运用方法与 GitHub 相似,翻开 Web UI 的 branches 页面 ,点击 Create Branch
按钮,创立一个名为 denmark-lakes
的分支。
创立 Stage
Databend 能够经过 Stage 来挂载坐落远端存储服务的数据目录。因为 lakeFS 供给 S3 Gateway API,所以咱们能够按照 s3 兼容服务进行装备衔接。需求留意的是,此处的 URL 需求按照 s3://<repo>/<branch>
进行结构,而且 lakeFS 的 ENDPOINT_URL
为 8000 端口。
CREATE STAGE lakefs_stage
URL='s3://quickstart/denmark-lakes/'
CONNECTION = (
REGION = 'auto'
ENDPOINT_URL = 'http://127.0.0.1:8000'
ACCESS_KEY_ID = 'AKIAIOSFOLKFSSAMPLES'
SECRET_ACCESS_KEY = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY');
经过履行下述 SQL 句子,咱们能够过滤出目录中 Parquet 格局的数据文件。
LIST @lakefs_stage PATTERN = '.*[.]parquet';
因为 Databend 现已支撑 SELECT form Stage 才能,无需导入数据也能够进行根本的查询。
SELECT * FROM @lakefs_stage/lakes.parquet LIMIT 5;
创立表,并进行一些简略的查询
在清洗数据之前,让咱们先将数据导入到 Databend 中,进行一些简略的查询。
因为 Databend 内置 Infer Schema(揣度数据结构)才能,能够便利地从文件创立表。
CREATE TABLE lakes AS SELECT * FROM @lakefs_stage/lakes.parquet;
接下来,让咱们列出湖泊最多的 5 个国家。
SELECT country, COUNT(*)
FROM lakes
GROUP BY country
ORDER BY COUNT(*)
DESC LIMIT 5;
数据清洗
这次数据清洗的目标是结构一个小型的湖泊数据集,只保留丹麦湖泊数据。运用 DELETE FROM
句子能够轻松满意这一目标。
DELETE FROM lakes WHERE Country != 'Denmark';
接下来让咱们再次查询湖泊数据,查看是否只剩下丹麦湖泊。
SELECT country, COUNT(*)
FROM lakes
GROUP BY country
ORDER BY COUNT(*)
DESC LIMIT 5;
运用 PRESIGN 将成果写回 lakeFS
在这一步中,咱们需求用清洗后的成果替换 denmark-lakes
分支中的 Parquet 文件。
首先咱们能够运用 COPY INTO <location>
语法,将数据导出到内置的匿名 Stage。
COPY INTO @~ FROM lakes FILE_FORMAT = (TYPE = PARQUET);
接下来,让咱们列出 @~
这个 Stage 下的成果文件。
LIST @~ PATTERN = '.*[.]parquet';
履行 PRESIGN DOWNLOAD
句子,咱们能够获取用于下载成果数据文件的 URL:
PRESIGN DOWNLOAD @~/<your-result-data-file>;
翻开新终端,运用 curl
命令即可完结数据文件下载。
curl -O '<your-presign-download-url>'
接下来,运用 PRESIGN UPLOAD
句子,咱们能够获取预签名的 URL ,用于上传数据文件。这儿运用 @lakefs_stage/lakes.parquet;
是期望将 lakes.parquet
替换为咱们清洗后的丹麦湖泊数据。
PRESIGN UPLOAD @lakefs_stage/lakes.parquet;
翻开终端,运用 curl
命令即可完结上传。
curl -X PUT -T <your-result-data-file> '<your-presign-upload-url>'
此刻文件现已被替换为清洗后的数据,再次列出 Stage 中的 Parquet 文件,能够看到文件巨细和最终修正时刻现已发生变化。
LIST @lakefs_stage PATTERN = '.*[.]parquet';
再次查询数据文件进行验证,承认现已是清洗后的数据。
SELECT country, COUNT(*)
FROM @lakefs_stage/lakes.parquet
GROUP BY country
ORDER BY COUNT(*)
DESC LIMIT 5;
提交改变
这一步咱们会将改变提交至 lakeFS 进行保存。
在 lakeFS 的 Web UI 界面中,翻开 Uncommitted Changes 页面,确保选中 denmark-lakes
分支。
点击右上角的 Commit Changes
按键,编写提交信息,并承认提交。
查看主分支中 原始数据
denmark-lakes
中的原始数据现已被替换为清洗后的较小数据集,让咱们切换回 main
分支,查看原始数据有没有收到影响。
同样地,经过创立 Stage 来挂载数据文件。
CREATE STAGE lakefs_stage_check
URL='s3://quickstart/main/'
CONNECTION = (
REGION = 'auto'
ENDPOINT_URL = 'http://127.0.0.1:8000'
ACCESS_KEY_ID = 'AKIAIOSFOLKFSSAMPLES'
SECRET_ACCESS_KEY = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY');
接着查询湖泊数据,列出湖泊数量最多的 5 个国家。
SELECT country, COUNT(*)
FROM @lakefs_stage_check/lakes.parquet
GROUP BY country
ORDER BY COUNT(*)
DESC LIMIT 5;
main
分支中的一切都坚持原样,咱们在确保原始数据不受搅扰的情况下,得到了一份清洗好的丹麦湖泊数据集。
额外应战
在这个 workshop 中,咱们了解到如何为数据创造阻隔的分支,并且在 Databend 中进行了一些简略的查询和清洗作业。
假如你还想应战更多内容,能够参阅 IakeFS 的官方教程,测验 分支合并 和 数据回滚 才能;也能够参阅 Databend 官方教程,体验 在数据导入阶段进行数据清洗 和 Time Travel 等才能。
咱们同样欢迎将 Databend 和 IakeFS 引入出产环境,在实在的作业负载中进行验证。
关于 Databend
Databend 是一款开源、弹性、低本钱,基于目标存储也能够做实时剖析的新式数仓。期待您的重视,一起探索云原生数仓处理计划,打造新一代开源 Data Cloud。