在生产中,无论是通过SQL语句或者Scala/Java等代码的方式使用Spark SQL处理数据,在Spark SQL写数据时,往往会遇到生成的小文件过多的问题,而管理这些大量的小文件,是一件非常头疼的事情。
大量的小文件会影响Hadoop集群管理或者Spark在处理数据时的稳定性:
-
Spark SQL写Hive或者直接写入HDFS,过多的小文件会对NameNode内存管理等产生巨大的压力,会影响整个集群的稳定运行。
-
容易导致task数过多,如果超过参数spark.driver.maxResultSize的配置(默认1g),会抛出类似如下的异常,影响任务的处理。
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 478 tasks (2026.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
当然可以通过调大spark.driver.maxResultSize的默认配置来解决问题,但如果不能从源头上解决小文件问题,以后还可能遇到类似的问题。
下面通过一个例子,Spark SQL写数据时,导致产生分区数"剧增"的典型场景,通过分区数"剧增",以及Spark中task数和分区数的关系等,来倒推小文件过多的可能原因(这里的分区数是指生成的DataSet/RDD的分区数,不是Hive分区表的分区概念):
-
对表test_tab进行写入操作
-
t1的分区数是100,t2的分区数是200,union all后生成的tmp分区数是300
-
test_tab产生的小文件数基本也在300左右
select * from t1 union all select * from t2 as tmp;
insert overwrite table test_tab select * from tmp;
1)执行上述insert操作时的分区并行度,主要受tmp的分区数(对应一个DataSet)影响,
2)tmp的分区数主要受t1、t2以及union all的影响
3)暂且不考虑t1或t2是物理表还是经过其他处理生成的临时表,它们的分区数是确定的,这里主要看经过union all处理后,生成的tmp的分区数和t1、t2的分区数有何关系?
4)Spark SQL语句中的union all对应到DataSet中即为unionAll算子,底层调用union算子
Spark RDD中的union算子对union产生的新的RDD的分区数是如何受被union的多个RDD的影响的,这里直接给出结论:
RDD在调用union算子时,最终生成的RDD分区数分两种情况:
- union的RDD分区器已定义并且它们的分区器相同
多个符RDD具有相同额分区器,union后产生的RDD的分区器与父RDD相同且分区数也相同。比如,n个RDD的分区器相同且是defined,分区数是m个。那么这n个RDD最终union生成的一个RDD的分区数仍是m,分区器也是相同的 - 不满足第一种情况,则通过union生成的RDD的分区数为父RDD的分区数之和。
同样的这种机制也可以套用到Spark SQL中的DataSet上,那么就很好解释了tmp的分区数为什么等于t1和t2的分区数的和。
最后,Spark中一个task处理一个分区从而也会影响最终生成的文件数。
当然上述只是以Spark SQL中的一个场景阐述了小文件产生过多的原因之一(分区数过多)。在数仓建设中,产生小文件过多的原因有很多种,比如:
-
流式处理中,每个批次的处理执行保存操作也会产生很多小文件
-
为了解决数据更新问题,同一份数据保存了不同的几个状态,也容易导致文件数过多
那么如何解决这种小文件的问题呢?
- 通过repartition或coalesce算子控制最后的DataSet的分区数
注意repartition和coalesce的区别。 - 将Hive风格的Coalesce and Repartition Hint 应用到Spark SQL需要注意这种方式对Spark的版本有要求,建议在Spark2.4.X及以上版本使用,示例:
INSERT ... SELECT ...
INSERT ... SELECT ...
Coalesce Hint减少了分区数,它仅合并分区 ,因此最大程度地减少了数据移动,但须注意内存不足容易OOM。
Repartition Hint可以增加或减少分区数量,它执行数据的完全shuffle,并确保数据平均分配。
repartition增加了一个新的stage,因此它不会影响现有阶段的并行性;相反,coalesce会影响现有阶段的并行性,因为它不会添加新stage。
该写法还支持多个插入查询和命名子查询。
- 小文件定期合并
可以定时通过异步的方式针对Hive分区表的每一个分区中的小文件进行合并操作。
如何在纯写SQL的场景下,如何用Spark SQL做数据导入时候,控制小文件的数量。
-
对于原始数据进行按照分区字段进行shuffle,可以规避小文件问题。但有可能引入数据倾斜的问题;
-
可以通过distribute by ss_sold_date_sk, cast(rand() * N as int),N值可以在文件数量和倾斜度之间做权衡;
-
知道倾斜键的情况下,可以将原始数据分成几个部分处理,不倾斜的按照分区键shuffle,倾斜部分可以按照rand函数来shuffle;
-
对于Spark 2.4 以上版本的用户,也可以使用HINT 详情,链接如下:
https://issues.apache.org/jira/browse/SPARK-24940
-
对于Spark 3.0 以上版本的用户,可以使用自适应查询(AQE)功能,设置spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enabled为true,Spark就会在计算过程中自动帮助用户合并小文件,更加方便和智能。
spark.sql.adaptive.enabled = true
spark.sql.adaptive.minNumPostShufflePartitions = 1
spark.sql.adaptive.maxNumPostShufflePartitions = 500
spark.sql.adaptive.shuffle.targetPostShuffleInputSize = 67108864
spark.sql.adaptive.shuffle.targetPostShuffleRowCount = 20000000
spark.sql.auto.repartition = true
spark.sql.adaptive.join.enabled = true
spark.sql.adaptive.skewJoin.enabled = true
spark.shuffle.consolidateFiles = true
spark.shuffle.service.enabled = true
spark.sql.adaptive.allowAdditionalShuffle = true
必须要出发shuffle,如果任务中只有map task,需要通过group by 或者distribute 触发shuffle的执行,只有触发shuffle,才能使用adaptive解决小文件问题。
如何避免Spark SQL做数据导入时产生大量小文件
SparkSQL自适应执行
参考文章:
hdfs 小文件如何处理
spark sql合并小文件_Spark SQL小文件问题在OPPO的解决方案
spark sql合并小文件_Spark SQL 小文件问题产生原因分析以及处理方案
使用Spark sql 合并 Flink 写Hive表的小文件
由于spark的写文件方式,会导致产生很多小文件,会对NameNode造成压力,读写性能变差,为了解决这种小文件问题,spark新的版本(笔者使用2.4.0.cloudera2版本)中支持了动态规划shuffle过程,需要配置spark.sql.adaptive.enabled属性。从结果可以看到只有一个文件,这是由于动态规划的作用,在写文件的时候只启动了一个任务。与Hive不同的是,Spark在执行shuffle过程的时候,会为每一个shuffle的key启动一个任务来写数据,上例中的key。...
由于项目中开发用到sparksql ,将一个大表的数据查询后插入到另一种表中,此时数据令也不是太大,
但是产生了200多个小文件,占用namenode资源,为了改善该情况使用,
将hive中的表加载为一个df,然后重新分区+缓存+注册为临时表,在进行查询,插入操作,此时文件为20个
关键代码如下:
...........
val aDF =
hiveContext.
@羲凡——只为了更好的活着
SparkSql 控制输出文件数量且大小均匀(distribute by rand())
Q:Spark如何控制文件你输出数量?
A:这个简单,用 coalesce或者repartition,num=(1.0*(df.count())/7000000).ceil.toInt
Q:Spark让输出文件大小均匀?
A:在sparksql的查询最后加上distribute b...
Hive自身和Spark都提供了对Hive的SQL支持,用SQL的交互方式操作Hive底层的HDFS文件,两种方式在写文件的时候有一些区别:1. Hive 1.1 without shuffleHive在通过SQL写文件是通过MapReduce任务完成的,如下面这个例子:hive>insertintotabletemp.czc_hive_test_write...
数据倾斜与小文件问题。在pyspark 中直接以 spark.sql(insert into tableNew ..partition(...).select ...from tableOld)的方式写入数据时,默认未开启动态分区会报错。开启动态分区后容易造成小文件过多从而磁盘报警。
spark sql执行insert overwrite table时,写到新表或者新分区的文件个数,有可能是200个,也有可能是任意个,为什么会有这种差别?
首先看一下spark sql执行insert overwrite table流程:
1 创建临时目录,比如
.hive-staging_hive_2018-06-23_00-39-39_825_31228971394415353...
DataFrame输出结果保存为文件时,尤其是根据某个条件分区时,可以控制输出文件的个数,从而减少小文件的个数DataFrame..coalesce(1).write.format("parquet").mode(SaveMode.Overwrite).partitionBy("分区条件列名").save("路径")这里coalesce指定输出文件个数...
Spark SQL小文件
小文件是指文件大小显著小于hdfs block块大小的的文件。过于繁多的小文件会给HDFS带来很严重的性能瓶颈,对任务的稳定和集群的维护会带来极大的挑战。
由于Spark本身并不支持小文件合并功能,小文件问题日益突出。
Spark为什么会产生小文件
Spark生成的文件数量直接取决于RDD里partition的数量和表分区数量。注意这里的两个分区概念并不相同,RDD的分区与任务并行度相关,而表分区则是Hive的分区数目。生成的文件数目一般是RDD分区数和表分区的乘积。因此,当任务并
ssc.textFile("/user/ods/coursepay.log")
.filter(item => {
val obj = ParseJsonData.getJsonData(item)
obj.isInstanceOf[JSONObject]
在使用SparkSql进行项目开发的过程,往往会碰到一个比较头疼的问题,由于SparkSql的默认并行度是200,当sql中包含有join、group by相关的shuffle操作时,会产生很多小文件;从集群优化的角度来说,太多的小文件对NameNode的压力比较大,而且太多的小文件对后续使用该表进行计算时会启动很多不必要的maptask,任务耗时高。因此,需要对小文件问题进行优化。在Datase...
背景:在数仓任务中,经常要解决小文件的问题。有时间为了解决小文件问题,我们把spark.sql.shuffle.partitions这个参数调整的很小,但是随着时间的推移,数据量越来越大,当初设置的参数就不合适了,那有没有一个可以自我伸缩的参数呢?