Parquet 文件生成和读取

文章目录

      • 一、什么是 Parquet
      • 二、实现 Java 读写 Parquet 的流程
        • 方式一:
        • 遇到的坑:
          • 坑1:ClassNotFoundException: com.fasterxml.jackson.annotation.JsonMerge
          • 坑2:No FileSystem for scheme "file"
          • 坑3:与 spark-sql 的引入冲突
        • 方式二:

一、什么是 Parquet

  Parquet 是一种列式存储格式,用于高效地存储和处理大规模数据集。它被广泛应用于大数据处理和分析场景中,例如 Apache Hadoop、Apache Spark 等。

  与传统的行式存储格式(如CSV和JSON)相比,Parquet 能够显著提高读写性能和存储效率。它将数据按列进行存储,而不是按行存储,这样可以更好地利用存储空间,减少 I/O 开销,并提供更高的压缩比。

二、实现 Java 读写 Parquet 的流程

方式一:

  Maven 依赖:

        <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.1</version></dependency>
[root@local~]# vim schema.avsc
{"type": "record","name": "User","fields": [{"name": "field1","type": "string"}, {"name": "field2","type": "int"}]
}
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;import java.io.File;
import java.io.IOException;public class WriteToParquet {public static void main(String[] args) {try {// 创建Schema对象Schema schema = new Schema.Parser().parse(new File("schema.avsc"));// 方式二:不需要读文件// Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"int\"}]}");// 创建GenericRecord对象GenericRecord record = new GenericData.Record(schema);record.put("field1", "value1");record.put("field2", 123);// 创建ParquetWriter对象ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new Path("output.parquet")).withSchema(schema).withCompressionCodec(CompressionCodecName.SNAPPY).build();// 将数据写入Parquet文件writer.write(record);// 关闭ParquetWriterwriter.close();// 创建ParquetReader对象ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path("output.parquet")).build();// 读取Parquet文件中的数据// GenericRecord record;while ((record = reader.read()) != null) {// 处理每一条记录System.out.println(record.get("field1"));System.out.println(record.get("field2"));}// 关闭ParquetReaderreader.close();} catch (IOException e) {e.printStackTrace();}}
}
[root@local~ ]# java -cp /huiq/only-maven-1.0-SNAPSHOT-jar-with-dependencies.jar WriteToParquet
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
log4j:WARN No appenders could be found for logger (org.apache.htrace.core.Tracer).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
value1
123[root@local~ ]# ll -a
-rw-r--r-- 1 root root  51783396 Feb 27 17:45 only-maven-1.0-SNAPSHOT-jar-with-dependencies.jar
-rw-r--r-- 1 root root       615 Feb 27 17:45 output.parquet
-rw-r--r-- 1 root root        16 Feb 27 17:45 .output.parquet.crc
-rw-r--r-- 1 root root       147 Feb 26 17:24 schema.avsc

参考:
java写parquet
java parquet AvroParquetWriter

遇到的坑:
坑1:ClassNotFoundException: com.fasterxml.jackson.annotation.JsonMerge

  一开始引入的依赖:

        <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.0.0</version></dependency>

  报错:

Exception in thread "main" java.lang.NoClassDefFoundError: com/fasterxml/jackson/annotation/JsonMergeat com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector.<clinit>(JacksonAnnotationIntrospector.java:50)at com.fasterxml.jackson.databind.ObjectMapper.<clinit>(ObjectMapper.java:351)at org.apache.avro.Schema.<clinit>(Schema.java:109)at org.apache.avro.Schema$Parser.parse(Schema.java:1413)at WriteToParquet.main(WriteToParquet.java:21)
Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.annotation.JsonMergeat java.net.URLClassLoader.findClass(URLClassLoader.java:381)at java.lang.ClassLoader.loadClass(ClassLoader.java:424)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)at java.lang.ClassLoader.loadClass(ClassLoader.java:357)... 5 more

  解决:

        <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.0.0</version><exclusions><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-annotations</artifactId></exclusion></exclusions></dependency>

  原因:我看当引入 hadoop-client 3.3.1 版本的时候 maven 依赖库里是 jackson-annotations-2.11.3.jar,但引入 hadoop-client 3.0.0 版本的时候 maven 依赖库里是 jackson-annotations-2.7.8.jar 执行程序会报上面那个错,于是在 3.0.0 版本中去掉 jackson-annotations 依赖后看 maven 依赖库里就是 jackson-annotations-2.11.3.jar 了。后来测试 jackson-annotations-2.6.7.jar 也正常。

坑2:No FileSystem for scheme “file”

  整合到项目中报错:org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
  解决:增加如下代码

            Configuration conf = new Configuration();conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");// 或者
//            conf.set("fs.hdfs.impl",
//                    org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
//            );
//            conf.set("fs.file.impl",
//                    org.apache.hadoop.fs.LocalFileSystem.class.getName()
//            );FileSystem fs = FileSystem.get(conf); // 这行必须有虽然没有被引用

参考:
java.io.IOException: No FileSystem for scheme: file
MapReduce 踩坑 - hadoop No FileSystem for scheme: file/hdfs
FileSystem及其源码分析

坑3:与 spark-sql 的引入冲突
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.4.0</version></dependency>

  报错:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/parquet/schema/LogicalTypeAnnotationat org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:157)at org.apache.parquet.avro.AvroParquetWriter.access$200(AvroParquetWriter.java:36)at org.apache.parquet.avro.AvroParquetWriter$Builder.getWriteSupport(AvroParquetWriter.java:190)at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:533)at com.heheda.app.SparkWriteCsvToParquet.main(SparkWriteCsvToParquet.java:46)
Caused by: java.lang.ClassNotFoundException: org.apache.parquet.schema.LogicalTypeAnnotationat java.net.URLClassLoader.findClass(URLClassLoader.java:381)at java.lang.ClassLoader.loadClass(ClassLoader.java:424)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)at java.lang.ClassLoader.loadClass(ClassLoader.java:357)... 5 more

  一开始的思路:

        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.4.0</version><exclusion><groupId>org.apache.parquet</groupId><artifactId>parquet-column</artifactId></exclusion></dependency>

  接着又报错:

Exception in thread "main" java.lang.AbstractMethodError: org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(Lorg/apache/parquet/bytes/BytesInput;IILorg/apache/parquet/column/statistics/Statistics;Lorg/apache/parquet/column/Encoding;Lorg
/apache/parquet/column/Encoding;Lorg/apache/parquet/column/Encoding;)V	at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:59)at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:387)at org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:186)at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:29)at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:308)at com.heheda.app.SparkWriteCsvToParquet.main(SparkWriteCsvToParquet.java:52)

注:文章里说不需要 Hadoop 也行,但我没成功,提交到有 Hadoop 环境的服务器上可以运行,但本地 Idea 中报错生成了 parquet 空文件或者没有文件生成:

Exception in thread "main" java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblemsat org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:736)at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:271)at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:287)at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:324)at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:294)at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:439)at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:428)at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:459)at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:433)at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:521)at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)at org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:327)at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:292)at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:646)at WriteToParquet.main(WriteToParquet.java:33)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblemsat org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:548)at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:569)at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:592)at org.apache.hadoop.util.Shell.<clinit>(Shell.java:689)at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:3741)at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:3736)at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3520)at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:288)at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)at org.apache.parquet.hadoop.util.HadoopOutputFile.fromPath(HadoopOutputFile.java:58)at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:643)... 1 more
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:468)at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:439)at org.apache.hadoop.util.Shell.<clinit>(Shell.java:516)... 11 more
方式二:

  网上许多写入 parquet 需要在本地安装 haddop 环境,下面介绍一种不需要安装 haddop 即可写入 parquet 文件的方式;

  来自:列式存储格式之parquet读写

  Maven 依赖:

        <dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.8.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-core</artifactId><version>1.2.1</version></dependency><dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-hadoop</artifactId><version>1.8.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.parquet/parquet-avro --><dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.8.1</version></dependency>
public class User {private String id;private String name;private String password;public User() {}public User(String id, String name, String password) {this.id = id;this.name = name;this.password = password;}public String getId() {return id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}@Overridepublic String toString() {return "User{" +"id='" + id + '\'' +", name='" + name + '\'' +", password='" + password + '\'' +'}';}
}

注:这种方式的 User 实体类和上面方式的 schema.avsc 文件中的 "name": "User" 有冲突,报错:

Exception in thread "main" org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/heheda/output.parquetat org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)at WriteToParquet.main(WriteToParquet.java:55)
Caused by: java.lang.ClassCastException: User cannot be cast to org.apache.avro.generic.IndexedRecordat org.apache.avro.generic.GenericData.setField(GenericData.java:818)at org.apache.parquet.avro.AvroRecordConverter.set(AvroRecordConverter.java:396)at org.apache.parquet.avro.AvroRecordConverter$2.add(AvroRecordConverter.java:132)at org.apache.parquet.avro.AvroConverters$BinaryConverter.addBinary(AvroConverters.java:64)at org.apache.parquet.column.impl.ColumnReaderBase$2$6.writeValue(ColumnReaderBase.java:390)at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:440)at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:229)... 3 more

  写入:

import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY;public class WriteToParquet {public static void main(String[] args) {try {List<User> users = new ArrayList<>();User user1 = new User("1","huangchixin","123123");User user2 = new User("2","huangchixin2","123445");users.add(user1);users.add(user2);Path dataFile = new Path("output.parquet");ParquetWriter<User> writer = AvroParquetWriter.<User>builder(dataFile).withSchema(ReflectData.AllowNull.get().getSchema(User.class)).withDataModel(ReflectData.get()).withConf(new Configuration()).withCompressionCodec(SNAPPY).withWriteMode(OVERWRITE).build();for (User user : users) {writer.write(user);}writer.close();} catch (IOException e) {e.printStackTrace();}}
}

  Idea 本地执行:

在这里插入图片描述
  读取:

import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;import java.io.IOException;public class WriteToParquet {public static void main(String[] args) {try {Path dataFile = new Path("output.parquet");ParquetReader<User> reader = AvroParquetReader.<User>builder(dataFile).withDataModel(new ReflectData(User.class.getClassLoader())).disableCompatibility().withConf(new Configuration()).build();User user;while ((user = reader.read()) != null) {System.out.println(user);}} catch (IOException e) {e.printStackTrace();}}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/496490.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

排序算法--堆排序

堆排序的时间复杂度是O&#xff08;N*logN&#xff09;&#xff0c;优于选择排序O&#xff08;N^2&#xff09; 一、堆 1.堆的概念&#xff1a;堆一般指的是二叉堆&#xff0c;顾名思义&#xff0c;二叉堆是完全二叉树或者近似完全二 2.堆的性质&#xff1a;①完全二叉树 ②每…

java高级——反射

目录 反射概述反射的使用获取class对象的三种方式反射获取类的构造器1. 获取类中所有的构造器2. 获取单个构造器 反射获取构造器的作用反射获取成员变量反射变量赋值、取值获取类的成员方法反射对象类方法执行 反射简易框架案例案例需求实现步骤代码如下 反射概述 什么是反射 反…

虚拟机数据恢复-虚拟机误还原快照后如何恢复还原前的数据?

虚拟机数据恢复环境&故障&#xff1a; 由一台物理服务器迁移到ESXI上的虚拟机&#xff0c;虚拟机迁移完成后做了一个快照&#xff0c;该ESXI上面一共运行了数十台虚拟机。某天工作人员不小心将快照进行了还原&#xff0c;虚拟机内的数据还原到了数年前刚迁移过来时的状态&a…

精品springboot相亲网站预约

《[含文档PPT源码等]精品基于springboot相亲网站[包运行成功]》该项目含有源码、文档、PPT、配套开发软件、软件安装教程、项目发布教程、包运行成功&#xff01; 软件开发环境及开发工具&#xff1a; Java——涉及技术&#xff1a; 前端使用技术&#xff1a;HTML5,CSS3、Ja…

页面大小自适应最优解决插件

简介 为了解决在不同设备上面能够看到一样的效果,我们可以使用autofit.js插件来实现 autofit.js是一个可以让你的PC项目自适应屏幕的工具&#xff0c;其原理非常简单&#xff0c;即在scale等比缩放的基础上&#xff0c;向右或向下增加了宽度或高度&#xff0c;以达到充满全屏的…

MySQL 篇-深入了解 DML、DQL 语言(二)

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 DML、DQL 语言说明 2.0 使用 DML 实现对数据管理和操作 2.1 DML - 增添数据 insert 2.2 DML - 修改数据 update 2.3 DML - 删除数据 delete 3.0 使用 DQL 实现对…

TP6上传图片到OSS(记录贴)

1&#xff0c;先安装&#xff0c;我使用composer安装 在项目的根目录运行composer require aliyuncs/oss-sdk-php 2,安装成功以后vendor目录下可以看到如图&#xff1a; 3&#xff0c;上传图片代码如下&#xff1a; <?php namespace app\controller;use app\BaseControll…

77. 组合(力扣LeetCode)

文章目录 77. 组合题目描述回溯算法组合问题的剪枝操作 77. 组合 题目描述 给定两个整数 n 和 k&#xff0c;返回范围 [1, n] 中所有可能的 k 个数的组合。 你可以按 任何顺序 返回答案。 示例 1&#xff1a; 输入&#xff1a;n 4, k 2 输出&#xff1a; [ [2,4], [3,4],…

农业四情监测设备为什么符合高标准农田建设

TH-Q3随着科技的不断进步&#xff0c;智慧农业正逐渐成为现代农业发展的重要方向。其中&#xff0c;农业四情监测系统以其独特的功能和优势&#xff0c;在高标准农田建设中发挥着越来越重要的作用。 一、农业四情监测系统的概念及功能 农业四情监测系统&#xff0c;顾名思义&am…

【Linux】进程信号 --- 信号的产生 保存 捕捉递达

文章目录 信号的感知信号的结构描述 一、信号的产生1.通过键盘发送信号2.通过系统调用发送信号 二、信号的保存&#xff08;PCB内部的两张位图和一个函数指针数组&#xff09;理解三张数据结构表block pending haldler 三、通过代码编写 理解 信号的保存和递达1.信号集操作的库…

基于java SSM springboot+redis网上水果超市商城设计和实现以及文档

基于java SSM springbootredis网上水果超市商城设计和实现以及文档 博主介绍&#xff1a;多年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 央顺技术团队 Java毕设项目精品实战案例《1000套》 欢迎点赞 收藏 …

分布式一致性算法-Raft

分布式一致性算法Raft 分布式一致性问题Raft算法细节节点状态节点状态演变选举leader过程日志复制过程 选举leader初始的选举领导者故障后的选举拆分投票 日志复制网络分区 再看分布式一致性问题写在最后 分布式一致性问题 假设有一个单节点的系统&#xff0c;这个系统是一个数…