在上一章中,咱们介绍了Delta Lake,并了解了它怎么为传统数据湖增加了业务保证、DML支撑、审计、统一的流式和批处理模型、方式强制履行以及可扩展的元数据模型。
在本章中,咱们将亲身测验Delta Lake。首要,咱们将在装置了Spark的本地机器上设置Delta Lake。咱们将在两个交互式shell中运转Delta Lake示例:
首要,咱们将运转带有Delta Lake软件包的PySpark交互式shell。这将答应咱们输入并运转一个创立Delta表的简略两行Python程序。
接下来,咱们将运用Spark Scala shell运转相似的程序。虽然本书没有具体介绍Scala语言,但咱们期望演示Delta Lake可与Spark shell和Scala一同运用。
接下来,咱们将在您喜欢的编辑器中创立一个名为helloDeltaLake的Python起始程序,并在PySpark shell中以交互办法运转程序。本章中设置的环境和helloDeltaLake程序将成为咱们在本书中创立大多数其他程序的根底。
一旦环境预备就绪,咱们就预备深化研讨Delta表格格局。因为Delta Lake运用Parquet作为底层存储介质,咱们首要简要了解Parquet格局。因为当咱们稍后研讨业务日志时,分区和分区文件扮演着重要角色,因而咱们将研讨自动分区和手动分区的机制。接下来,咱们将转向Delta表格,研讨Delta表格如安在_delta_log目录中增加业务日志。
本章的其余部分将专门用于业务日志。咱们将创立并运转多个Python程序,以研讨业务日志条意图具体信息,记载了哪些类型的操作,以及何时以及怎么编写Parquet数据文件以及它们怎么与业务日志条目相关。咱们将检查更杂乱的更新示例及其对业务日志的影响。终究,咱们将介绍检查点文件的概念,以及它们怎么协助Delta Lake施行可扩展的元数据体系。
获取规范的Spark镜像
在本地机器上设置Spark或许会让人望而生畏。您需求调整许多不同的设置,更新包等。因而,咱们选择运用Docker容器。假如您没有装置Docker,能够从其官方网站免费下载。咱们运用的具体容器是规范的Apache Spark镜像。要下载该镜像,您能够运用以下指令:
docker pull apache/spark
在您拉取了镜像之后,能够运用以下指令发动容器:
docker run -it apache/spark /bin/sh
Spark的装置坐落/opt/spark目录中。PySpark、spark-sql和一切其他东西坐落/opt/spark/bin目录中。有关怎么运用容器的更多阐明,您能够在本书的GitHub存储库的自述文件中找到。
运用PySpark与Delta Lake
如前所述,Delta Lake运转在现有存储之上,与现有的Apache Spark API彻底兼容。这意味着假如您现已装置了Spark或按照前一部分中的阐明运用了容器,那么开端运用Delta Lake将会很简略。
有了Spark,您能够装置delta-spark 2.4.0包。您能够在其PySpark目录中找到delta-spark包。在指令行中输入以下指令:
pip install delta-spark
装置完delta-spark后,能够像这样交互式运转Python shell:
pyspark --packages io.delta:<delta_version>
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
--conf "spark.sql.catalog.spark_catalog=
org.apache.spark.sql.delta.catalog.DeltaCatalog"
这将为您供给一个PySpark shell,您能够经过其交互式运转指令:
Welcome to
____ __
/ __/__ ___ _____/ /__
_ / _ / _ `/ __/ ’_/
/__ / .__/_,_/_/ /_/_ version 3.2.2
/_/
Using Python version 3.9.13 (tags/v3.9.13:6de2ca5, May 17 2022 16:36:42)
Spark context Web UI available at http://host.docker.internal:4040
Spark context available as ’sc’ (master = local[*],
app id = local-1665944381326).
SparkSession available as ’spark’.
在Shell中,您现在能够运转交互式的PySpark指令。咱们总是经过运用Spark创立一个range()来进行快速测验,然后生成一个DataFrame,然后能够将其保存为Delta Lake格局(有关更多具体信息,请参见“创立和运转一个Spark程序:helloDeltaLake”)。 以下是完好的代码:
data = spark.range(0, 10)
data.write.format("delta").mode("overwrite").save("/book/testShell")
以下是完好的运转:
>>> data = spark.range(0, 10)
>>> data.write.format("delta").mode("overwrite").save("/book/testShell")
>>>
这里咱们看到了创立range()
的句子,然后是写入句子。咱们能够看到Spark的履行器正在运转。当您打开输出目录时,您将找到生成的Delta表(关于Delta表格局的更多细节将鄙人一节中介绍)。
在Spark Scala Shell中运转Delta Lake
您还能够在交互式Spark Scala shell中运转Delta Lake。依据Delta Lake Quickstart中的阐明,您能够运用以下办法发动Scala shell,并增加Delta Lake包:
spark-shell --packages io.delta:<delta_version>
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
--conf "spark.sql.catalog.spark_catalog=
org.apache.spark.sql.delta.catalog.DeltaCatalog"
这将发动交互式Scala shell:
Spark context Web UI available at http://host.docker.internal:4040
Spark context available as 'sc' (master = local[*],
app id = local-1665950762666).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_ / _ / _ `/ __/ '_/
/___/ .__/_,_/_/ /_/_ version 3.2.2
/_/
Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_311)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
在Shell中,您现在能够运转交互式的Scala指令。让咱们在Scala上进行相似的测验,就像您在PySpark Shell中所做的相同:
val data = spark.range(0, 10)
data.write.format("delta").mode("overwrite").save("/book/testShell")
以下是完好的运转示例:
cala> val data = spark.range(0, 10)
data: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> data.write.format("delta").mode("overwrite").save("/book/testShell")
同样,当您检查输出时,您会发现生成的Delta表。
在Databricks上运转Delta Lake
关于本书后面的示例,选择了Databricks社区版来运转Delta Lake。选择这个平台来开发和运转代码示例,因为它是免费的,简化了Spark和Delta Lake的设置,不需求您自己的云账户,也不需求供给云核算或存储资源。运用Databricks社区版,用户能够拜访一个具有完好笔记本环境和已装置Delta Lake的最新运转时的集群。
假如您不想在本地核算机上运转Spark和Delta Lake,还能够在云平台上的Databricks上运转Delta Lake,例如Azure、AWS或Google Cloud。这些环境使得开端运用Delta Lake变得愈加简略,因为它们现已装置了Delta Lake的版别。
云的额定好处是您能够创立具有恣意大小的真正的Spark集群,潜在地能够跨过数百个节点,具有成千上万个中心,用于处理几十TB或PB等级的数据。
在云中运用Databricks有两种选项。您能够运用其受欢迎的笔记本,也能够运用Databricks实验室的开源东西dbx,将您喜欢的开发环境连接到云上的Databricks集群。 dbx是由Databricks实验室供给的东西,答应您从编辑环境连接到Databricks集群。
创立和运转一个Spark程序:helloDeltaLake
装置delta-spark包后,创立你的榜首个PySpark程序十分简略。按照以下过程创立PySpark程序:
- 创立一个新文件(咱们命名为helloDeltaLake.py)。
- 增加必要的导入句子。至少需求导入PySpark和Delta Lake:
import pyspark
from delta import *
接下来,创立一个SparkSession builder,加载Delta Lake的扩展,如下所示:
# Create a builder with the Delta extensions
builder = pyspark.sql.SparkSession.builder.appName("MyApp")
.config("spark.sql.extensions",
"io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
接下来,咱们能够创立SparkSession方针本身。咱们将创立SparkSession方针并打印出其版别,以保证方针有用:
# Create a Spark instance with the builder
# As a result, you now can read and write Delta tables
spark = configure_spark_with_delta_pip(builder).getOrCreate()
print(f"Hello, Spark version: {spark.version}")
为了验证咱们的Delta Lake扩展是否正常作业,咱们创立了一个范围并以Delta Lake格局写入它:
# Create a range, and save it in Delta Lake format to ensure
# that your Delta Lake extensions are indeed working
df = spark.range(0, 10)
df.write
.format("delta")
.mode("overwrite")
.save("/book/chapter02/helloDeltaLake")
这样就完成了您的入门程序的代码。您能够在书的代码存储库的/chapter02/helloDeltaLake.py位置找到完好的代码文件。假如您想编写自己的代码,这个代码是一个很好的起点。
要运转程序,咱们只需在Windows上发动指令提示符,或在MacOS上发动终端,并导航到包括咱们代码的文件夹。然后,咱们能够运用以下指令发动PySpark,将程序作为输入:
pyspark < helloDeltaLake.py
当咱们运转程序时,咱们会得到咱们的Spark版别输出(显现的版别将取决于读者装置的Spark版别):
Hello, Spark version: 3.4.1
当咱们检查输出时,咱们能够看到咱们现已写了一个有用的Delta表。 Delta Lake格局的具体信息将鄙人一节中介绍。
在这一点上,咱们现已成功装置了PySpark和Delta Lake,而且能够编写和运转一个具有Delta Lake扩展的完好的PySpark程序。既然您能够运转自己的程序,咱们预备好鄙人一节中具体评论Delta Lake格局。
Delta Lake格局
在本节中,咱们将深化评论Delta Lake的敞开式表格格局。当咱们运用这种格局保存文件时,实践上是在写入具有额定元数据的规范Parquet文件。这个额定的元数据是启用Delta Lake中心功用的根底,甚至能够履行传统关系数据库办理体系(RDBMS)中一般看到的DML操作,如INSERT、UPDATE和DELETE,以及很多其他操作。
因为Delta Lake将数据写出为Parquet文件,咱们将更深化地研讨Parquet文件格局。咱们首要会写出一个简略的Parquet文件,并具体检查Spark写入的相关内容。这将让咱们对本书中要运用的文件有一个很好的了解。
接下来,咱们将以Delta Lake格局写出一个文件,留意到它触发了创立包括业务日志的”_delta_log”目录。咱们将具体研讨这个业务日志以及它是怎么用于生成单一数据源的。咱们将看到业务日志是怎么完成第1章中说到的ACID原子性属性。
咱们将看到Delta Lake是怎么将一个业务分解为单独的原子提交操作,并将这些操作记载在业务日志中,以有序、原子单位的办法。终究,咱们将研讨几种用例,并调查写入了哪些Parquet数据文件和业务日志条目,以及这些条目中存储了什么内容。
因为每个业务都会写入一个业务日志条目,或许会导致出现多个小文件。为保证这种办法仍然可扩展,Delta Lake将每隔10个业务(在撰写本文时)创立一个检查点文件,其间包括完好的业务状况。这样,Delta Lake读取器只需处理检查点文件和随后写入的少量业务条目。这完成了一个快速、可扩展的元数据体系。
Parquet文件
Apache Parquet文件格局是过去20年来最盛行的大数据格局之一。Parquet是开源的,因而它能够在Apache Hadoop许可下免费运用,并与大多数Hadoop数据处理结构兼容。
与依据行的格局(如CSV或Avro)不同,Parquet是一种面向列的格局,这意味着每个列/字段的值都存储在一同,而不是在每个记载中。图2-1显现了依据行的布局和面向列的布局之间的差异,以及如安在逻辑表中表明这些差异。
图2-1展现了与行布局不同的是,列布局按列值的次序依次存储,这种列格局有助于逐列进行紧缩。此格局还支撑灵活的紧缩选项和可扩展的数据类型编码方式,这意味着能够运用不同的编码来紧缩整数和字符串数据类型。
Parquet文件由行组和元数据组成。行组包括来自同一列的数据,因而每列都存储在同一个行组中。Parquet文件中的元数据不只包括有关这些行组的信息,还包括有关列(例如最小/最大值、值的数量)和数据方式的信息,这使Parquet成为一个具有附加元数据以支撑更好数据跳跃的自描述文件。
图2-2显现了Parquet文件由行组和元数据组成。每个行组包括数据集中的每一列的列块,而每个列块由一个或多个包括列数据的页组成。要深化了解Parquet文件格局的更多文档,请拜访Apache Parquet的网站和文档。
Parquet文件的优势
因为其面向列的格局、存储布局、元数据和长期受欢迎,Parquet文件在剖析作业负载和处理大数据时具有以下几个强壮的优势:
- 高功用
Parquet文件是一种面向列的格局,它们能够更好地进行紧缩和编码,因为这些算法能够运用每一列中存储的相似值和数据类型。关于I/O密集型操作,这种紧缩数据能够明显进步功用。
在Parquet文件的情况下,当列值一同存储时,查询只需读取查询所需的列,而不需求在依据行的格局中读取一切列。这意味着列格局能够减少需求读取的操作数据量,然后进步功用。
Parquet文件中包括的元数据描述了数据的一些特征,其间包括有关行组、数据方式以及最重要的列的信息。列元数据包括最小/最大值和值的数量等信息。这些元数据一同减少了每个操作所需读取的数据量(即数据跳跃),然后完成更好的查询功用。
- 经济高效
因为Parquet文件能够更好地运用紧缩和编码,这使得数据本身愈加经济高效。紧缩后的数据在存储文件时占用更少的磁盘空间,然后降低了存储空间和存储本钱。
- 互操作性
因为Parquet文件在过去20年中十分盛行,特别是关于传统大数据处理结构和东西(例如Hadoop),它们得到了广泛支撑,并供给了出色的互操作性。
创立一个Parquet文件
在图书存储库中,坐落/chapter02/writeParquetFile的Python程序在内存中创立一个Spark DataFrame,并运用规范的PySpark API以Parquet格局将其写入/parquetData文件夹。
data = spark.range(0, 100)
data.write.format("parquet")
.mode("overwrite")
.save('/book/chapter02/parquetData')
在咱们的情况下,当咱们检查写入磁盘的内容时,咱们看到以下内容(依据您的本地机器不同,您或许会看到不同的成果):
Directory of C:bookchapter02parquetData
10/17/2022
10/17/2022 511 part-00000-a3885270-...-c000.snappy.parquet
10/17/2022 513 part-00001-a3885270-...-c000.snappy.parquet
10/17/2022 517 part-00002-a3885270-...-c000.snappy.parquet
10/17/2022 513 part-00003-a3885270-...-c000.snappy.parquet
10/17/2022 513 part-00004-a3885270-...-c000.snappy.parquet
10/17/2022 517 part-00005-a3885270-...-c000.snappy.parquet
10/17/2022 513 part-00006-a3885270-...-c000.snappy.parquet
10/17/2022 513 part-00007-a3885270-...-c000.snappy.parquet
10/17/2022 517 part-00008-a3885270-...-c000.snappy.parquet
10/17/2022 513 part-00009-a3885270-...-c000.snappy.parquet
10/17/2022 513 part-00010-a3885270-...-c000.snappy.parquet
10/17/2022 517 part-00011-a3885270-...-c000.snappy.parquet
一个刚触摸大数据领域的开发者此时或许会感到有些震动。咱们只写入了100个数字,为什么终究会得到12个Parquet文件呢?这需求进行一些具体阐明。
首要,咱们在写入操作中指定的文件名实践上是一个目录名,而不是文件名。正如您所见,目录/parquetData 包括了12个Parquet文件。
当咱们检查 .parquet 文件时,或许会看到咱们有12个文件。Spark是一个高度并行的核算环境,体系试图让Spark集群中的每个CPU中心坚持繁忙。在咱们的情况下,咱们在本地机器上运转,这意味着咱们的集群中只要一台机器。当咱们检查体系的信息时,咱们能够看到咱们有12个中心。
当咱们检查写入的.parquet文件的数量时,咱们会发现咱们有12个文件,这与咱们集群中的中心数相等。这是Spark在这种情况下的默许行为。文件的数量将等于可用中心的数量。假设咱们向代码中增加以下句子:
data = spark.range(0, 100)
data.write.format("parquet")
.mode("overwrite")
.save('/book/chapter02/parquetData')
print(f"The number of partitions is: {data.rdd.getNumPartitions()}")
从输出中咱们能够看到,的确有12个文件:
The number of partitions is: 12
虽然在仅写入100个数字的情况下或许看起来有些过于杂乱,但能够幻想在读取或写入十分大的文件时,将文件拆分并并行处理能够明显进步功用。
在输出中看到的 .crc 文件是循环冗余校验文件。Spark运用它们来保证数据没有被损坏。这些文件一般十分小,因而与它们供给的实用性相比,它们的开支十分小。虽然有一种办法能够关闭生成这些文件,但咱们不建议这样做,因为它们的好处远远超越开支。
输出中的终究两个文件是 _SUCCESS 和 _SUCCESS.crc 文件。Spark运用这些文件来供给一种承认一切分区都已正确写入的办法。
创立Delta表
到目前为止,咱们一直在运用Parquet文件。现在,让咱们将前一节中的榜首个示例保存为Delta Lake格局,而不是Parquet(代码:/chapter02/writeDeltaFile.py)。咱们只需求将代码中的Parquet格局替换为Delta格局,如下所示:
data = spark.range(0, 100)
data.write
.format("delta")
.mode("overwrite")
.save('/book/chapter02/deltaData')
print(f"The number of filesis: {data.rdd.getNumPartitions()}")
咱们得到相同数量的分区:
The number of files is: 12
当咱们检查输出时,咱们会看到增加了 _delta_log 文件:
Directory of C:bookchapter02deltaData
10/17/2022 16 .part-00000-...-c000.snappy.parquet.crc
10/17/2022 16 .part-00001-...-c000.snappy.parquet.crc
10/17/2022 16 .part-00002-...-c000.snappy.parquet.crc
10/17/2022 16 .part-00003-...-c000.snappy.parquet.crc
10/17/2022 16 .part-00004-...-c000.snappy.parquet.crc
10/17/2022 16 .part-00005-...-c000.snappy.parquet.crc
10/17/2022 16 .part-00006-...-c000.snappy.parquet.crc
10/17/2022 16 .part-00007-...-c000.snappy.parquet.crc
10/17/2022 16 .part-00008-...-c000.snappy.parquet.crc
10/17/2022 16 .part-00009-...-c000.snappy.parquet.crc
10/17/2022 16 .part-00010-...-c000.snappy.parquet.crc
10/17/2022 16 .part-00011-...-c000.snappy.parquet.crc
10/17/2022 524 part-00000-...-c000.snappy.parquet
10/17/2022 519 part-00001-...-c000.snappy.parquet
10/17/2022 523 part-00002-...-c000.snappy.parquet
10/17/2022 519 part-00003-...-c000.snappy.parquet
10/17/2022 519 part-00004-...-c000.snappy.parquet
10/17/2022 522 part-00005-...-c000.snappy.parquet
10/17/2022 519 part-00006-...-c000.snappy.parquet
10/17/2022 519 part-00007-...-c000.snappy.parquet
10/17/2022 523 part-00008-...-c000.snappy.parquet
10/17/2022 519 part-00009-...-c000.snappy.parquet
10/17/2022 519 part-00010-...-c000.snappy.parquet
10/17/2022 523 part-00011-...-c000.snappy.parquet
10/17/2022 <DIR> _delta_log
24 File(s) 6,440 bytes
_delta_log 文件包括了对数据履行的每个操作的业务日志。
Delta Lake业务日志
Delta Lake业务日志(也称为DeltaLog)是自Delta Lake表创立以来记载在表上履行的每个业务的次序记载。它对Delta Lake的功用十分要害,因为它是其重要特性的中心,包括ACID业务、可扩展的元数据处理和时刻游览功用。
业务日志的主要方针是答应多个读取器和写入器同时在给定版别的数据文件上操作,并为履行引擎供给额定的信息,如数据越过索引,以进行更高功用的操作。Delta Lake业务日志始终向用户显现数据的共同视图,并充任单一的本相来源。它是盯梢用户对Delta表所做的一切更改的中心存储库。
当Delta表读取器首次读取Delta表或在前次读取后已被修改的敞开文件上运转新查询时,Delta Lake会检查业务日志以获取表的最新版别。这保证了用户文件的版别始终与最近查询时的主记载同步,而且用户不能对文件进行不合和冲突的更改。
业务日志怎么完成原子性
在榜首章中,咱们了解到原子性保证了对文件履行的一切操作(例如,INSERT、UPDATE、DELETE或MERGE)要么彻底成功,要么彻底不成功。假如没有原子性,任何硬件毛病或软件过错都或许导致数据文件部分写入,然后导致数据损坏或至少是无效的数据。
业务日志是Delta Lake供给原子性保证的机制。业务日志还负责处理元数据、时刻游览以及大型表格数据集的大幅进步元数据操作速度。
业务日志是自创立以来对Delta表格履行的每个业务的有序记载。它充任单一的本相来源,并盯梢对表格所做的一切更改。业务日志运用户能够揣度他们的数据,并信任其完好性和质量。简略的规则是,假如一个操作没有记载在业务日志中,那么它就从未发生过。在接下来的章节中,咱们将经过几个示例来阐明这些准则。
将业务拆分成原子提交
每逢您履行一组操作来修改表格或存储文件(例如插入、更新、删去或兼并操作),Delta Lake 将将该操作分解为一系列原子的、离散的过程,由表2-1中显现的一个或多个操作组成。
这些操作以有序的原子单位(称为提交)的方式记载在业务日志条目(*.json)中。这相似于Git源代码控制体系盯梢更改的办法,以原子提交的方式。这也意味着您能够重放业务日志中的每个提交以取得文件的当时状况。 例如,假如用户创立一个业务来向表中增加新列,然后增加数据到该列,Delta Lake会将此业务分解为其组成操作部分,并一旦业务完成,将它们作为以下提交增加到业务日志中:
- 更新元数据:更改方式以包括新列。
- 增加文件:为每个新增加的文件。
文件等级的业务日志
当您写入一个Delta表时,该文件的业务日志会自动创立在 _delta_log 子目录中。跟着您持续对Delta表进行更改,这些更改将自动记载为有序的原子提交在业务日志中。每个提交都以JSON文件的方式写入,从0000000000000000000.json开端。假如您对文件进行额定的更改,Delta Lake将按升序生成附加的JSON文件,因而下一个提交将被写入为0000000000000000001.json,下一个为0000000000000000002.json,依此类推。 在本章的其余部分,出于可读性的意图,咱们将运用业务日志条意图缩写方式。而不是显现长达19位的数字,咱们将运用长达5位的缩写方式(因而您将运用00001.json而不是更长的记法)。 此外,咱们将缩短Parquet文件的称号。这些称号一般如下所示: part-00007-71c70d7f-c7a8-4a8c-8c29-57300cfd929b-c000.snappy.parquet 为了演示和解说,咱们将将这样的称号缩写为part-00007.parquet,省掉GUID和snappy.parquet部分。 在咱们的示例可视化中,咱们将以动作称号和受影响的数据文件称号来可视化每个业务条目;例如,在图2-3中,咱们在一个单一的业务文件中有一个删去(文件)操作和另一个增加(文件)操作。
对同一文件进行屡次写入
在本节中,咱们将运用一组图表,具体描述每个代码过程。关于每个过程,咱们显现以下信息:
- 实践的代码片段显现在第二列。
- 在代码片段旁边,咱们显现作为代码片段履行成果的Parquet数据文件。
- 在终究一列中,咱们显现业务日志条意图JSON文件。咱们为每个业务日志条目显现操作和受影响的Parquet数据文件称号。
关于这个榜首个示例,您将运用图书存储库中的chapter02/MultipleWriteOperations.py来展现对同一文件进行屡次写入。
以下是图2-4中不同过程的逐步阐明: 首要,将新的Delta表写入途径。一个Parquet文件已写入到输出途径(part-00000.parquet)。榜首个业务日志条目(00000.json)已在_delta_log目录中创立。因为这是文件的榜首个业务日志条目,记载了一个元数据操作和一个增加文件操作,指示增加了一个分区文件。 接下来,咱们向表中追加数据。咱们能够看到已写入一个新的Parquet文件(part-00001.parquet),并在业务日志中创立了一个附加的条目(00001.json)。与榜首步相似,该条目包括一个增加文件操作,因为咱们增加了一个新文件。 咱们再次追加更多数据。再次写入一个新的数据文件(part-00002.parquet),并向业务日志中增加了一个新的业务日志文件(00002.json),包括一个增加文件操作。
请留意,每个业务日志条目还将包括一个提交信息操作,其间包括了业务的审计信息。出于可读性的意图,咱们在图表中省掉了提交信息日志条目。
写操作的操作次序十分重要。关于每个写操作,数据文件始终首要被写入,只要在该操作成功完成时,才会向_delta_log文件夹增加一个业务日志文件。只要当业务日志条目成功写入时,业务才被视为已完成。
读取Delta表的最新版别
当体系读取Delta表时,它将遍历业务日志以“编译”表格的当时状况。读取文件时的事情次序如下: 首要读取业务日志文件。 依据日志文件的信息,读取数据文件。 接下来,咱们将描述先前示例(multipleWriteOperations.py)中写入的Delta表的次序。Delta将读取一切日志文件(00000.json、00001.json和00002.json)。它将依据日志信息读取三个数据文件,如图2-5所示。
请留意,操作的次序还意味着在业务日志中不再引证的数据文件或许存在。实践上,在更新或删去的情况下,这是一个常见的情况。Delta Lake不会删去这些数据文件,因为假如用户运用Delta Lake的时刻游览功用(在第6章中介绍),这些文件或许会再次被需求。您能够运用VACUUM指令来删去旧的、过期的数据文件(也在第6章中介绍)。
写操作中的毛病场景
接下来,让咱们看看假如写操作失败会发生什么。在前面的写入场景中,假设图2-4中的第3步写入操作在半途失败。或许现已写入了部分Parquet文件,但业务日志条目00002.json没有写入。这将导致图2-6中所示的情形。
正如您在图2-6中所看到的,终究一个业务文件丢失。依据之前指定的读取次序,Delta Lake将读取榜首个和第二个JSON业务文件,以及相应的part-00000和part-00001 Parquet文件。Delta Lake的读取器将不会读取不共同的数据;它将经过前两个业务日志文件读取一个共同的视图。
更新场景
终究一个场景包括在chapter02/UpdateOperation.py代码库中。为了坚持简略,咱们有一个包括患者信息的小型Delta表。咱们只盯梢每位患者的患者ID和患者名字。在这个用例中,咱们创立一个包括四名患者的Delta表,每个文件中有两名患者。接下来,咱们从别的两名患者增加数据。终究,咱们更新了榜首名患者的名字。正如您将看到的,这次更新产生了比预期更大的影响。完好的更新场景如图2-7所示。
在这个示例中,咱们履行以下过程:
- 榜首个代码片段创立了一个Spark DataFrame,其间包括四名患者的患者ID和名字。咱们运用
.coalesce(2)
将DataFrame写入一个Delta表,强制数据写入两个文件。成果,咱们写入了两个文件。一旦part-00000.parquet和part-00001.parquet文件被写入,就创立了一个业务日志条目(00000.json)。请留意,业务日志条目包括两个增加文件操作,指示增加了part-00000.parquet和part-00001.parquet文件。 - 接下来的代码片段追加了别的两名患者(P5和P6)的数据。这导致了part-00002.parquet文件的创立。同样,一旦文件被写入,就会写入业务日志条目(00001.json),业务就完成了。再次请留意,业务日志文件有一个增加文件操作,指示增加了一个文件(part-00002.parquet)。
- 代码履行一个更新操作。在这种情况下,咱们想要将患者ID为1的患者的名字从P1更新为P11。目前,患者ID为1的记载存在于part-0中。为履行更新,读取part-0并运用映射操作符来更新任何匹配患者ID为1的P1记载为P11。一个新文件被写入为part-3。终究,Delta Lake写入了业务日志条目(00002.json)。请留意,它写入了一个移除文件操作,表明移除了part-0文件,以及一个增加操作,表明增加了part-3文件。这是因为来自part-0的数据被重写到part-3中,而一切已修改的行(以及未修改的行)都被增加到了part-3中,使part-0文件变得过期。 请留意,Delta Lake不会删去part-0文件,因为用户或许期望经过期刻游览回到过去,而在这种情况下,文件是必需的。VACUUM指令能够整理未运用的文件,如此操作具体介绍在第6章中。
现在咱们现已看到了在更新期间数据是怎么写入的,让咱们看看在读取中怎么确定要读取的内容,如图2-8所示。
读取将按以下办法进行:
- 首要读取榜首个业务日志条目(00000.json)。该条目告知Delta Lake包括part-0和part-1文件。
- 接下来读取下一个条目(00001.json),告知Delta Lake包括part-2文件。
- 终究读取终究一个条目(00002.json),通知读取器移除part-0文件并包括part-3。
成果,读取器终究会读取part-1、part-2和part-3,得到了图2-8中所示的正确数据。
扩展大规模元数据
现在咱们现已看到业务日志怎么记载每个操作,咱们能够有许多十分大的文件,其间包括数千个业务日志条目,针对单个Parquet文件。Delta Lake怎么扩展其元数据处理,而不需求读取数千个小文件,这将对Spark的读取功用产生负面影响?Spark在读取大文件时作用最佳,那么咱们该怎么解决这个问题?
一旦Delta Lake写入程序将提交记载到业务日志,它将在“_delta_log”文件夹中以Parquet格局保存一个检查点文件。Delta Lake写入程序将在每10次提交后持续生成一个新的检查点。检查点文件保存了表在特定时刻点的整个状况。留意,“状况”指的是不同的操作,而不是文件的实践内容。因而,它将包括增加文件、删去文件、更新元数据、提交信息等操作,以及一切上下文信息。它将以原生Parquet格局保存这个列表。这将答应Spark快速读取检查点。这为Spark读取器供给了一个“快捷办法”,能够彻底重现表的状况,避免从头处理数千个小的JSON文件,这或许效率低下。
检查点文件示例
以下是一个示例(如图2-9所示),咱们履行屡次提交,成果生成了一个检查点文件。该示例运用了书本存储库中的代码文件chap02/TransactionLogCheckPointExample.py。
这个示例包括以下过程:
榜首段代码创立一个规范的Spark DataFrame,其间包括多个患者的数据。请留意,咱们对DataFrame使用了coalesce(1)业务,将数据强制放入一个分区。
接下来,咱们将DataFrame以Delta Lake格局写入存储文件。咱们验证只要一个part-0001.parquet文件被写入。咱们还看到在_delta_log目录中创立了一个单个业务日志条目(00000.json)。该目录条目包括了part-00001.parquet文件的增加文件操作。
在接下来的过程中,咱们设置一个循环,循环次数为range(0, 9),将创立一个新的患者行,然后从该元组创立一个DataFrame,并将DataFrame写入存储文件。因为循环了九次,咱们创立了九个额定的Parquet文件,从part-00001.parquet到part-00009.parquet。咱们还看到了九个额定的业务日志条目,从00001.json到00009.json。
在第3步中,咱们创立了一个额定的患者元组,将其转换为DataFrame,并将其写入Delta表。这创立了一个额定的数据文件(part-00010.parquet)。业务日志中有一个规范的日志条目(00010.json),包括了part-00010.parquet文件的增加文件操作。但风趣的事实是它还创立了一个000010.checkpoint.parquet文件。这便是前面说到的检查点。每10次提交会生成一个检查点。这个Parquet文件以原生Parquet格局包括了在提交时表的整个状况。
在终究一步,代码生成了两次提交,创立了part-00011.parquet和part-00012.parquet,以及两个新的日志条目,这些条目指向这些文件。
假如Delta Lake需求从头创立表的状况,它将简略地读取检查点文件(000010.checkpoint.parquet),然后从头使用两个额定的日志条目(00011.json和00012.json)。
显现检查点文件
既然咱们现已生成了checkpoint.parquet文件,让咱们运用/chapter02/readCheckPointFile.py Python文件来检查它的内容:
# Set your output path for your Delta table
DATALAKE_PATH = "/book/chapter02/transactionLogCheckPointExample"
CHECKPOINT_PATH = "/_delta_log/00000000000000000010.checkpoint.parquet"
# Read the checkpoint.parquet file
checkpoint_df =
spark
.read
.format("parquet")
.load(f"{DATALAKE_PATH}{CHECKPOINT_PATH}")
# Display the checkpoint dataframe
checkpoint_df.show()
请留意,咱们在这里进行的是Parquet格局的读取,因为检查点文件的确是以Parquet格局存储的,而不是Delta格局。 checkpoint_df DataFrame的内容如下所示:
---- -------------------- ------ -------------------- --------
| txn| add|remove| metaData|protocol|
---- -------------------- ------ -------------------- --------
|null|{part-00000-f7d9f...| null| null| null|
|null|{part-00000-a65e0...| null| null| null|
|null|{part-00000-4c3ea...| null| null| null|
|null|{part-00000-8eb1f...| null| null| null|
|null|{part-00000-2e143...| null| null| null|
|null|{part-00000-d1d13...| null| null| null|
|null|{part-00000-650bf...| null| null| null|
|null|{part-00000-ea06e...| null| null| null|
|null|{part-00000-79258...| null| null| null|
|null|{part-00000-23558...| null| null| null|
|null| null| null| null| {1, 2}|
|null| null| null|{376ce2d6-11b1-46...| null|
|null|{part-00000-eb29a...| null| null| null|
---- -------------------- ------ -------------------- --------
正如您所见,检查点文件包括了不同操作(增加、移除、元数据和协议)的列。咱们看到了不同Parquet数据文件的增加文件操作,以及当咱们创立Delta表时的更新元数据操作,以及初始Delta表写入导致的协议更改操作。 请留意,DataFrame.show()不会按次序显现DataFrame记载。协议更改和更新元数据记载总是检查点文件中的榜首条记载,然后是不同的增加文件操作。
总结
在咱们开端探究Delta Lake的旅程时,一切都始于初始设置。本章介绍了如安在本地核算机上运用PySpark和Spark Scala shell设置Delta Lake,同时涵盖了必要的库和软件包,以使您能够运转带有Delta Lake扩展的PySpark程序。您还能够运用像Databricks这样的依据云的东西来简化此设置过程,以开发、运转和共享依据Spark的使用程序,例如Delta Lake。
在了解怎么发动Delta Lake之后,咱们开端学习Delta Lake的根本组件,这些组件无疑支撑了本书中咱们将评论的大多数中心功用。经过增加检查点文件以完成可弹性的元数据和将规范Parquet文件增加到业务日志以支撑ACID业务,Delta Lake具有了支撑可靠性和可扩展性的要害元素。既然咱们现已建立了这些根本组件,您将鄙人一章中学习有关Delta表上的根本操作。