1. 起因
前段时间(嗯,几个月前。。),咱们的智能归因系统监测到单个hive测验表磁盘空间占用超过3TB,并发出预警。进一步检查发现该表的存储格局为text。
众所周知,当时存储空间作用最优的数据格局便是orc和parquet(而且还可以进行压缩),因而除非特殊情况,text格局不应该作为默许表格局。
text格局hive表示例:
这就引发了一个新的问题,不论是trino(presto)的ctas,仍是spark的saveAsTable,默许建表格局都不是text(修改过默许参数)。而其他建表入口,如hive cli,则不对普通用户开放命令行履行权限。
那么,这个text格局的表是怎样来的呢?
所以,溯源开端了。经过咱们的表级血缘数据,获取到了写入这个表的使命id,从而获取到了使命脚本,经过检查代码,才发现之前遗失的一个写表方法。没错,便是它:spark sql的ctas。
2. 问题确认
已然大概确认了问题的原因,那么就要承认能否经过spark sql复现。(当然,也很简略)
spark.sql("""create table test.test_spark_sql_default_table_format_tmp
as
select xxx""")
经过hive检查刚刚创立的表的格局,发现确实是text格局,可以稳定复现。
所以可以确认问题就在spark sql这儿。
2.1 进一步排查
第一个问题,便是要承认为什么spark dsl的默许参数为什么不对spark sql起作用,经过查阅spark官方文档以及spark源码(v2.4.8),发现了更“风趣”的现象。
spark sql 和 spark dsl装备并不是一致的。
以建表格局装备参数为例:
spark sql : hive.default.fileformat(至少表面看起来是这样)
源码见:org/apache/spark/sql/internal/HiveSerDe.scala getDefaultStorage
spark dsl : spark.sql.sources.default
装备页面见:sql-data-sources-load-save-functions
源码见:org/apache/spark/sql/internal/SQLConf.scala DEFAULT_DATA_SOURCE_NAME
3. 测验修复
已然确认了问题(spark sql 默许建表格局没有指定为orc),那么就需要针对这个问题进行修复了。
经过检查官方文档和源代码,可以发现spark sql支撑经过hint的方法,来指定创立表时的存储格局等参数:
文档链接:sql-data-sources-hive-tables
源码:
org.apache.spark.sql.execution.command.DDLParserSuite
test("create hive table - table file format") {
val allSources = Seq("parquet", "parquetfile", "orc", "orcfile", "avro", "avrofile",
"sequencefile", "rcfile", "textfile")
allSources.foreach { s =>
val query = s"CREATE TABLE my_tab STORED AS $s"
val ct = parseAs[CreateTable](query)
val hiveSerde = HiveSerDe.sourceToSerDe(s)
assert(hiveSerde.isDefined)
assert(ct.tableDesc.storage.serde ==
hiveSerde.get.serde.orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")))
assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat)
}
}
这个确实也不失为一种解决方案,但是对于sql编写人员无疑提出了更多的要求。
所以从易用性的角度而言,咱们更需要可以指定spark sql 默许建表格局。
因而可以说问题并未解决。还记得上面那个HiveSerDe吗,里边还有一个方法叫做getDefaultStorage,用于从装备信息中获取hive的默许建表格局(生成逻辑计划的时分):
def getDefaultStorage(conf: SQLConf): CatalogStorageFormat = {
val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
val defaultHiveSerde = sourceToSerDe(defaultStorageType)
CatalogStorageFormat.empty.copy(
inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
.orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
.orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
serde = defaultHiveSerde.flatMap(_.serde)
.orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")))
}
val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
3.1 新的起色?
如同发现了元凶巨恶?
咱们测验获取下这个参数:
结果是undefined,也便是没有在spark conf中没有指定过默许建表格局,那么咱们试试在已发动的spark会话中设置下这个参数:
spark.sql("set hive.default.fileformat = 'orc'")
再履行下spark sql ctas句子。公然,这次的hive测验表的格局成功变为orc了。
看起来,好像仅仅由于咱们没有在spark conf装备过hive.default.fileformat这个参数?
由于spark支撑经过–conf的方法动态装备参数,所以可以先经过这个方法测验下:
spark --conf hive.default.fileformat=orc
在新的spark 会话中,咱们再次履行ctas句子。但是结果却很让人惊讶,这次创立的测验表仍然是text格局?!
很古怪呀,不是现已传递了这个参数了吗?为什么没有收效呢?
3.2 真实的问题
咱们再看看这个参数真的收效了吗?
公然,并不是等待中的orc格局,仍是undefined。
这个就很让人费解了,明明在源码中这个参数的key便是hive.default.fileformat呀?
这一步让我开端置疑,–conf设置的参数,真的传递进去了吗?
基于日常排查问题的经验,我猜测spark发动脚本也支撑verbose参数: 履行下spark –help。公然,一个等待中的参数出现在屏幕中。
那么开启verbose形式试试,看下究竟是什么原因。
spark --conf hive.default.fileformat=orc --verbose
Ignoring non-Spark config property: hive.default.fileformat
spark日志显示,咱们传递了一个非spark的装备参数,然后被过滤掉了。
那么就只能依据日志信息,看下对应的spark源码,确认究竟是什么逻辑了:
org.apache.spark.deploy.SparkSubmitArguments.ignoreNonSparkProperties
private def ignoreNonSparkProperties(): Unit = {
sparkProperties.keys.foreach { k =>
if (!k.startsWith("spark.")) {
sparkProperties -= k
logWarning(s"Ignoring non-Spark config property: $k")
}
}
}
emmm,显而易见。spark要求传递的参数必须以spark.为前缀,不契合这个要求的装备参数,就会被过滤掉。
啊哈,这一切的问题,竟然是由于装备参数命名风格不一致导致的,这么简略?
已然找到了真实的问题,接下来就so easy了。修复下获取参数的key这部分的源码,然后从头编译、打包就可以了。
def getDefaultStorage(conf: SQLConf): CatalogStorageFormat = {
val HiveStorageType = conf.getConfString("spark.hive.default.fileformat", "textfile")
var defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
if (HiveStorageType != defaultStorageType) {
defaultStorageType = HiveStorageType
}
val defaultHiveSerde = sourceToSerDe(defaultStorageType)
CatalogStorageFormat.empty.copy(
inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
.orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
.orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
serde = defaultHiveSerde.flatMap(_.serde)
.orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")))
}
这儿为了不破坏原有参数的装备作用,只新增了一个契合spark命名标准的参数,仅用来满足装备默许值的需求。
4. 总结
这次协助渠道的小伙伴解决了一个不大不小的“坑”,必定程度上提升了集群的稳定性和写数据的功率。
so,遇到古怪的问题不要容易放弃,再研讨下,或许问题并不复杂,而你可以解决掉它呢?
5. 跋文
后来也看了下spark3.x,官方现已解决了这个问题,建表格局默许为orc了。(由于hadoop.xml的装备参数优先级会更高)