Hadoop/Hive/Spark小文件处理

什么是小文件?

小文件指的是文件size比HDFS的block size小很多的文件。Hadoop适合处理少量的大文件,而不是大量的小文件。

hadoop小文件常规的处理方式

1、小文件导致的问题

首先,在HDFS中,任何block,文件或者目录在内存中均以对象的形式存储,每个对象约占150byte,如果有1000 0000个小文件,每个文件占用一个block,则namenode大约需要2G空间。如果存储1亿个文件,则namenode需要20G空间。这样namenode内存容量严重制约了集群的扩展。
其次,访问大量小文件速度远远小于访问几个大文件。HDFS最初是为流式访问大文件开发的,如果访问大量小文件,需要不断的从一个datanode跳到另一个datanode,严重影响性能。
最后,处理大量小文件速度远远小于处理同等大小的大文件的速度。每一个小文件要占用一个slot,而task启动将耗费大量时间甚至大部分时间都耗费在启动task和释放task上。

2、Hadoop自带的解决方案

对于小文件问题,Hadoop本身也提供了几个解决方案,分别为: Hadoop Archive , Sequence
file 和 CombineFileInputFormat 。

  • Hadoop Archive
    Hadoop Archive或者HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多
    个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时,仍然允许对文件进行
    透明的访问。
  • Sequence file
    sequence file由一系列的二进制key/value组成,如果为key小文件名,value为文件内容,则可
    以将大批小文件合并成一个大文件。
  • CombineFileInputFormat
    它是一种新的inputformat,用于将多个文件合并成一个单独的split,另外,它会考虑数据的
    存储位置。

Hadoop Archive

Hadoop Archive或者HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时,仍然允许对文件进行透明的访问。对某个目录/foo/bar下的所有小文件存档成/outputdir/ zoo.har:

hadoop archive -archiveName zoo.har -p /foo/bar /outputdir

当然,也可以指定HAR的大小(使用-Dhar.block.size)。
HAR是在Hadoop file system之上的一个文件系统,因此所有fs shell命令对HAR文件均可用,只不过是文件路径格式不一样,HAR的访问路径可以是以下两种格式:

har://scheme-hostname:port/archivepath/fileinarchive
har:///archivepath/fileinarchive(本节点)

可以这样查看HAR文件存档中的文件:

hadoop dfs -ls har:///user/zoo/foo.har

输出:

har:///user/zoo/foo.har/hadoop/dir1
har:///user/zoo/foo.har/hadoop/dir2

使用HAR时需要两点
第一,对小文件进行存档后,原文件并不会自动被删除,需要用户自己删除;
第二,创建HAR文件的过程实际上是在运行一个mapreduce作业,因而需要有一个hadoop集群运行此命令。

此外,HAR还有一些缺陷:
第一,一旦创建,Archives便不可改变。要增加或移除里面的文件,必须重新创建归档文件。
第二,要归档的文件名中不能有空格,否则会抛出异常,可以将空格用其他符号替换
(使用-Dhar.space.replacement.enable=true 和-Dhar.space.replacement参数)。
第三,存档文件不支持压缩。

一个归档后的文件,其存储结构如下图:
在这里插入图片描述

Sequence file

Sequence file由一系列的二进制key/value组成,如果为key小文件名,value为文件内容,则可以将大批小文件合并成一个大文件。
Hadoop-0.21.0中提供了SequenceFile,包括Writer,Reader和SequenceFileSorter类进行写,读和排序操作。
在这里插入图片描述
创建sequence file的过程可以使用mapreduce工作方式完成,对于index,需要改进查找算法
优缺点:对小文件的存取都比较自由,也不限制用户和文件的多少,但是该方法不能使用append方
法,所以适合一次性写入大量小文件的场景

CombineFileInputFormat

CombineFileInputFormat是一种新的inputformat,用于将多个文件合并成一个单独的split,另外,它会考虑数据的存储位置。

Hive小文件问题怎么解决?

首先,我们要弄明白两个问题:
1)哪里会产生小文件
源数据本身有很多小文件
动态分区会产生大量小文件
reduce个数越多, 小文件越多
按分区插入数据的时候会产生大量的小文件, 文件个数 = maptask个数 * 分区数

2)小文件太多造成的影响
从Hive的角度看,小文件会开很多map,一个map开一个JVM去执行,所以这些任务的初始化,启
动,执行会浪费大量的资源,严重影响性能。
HDFS存储太多小文件, 会导致namenode元数据特别大, 占用太多内存, 制约了集群的扩展。

小文件解决方案:

方法一:通过调整参数进行合并

1)在Map输入的时候, 把小文件合并

-- 每个Map最大输入大小,决定合并后的文件数
set mapred.max.split.size=256000000;
-- 一个节点上split的至少的大小,决定了多个datanode上的文件是否需要合并
set mapred.min.split.size.per.node=100000000;
-- 一个交换机下split的至少的大小,决定了多个交换机上的文件是否需要合并
set mapred.min.split.size.per.rack=100000000;
-- 执行Map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

2)在Reduce输出的时候, 把小文件合并

-- 在map-onlyjob后合并文件,默认true
set hive.merge.mapfiles=true;
-- 在map-reducejob后合并文件,默认false
set hive.merge.mapredfiles=true;
-- 合并后每个文件的大小,默认256000000
set hive.merge.size.per.task=256000000;
-- 平均文件大小,是决定是否执行合并操作的阈值,默认16000000 sethive.merge.smallfiles.avgsize=100000000;

方法二:针对按分区插入数据的时候产生大量的小文件的问题,可以使用DISTRIBUTE BY rand() 将数据

随机分配给Reduce,这样可以使得每个Reduce处理的数据大体一致。

-- 设置每个reducer处理的大小为5个G
set hive.exec.reducers.bytes.per.reducer=5120000000;
-- 使用distributebyrand()将数据随机分配给reduce,避免出现有的文件特别大,有的文件特别小
insert overwrite table test partition(dt)
select * from iteblog_tmp
DISTRIBUTEBYrand();

方法三:使用Sequencefile作为表存储格式,不要用textfile,在一定程度上可以减少小文件

方法四:使用hadoop的archive归档

-- 用来控制归档是否可用
set hive.archive.enabled=true;
-- 通知Hive在创建归档时是否可以设置父目录
set hive.archive.har.parentdir.settable=true;-- 控制需要归档文件的大小
set har.partfile.size=1099511627776;
-- 使用以下命令进行归档
ALTER TABLE srcpart ARCHIVE PARTITION(ds='2008-04-08',hr='12');
- 对已归档的分区恢复为原文件
ALTER TABLE srcpart UNARCHIVE PARTITION(ds='2008-04-08',hr='12');
-- 注意,归档的分区不能够INSERTOVERWRITE,必须先unarchive

Spark输出文件的个数,如何合并小文件?

当使用spark sql执行etl时候出现了,最终结果大小只有几百k,但是小文件一个分区可能就有上千的情况。
小文件过多的一些危害如下:

  • hdfs有最大文件数限制
  • 浪费磁盘资源(可能存在空文件)
  • hive中进行统计,计算的时候,会产生很多个map,影响计算的速度。

解决方案如下:

方法一:通过spark的coalesce()方法和repartition()方法

val rdd2 = rdd1.coalesce(8, true) //(true表示是否shuffle)
val rdd3 = rdd1.repartition(8)

coalesce:coalesce()方法的作用是返回指定一个新的指定分区的Rdd,如果是生成一个窄依赖的结果,那么可以不发生shuffle,分区的数量发生激烈的变化,计算节点不足,不设置true可能会出错。
repartition:coalesce()方法shuffle为true的情况。

方法二:降低spark并行度,即调节spark.sql.shuffle.partitions

比如之前设置的为100,按理说应该生成的文件数为100;
但是由于业务比较特殊,采用的大量的union all,且union all在spark中属于窄依赖,不会进行shuffle,所以导致最终会生成(union all数量+1)*100的文件数。
如有10个union all,会生成1100个小文件。这样导致降低并行度为10之后,执行时长大大增加,且文件数依旧有110个,效果不理想。

方法三:新增一个并行度=1任务,专门合并小文件

先将原来的任务数据写到一个临时分区(如tmp);再起一个并行度为1的任务,类似:

insert overwrite 目标表 select * from 临时分区

结果小文件数还是没有减少,经过多次测后发现原因:‘select * from 临时分区’ 这个任务在spark中属于窄依赖;
并且spark DAG中分为宽依赖和窄依赖,只有宽依赖会进行shuffle;
故并行度shuffle,spark.sql.shuffle.partitions=1也就没有起到作用;

由于数据量本身不是特别大,所以直接采用了group by(在spark中属于宽依赖)的方式,类似:

insert overwrite 目标表 select * from 临时分区 group by * 

先运行原任务,写到tmp分区,‘dfs -count’查看文件数,1100个,运行加上group by的临时任务(spark.sql.shuflle.partitions=1),查看结果目录,文件数=1,成功。

总结:

1)方便的话,可以采用coalesce()方法和repartition()方法
2)如果任务逻辑简单,数据量少,可以直接降低并行度
3)任务逻辑复杂,数据量很大,原任务大并行度计算写到临时分区,再加两个任务:
一个用来将临时分区的文件用小并行度(加宽依赖)合并成少量文件到实际分区
另一个删除临时分区
4)hive任务减少小文件相对比较简单,可以直接设置参数,如:
Map-only的任务结束时合并小文件:

set hive.merge.mapfiles = true

在Map-Reduce的任务结束时合并小文件:

set hive.merge.mapredfiles= true

当输出文件的平均大小小于1GB时,启动一个独立的map-reduce任务进行文件merge:

set hive.merge.smallfiles.avgsize=1024000000

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/13555.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

E2. Rudolf and Snowflakes (hard version) codeforces1846E2

Problem - E2 - Codeforces 题目大意:在无向图中,初始有一个点,然后将k个点连接到1号点上,之后每次操作分别将k歌点连接到之前新加的点上,这样的操作至少有1次,t次询问,每次询问给出一个数n&am…

Mockplus Cloud - June 2023crack

Mockplus Cloud - June 2023crack 添加便签以澄清情节提要上的任何设计概念。 新的流程图工具直接在情节提要上可视化任何设计流程和过程。 添加了在发布到Mockplus Cloud时删除RP页面的功能。 添加设计注释时包括图像和链接。 添加了一个新的提示,用于在断开互联网…

vue使用富文本编辑器 Wangeditor 可显示编辑新增回显禁用

1.效果图 2.安装依赖 npm install wangeditor 3.在main.js 全局引入 富文本组件 import editorBar from "/components/editor/editor.vue";Vue.component(editorBar, editorBar) 全局引入页面使用 <editor-bar v-model"form.nr" :flag"false&quo…

JVM源码剖析之Java对象创建过程

关于 "Java的对象创建" 这个话题分布在各种论坛、各种帖子&#xff0c;文章的水平参差不齐。并且大部分仅仅是总结 "面试宝典" 的流程&#xff0c;小部分就是copy其他帖子&#xff0c;极少能看到拿源码作为论证。所以特意写下这篇文章。 版本信息如下&…

Flutter开发微信小程序实战:构建一个简单的天气预报小程序

微信小程序是一种快速、高效的开发方式&#xff0c;Flutter则是一款强大的跨平台开发框架。结合二者&#xff0c;可以轻松地开发出功能丰富、用户体验良好的微信小程序。 这里将介绍如何使用Flutter开发一个简单的天气预报小程序&#xff0c;并提供相应的代码示例。 1. 准备工…

使用Docker安装RabbitMQ并实现入门案例“Hello World”

RabbitMQ官方文档&#xff1a;RabbitMQ Tutorials — RabbitMQ 一、RabbitMQ安装&#xff08;Linux下&#xff09; 你可以选择原始的方式安装配置&#xff0c;也可以使用docker进行安装&#xff0c;方便快捷&#xff01; 1. 安装docker 没有docker的先安装一下docker&#x…

OpenCV图像的仿射变换、旋转和缩放

以下是对代码的逐行解释: // 包含必要的OpenCV头文件和C++库文件 #include "opencv2/highgui/highgui.hpp" #include "opencv2/imgproc/imgproc.hpp" #include <iostream> using namespace cv;

密码学学习笔记(六):Hash Functions - 哈希函数2

哈希函数是怎么构成的&#xff1f; Merkle–Damgrd结构 哈希函数需要能够处理任意长度的输入。许多散列函数&#xff0c;例如MD5、SHA-1、SHA-2&#xff0c;都是由构建块&#xff08;称为压缩函数&#xff09;组成的&#xff0c;这些构建块可以处理特定的块大小&#xff0c;并…

Centos环境Access denied for user ‘root‘@‘%to database ‘xxx‘

Centos7解决数据库出现Access denied for user ‘root‘‘%to database ‘xxx‘ 问题 原因: root%表示 root用户 通过任意其他端访问操作 被拒绝! 授权即可: 1&#xff1a;进入数据库 mysql -u root -p 2.输入 mysql>grant all privileges on *.* to root% with grant o…

pdf转为ppt的超简单方法,就用这几个!

在我们的工作和生活中&#xff0c;PDF文件是不可或缺的文件格式之一。它以高准确性、整齐的页面排版和流畅的翻页而闻名&#xff0c;为我们处理文档提供了很大的帮助。然而&#xff0c;PDF文件的一个缺点是无法进行修改。当我们不小心输入错误数据或需要进行编辑时&#xff0c;…

SQL Server中如何将累积数值拆解

概要 本文通过一个计算汽车每日里程数的例子&#xff0c;展现如何通过汽车每日的总里程数&#xff0c;来计算汽车每日的里程数。 代码及实现 每辆汽车中都有一个里程数表&#xff0c;记录汽车从出场到当前行驶的里程数&#xff0c;下表是一样汽车的里程数表&#xff0c;该表…

Jetpack compose——深入了解Diffing

Diffing是什么 "Diffing" 是 Jetpack Compose 中用于优化性能的一种技术。它的工作原理是比较新旧 UI 树&#xff0c;并只更新实际发生变化的部分。这意味着即使你的应用有大量的 UI&#xff0c;Compose 也能保持高效的性能。 当 Composable 函数被重新调用&#x…