相关文章推荐

在生产中,无论是通过SQL语句或者Scala/Java等代码的方式使用Spark SQL处理数据,在Spark SQL写数据时,往往会遇到生成的小文件过多的问题,而管理这些大量的小文件,是一件非常头疼的事情。
大量的小文件会影响Hadoop集群管理或者Spark在处理数据时的稳定性:

  1. Spark SQL写Hive或者直接写入HDFS,过多的小文件会对NameNode内存管理等产生巨大的压力,会影响整个集群的稳定运行。
  2. 容易导致task数过多,如果超过参数spark.driver.maxResultSize的配置(默认1g),会抛出类似如下的异常,影响任务的处理。

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 478 tasks (2026.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

当然可以通过调大spark.driver.maxResultSize的默认配置来解决问题,但如果不能从源头上解决小文件问题,以后还可能遇到类似的问题。

下面通过一个例子,Spark SQL写数据时,导致产生分区数"剧增"的典型场景,通过分区数"剧增",以及Spark中task数和分区数的关系等,来倒推小文件过多的可能原因(这里的分区数是指生成的DataSet/RDD的分区数,不是Hive分区表的分区概念):

  1. 对表test_tab进行写入操作
  2. t1的分区数是100,t2的分区数是200,union all后生成的tmp分区数是300
  3. test_tab产生的小文件数基本也在300左右
select * from t1 union all select * from t2 as tmp;
insert overwrite table test_tab select * from tmp;

1)执行上述insert操作时的分区并行度,主要受tmp的分区数(对应一个DataSet)影响,

2)tmp的分区数主要受t1、t2以及union all的影响

3)暂且不考虑t1或t2是物理表还是经过其他处理生成的临时表,它们的分区数是确定的,这里主要看经过union all处理后,生成的tmp的分区数和t1、t2的分区数有何关系?

4)Spark SQL语句中的union all对应到DataSet中即为unionAll算子,底层调用union算子

Spark RDD中的union算子对union产生的新的RDD的分区数是如何受被union的多个RDD的影响的,这里直接给出结论:

RDD在调用union算子时,最终生成的RDD分区数分两种情况:

  1. union的RDD分区器已定义并且它们的分区器相同
    多个符RDD具有相同额分区器,union后产生的RDD的分区器与父RDD相同且分区数也相同。比如,n个RDD的分区器相同且是defined,分区数是m个。那么这n个RDD最终union生成的一个RDD的分区数仍是m,分区器也是相同的
  2. 不满足第一种情况,则通过union生成的RDD的分区数为父RDD的分区数之和。

同样的这种机制也可以套用到Spark SQL中的DataSet上,那么就很好解释了tmp的分区数为什么等于t1和t2的分区数的和。

最后,Spark中一个task处理一个分区从而也会影响最终生成的文件数。

当然上述只是以Spark SQL中的一个场景阐述了小文件产生过多的原因之一(分区数过多)。在数仓建设中,产生小文件过多的原因有很多种,比如:

  1. 流式处理中,每个批次的处理执行保存操作也会产生很多小文件

  2. 为了解决数据更新问题,同一份数据保存了不同的几个状态,也容易导致文件数过多

那么如何解决这种小文件的问题呢?

  1. 通过repartition或coalesce算子控制最后的DataSet的分区数
    注意repartition和coalesce的区别。
  2. 将Hive风格的Coalesce and Repartition Hint 应用到Spark SQL需要注意这种方式对Spark的版本有要求,建议在Spark2.4.X及以上版本使用,示例:
--提示名称不区分大小写
INSERT ... SELECT /*+ COALESCE(numPartitions) */ ...
INSERT ... SELECT /*+ REPARTITION(numPartitions) */ ...

Coalesce Hint减少了分区数,它仅合并分区 ,因此最大程度地减少了数据移动,但须注意内存不足容易OOM。
Repartition Hint可以增加或减少分区数量,它执行数据的完全shuffle,并确保数据平均分配。
repartition增加了一个新的stage,因此它不会影响现有阶段的并行性;相反,coalesce会影响现有阶段的并行性,因为它不会添加新stage。
该写法还支持多个插入查询和命名子查询。

  1. 小文件定期合并
    可以定时通过异步的方式针对Hive分区表的每一个分区中的小文件进行合并操作。

如何在纯写SQL的场景下,如何用Spark SQL做数据导入时候,控制小文件的数量。

  • 对于原始数据进行按照分区字段进行shuffle,可以规避小文件问题。但有可能引入数据倾斜的问题;

  • 可以通过distribute by ss_sold_date_sk, cast(rand() * N as int),N值可以在文件数量和倾斜度之间做权衡;

  • 知道倾斜键的情况下,可以将原始数据分成几个部分处理,不倾斜的按照分区键shuffle,倾斜部分可以按照rand函数来shuffle;

  • 对于Spark 2.4 以上版本的用户,也可以使用HINT 详情,链接如下:
    https://issues.apache.org/jira/browse/SPARK-24940

  • 对于Spark 3.0 以上版本的用户,可以使用自适应查询(AQE)功能,设置spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enabled为true,Spark就会在计算过程中自动帮助用户合并小文件,更加方便和智能。

# 自适应执行框架的开关
spark.sql.adaptive.enabled = true
# reduce个数区间最小值
spark.sql.adaptive.minNumPostShufflePartitions = 1
# reduce个数区间最大值
spark.sql.adaptive.maxNumPostShufflePartitions = 500 
# 动态调整reduce个数的partition大小依据,如果设置为64 MB,则reduce阶段每个task最少处理64 MB的数据
spark.sql.adaptive.shuffle.targetPostShuffleInputSize = 67108864
# 动态调整reduce个数的partition条数依据,如设置20000000则reduce阶段每个task最少处理20000000条的数据
spark.sql.adaptive.shuffle.targetPostShuffleRowCount = 20000000
spark.sql.auto.repartition = true
# 以下配置是针对join操作进行的性能优化
spark.sql.adaptive.join.enabled = true 
spark.sql.adaptive.skewJoin.enabled = true 
spark.shuffle.consolidateFiles = true 
spark.shuffle.service.enabled = true
spark.sql.adaptive.allowAdditionalShuffle = true

必须要出发shuffle,如果任务中只有map task,需要通过group by 或者distribute 触发shuffle的执行,只有触发shuffle,才能使用adaptive解决小文件问题。

如何避免Spark SQL做数据导入时产生大量小文件

SparkSQL自适应执行

参考文章:

hdfs 小文件如何处理

spark sql合并小文件_Spark SQL小文件问题在OPPO的解决方案

spark sql合并小文件_Spark SQL 小文件问题产生原因分析以及处理方案

使用Spark sql 合并 Flink 写Hive表的小文件

由于spark的写文件方式,会导致产生很多小文件,会对NameNode造成压力,读写性能变差,为了解决这种小文件问题,spark新的版本(笔者使用2.4.0.cloudera2版本)支持了动态规划shuffle过程,需要配置spark.sql.adaptive.enabled属性。从结果可以看到只有一个文件,这是由于动态规划的作用,在写文件的时候只启动了一个任务。与Hive不同的是,Spark在执行shuffle过程的时候,会为每一个shuffle的key启动一个任务来写数据,上例的key。... 由于项目开发用到sparksql ,将一个大表的数据查询后插入到另一种表,此时数据令也不是太大, 但是产生了200多个小文件,占用namenode资源,为了改善该情况使用, 将hive的表加载为一个df,然后重新分区+缓存+注册为临时表,在进行查询,插入操作,此时文件为20个 关键代码如下: ........... val aDF = hiveContext. @羲凡——只为了更好的活着 SparkSql 控制输出文件数量且大小均匀(distribute by rand()) Q:Spark如何控制文件输出数量? A:这个简单,用 coalesce或者repartition,num=(1.0*(df.count())/7000000).ceil.toInt Q:Spark输出文件大小均匀? A:在sparksql的查询最后加上distribute b... Hive自身和Spark都提供了对HiveSQL支持,用SQL的交互方式操作Hive底层的HDFS文件,两种方式在写文件的时候有一些区别:1. Hive 1.1 without shuffleHive在通过SQL文件是通过MapReduce任务完成的,如下面这个例子:hive>insertintotabletemp.czc_hive_test_write... 数据倾斜与小文件问题。在pyspark 直接以 spark.sql(insert into tableNew ..partition(...).select ...from tableOld)的方式写入数据时,默认未开启动态分区会报错。开启动态分区后容易造成小文件过多从而磁盘报警。 spark sql执行insert overwrite table时,写到新表或者新分区的文件个数,有可能是200个,也有可能是任意个,为什么会有这种差别? 首先看一下spark sql执行insert overwrite table流程: 1 创建临时目录,比如 .hive-staging_hive_2018-06-23_00-39-39_825_31228971394415353... DataFrame输出结果保存为文件时,尤其是根据某个条件分区时,可以控制输出文件的个数,从而减少小文件的个数DataFrame..coalesce(1).write.format("parquet").mode(SaveMode.Overwrite).partitionBy("分区条件列名").save("路径")这里coalesce指定输出文件个数... Spark SQL文件文件是指文件大小显著小于hdfs block块大小的的文件。过于繁多的小文件会给HDFS带来很严重的性能瓶颈,对任务的稳定和集群的维护会带来极大的挑战。 由于Spark本身并不支持小文件合并功能,小文件问题日益突出。 Spark为什么会产生小文件 Spark生成的文件数量直接取决于RDD里partition的数量和表分区数量。注意这里的两个分区概念并不相同,RDD的分区与任务并行度相关,而表分区则是Hive的分区数目。生成的文件数目一般是RDD分区数和表分区的乘积。因此,当任务并 ssc.textFile("/user/ods/coursepay.log") .filter(item => { val obj = ParseJsonData.getJsonData(item) obj.isInstanceOf[JSONObject] 在使用SparkSql进行项目开发的过程,往往会碰到一个比较头疼的问题,由于SparkSql的默认并行度是200,当sql包含有join、group by相关的shuffle操作时,会产生很多小文件;从集群优化的角度来说,太多的小文件对NameNode的压力比较大,而且太多的小文件对后续使用该表进行计算时会启动很多不必要的maptask,任务耗时高。因此,需要对小文件问题进行优化。在Datase... 背景:在数仓任务,经常要解决小文件的问题。有时间为了解决小文件问题,我们把spark.sql.shuffle.partitions这个参数调整的很小,但是随着时间的推移,数据量越来越大,当初设置的参数就不合适了,那有没有一个可以自我伸缩的参数呢?
 
推荐文章