[Java/压缩] Java读取Parquet文件

news/2025/3/12 14:25:28/文章来源:https://www.cnblogs.com/johnnyzen/p/18631417

序:契机

  • 生产环境有设备出重大事故,又因一关键功能无法使用,亟需将生产环境的原始MQTT报文(以 parquet 文件格式 + zstd 压缩格式 落盘)DOWN到本地,读取并解析。

本文聚焦在 本地电脑,用 java 读取 parquet 文件

  • 相当多网络文档的读取代码无法正常运行,有必要记录一二,后续还需进一步研究 parquet 和 zstd 算法。

概述:parquet//TODO

案例示范

引入依赖

<properties><!-- 1.13.1 / 1.12.0 --><parquet.version>1.13.1</parquet.version><avro.version>1.10.2</avro.version><!-- 3.2.1 / 2.7.3 --><hadoop.version>3.2.1</hadoop.version><!-- 1.5.0-1 / 1.5.5-5 / 与 kafka-clients:1.4-xxx 版本冲突 --><zstd.version>1.5.0-1</zstd.version>
</properties><dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>${parquet.version}</version>
</dependency>
<dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-hadoop</artifactId><version>${parquet.version}</version>
</dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>${hadoop.version}</version><exclusions><exclusion><artifactId>commons-compress</artifactId><groupId>org.apache.commons</groupId></exclusion></exclusions>
</dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${hadoop.version}</version><exclusions><exclusion><artifactId>commons-compress</artifactId><groupId>org.apache.commons</groupId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>${hadoop.version}</version>
</dependency><!-- ztsdZstd 的 RecyclingBufferPool‌ 是一种内存管理机制,用于高效地重用缓冲区,减少内存分配和释放的开销。Zstd库中的BufferPool接口提供了多种实现,其中RecyclingBufferPool是一种常见的实现方式。 -->
<dependency><groupId>com.github.luben</groupId><artifactId>zstd-jni</artifactId><!-- 1.5.5-5 / kafka-clients 包冲突 --><version>${zstd.version}</version>
</dependency>

ReadParquetFormatMQTTMessageRawDataDemo

import com.xx.yy.common.utils.DatetimeUtil;
import com.xx.yy.common.utils.FileUtil;
import com.alibaba.fastjson2.JSON;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** 本地读取 Parquet 文件* @reference-doc*  [1] java读取parquet文件 - https://blog.csdn.net/letterss/article/details/131417952*  [2] datax读取Parquet格式文件总列数 - https://blog.csdn.net/letterss/article/details/131189471*/
@Slf4j
public class ReadParquetFormatMQTTMessageRawDataDemo {@SneakyThrowspublic static void main(String[] args) {String baseDir = "file:///E:/tmp_data/parquet_mqtt_messages/ods_raw_data_history/";String parquetFilePathStr = baseDir + "20081-XXXX-XXXXXXXXXX/e7db5c81e70131d55d4fb4a1752b90f2-1.parquet"; //"output.parquet"try {// 指定 Parquet 文件路径//Path parquetFilePath = new Path("C:\\\\Users\\\\Administrator\\\\Desktop\\\\07fe7433-99c2-41d8-a91d-27a77d99f690-0_4-8-0_20230510103931772.parquet");Path parquetFilePath = new Path(parquetFilePathStr);//查询总列数,参考博客: https://blog.csdn.net/letterss/article/details/131189471int allColumnsCount = getParquetAllColumnsCount(parquetFilePath);int columnIndexMax = -1;columnIndexMax = allColumnsCount - 1;// 创建 ParquetReader.Builder 实例ParquetReader.Builder<Group> builder = ParquetReader.builder(new GroupReadSupport(), parquetFilePath);// 创建 ParquetReader 实例ParquetReader<Group> reader = builder.build();// 循环读取 Parquet 文件中的记录Group record;List<Map<String, Object>> records = new ArrayList<>();while ((record = reader.read()) != null) {Map<String, Object> recordMap = new HashMap<>();// 处理每个记录的逻辑for (int i = 0; i <= columnIndexMax; i++) {String fieldKey = record.getType().getType(i).getName(); //record.getType().getFieldName(i);Object fieldValue = record.getValueToString(i, 0);recordMap.put( fieldKey , fieldValue);System.out.println(fieldValue);}records.add( recordMap );//writeMqttMessageRawDataToLocal(recordMap);}System.out.println(JSON.toJSONString( records ));// 关闭读取器reader.close();} catch (IOException e) {e.printStackTrace();}}@SneakyThrowspublic static void writeMqttMessageRawDataToLocal(Map<String, Object> recordMap){String targetBaseDir = "E:\\tmp_data\\batch_parse_raw_mqtt_messages\\";String hexMqttMessageRawData = (String) recordMap.get("raw_data");String deviceId = (String) recordMap.get("device_id");Long collectTime = Long.valueOf( (String) recordMap.get("collect_time") );String mqttMessageRawDataFile = String.format("%s.%d(%s).hex-bin", deviceId, collectTime, DatetimeUtil.longToString(collectTime, DatetimeUtil.MILLISECOND_TIME_WITH_NUMBER_FORMAT) );FileUtil.writeToFile(hexMqttMessageRawData, targetBaseDir + mqttMessageRawDataFile);//FileUtils.write();}/*** 获取 parquet 的 总列数* @reference-doc*   [1] datax读取Parquet格式文件总列数 - https://blog.csdn.net/letterss/article/details/131189471*/public static int getParquetAllColumnsCount(Path path){int columnCount = -1;Configuration configuration = new Configuration();//Path path = new Path(parquetFilePath);try {ParquetMetadata metadata = ParquetFileReader.readFooter(configuration, path);List<ColumnChunkMetaData> columns = metadata.getBlocks().get(0).getColumns();columnCount = columns.size();System.out.println("Total column count: " + columnCount);} catch (Exception e) {e.printStackTrace();}return columnCount;}
}

X 参考文献

  • java读取parquet文件 - CSDN 【参考/推荐】
  • datax读取Parquet格式文件总列数 - CSDN 【参考/推荐】
  • Java读取本地Parquet文件 - CSDN
  • Parquet 文件生成和读取 - CSDN
  • org.apache.parquet.io.InputFile的S3实现? - 腾讯云
Configuration conf = new Configuration();
conf.set(Constants.ENDPOINT, "https://s3.eu-central-1.amazonaws.com/");
conf.set(Constants.AWS_CREDENTIALS_PROVIDER,DefaultAWSCredentialsProviderChain.class.getName());
// maybe additional configuration properties depending on the credential providerURI uri = URI.create("s3a://bucketname/path");
org.apache.hadoop.fs.Path path = new Path(uri);ParquetFileReader pfr = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))
  • Parquet文件读写与合并小Parquet文件 - CSDN
  • java 解析 parquet文件怎么打开 - 51CTO

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/858842.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

莫队从入门到人门

普通莫队 详介(P2709 小B的询问) 普通莫队处理问题的前提是问题可以离线,多次区间查询,\(O(n\sqrt m)\) 能过。 我们以 P2709 小B的询问 为例,假设当前区间为 \([l,r]\),答案为 \(ans\),那么 \(r\) 右移一位时,新加入一个数 \(x\),我们只要把 \(ans\) 加上 \(x\) 的贡…

nacos安装注意事项

一年多没玩了,都快忘了,最新版本已经2.3.x了 3.0也快问世了 Linux/Unix/Mac 单机启动命令: sh startup.sh -m standalone Windows startup.cmd -m standalone如果直接未启动就是集群模式,但是要注意nacos.properties里面配置集群信息本文来自博客园,作者:余生请多指教ANT…

PWN系列-2.27版本利用setcontext实现orw

PWN系列-2.27版本利用setcontext实现orw 知识 开启沙箱之后,我们就只能用orw的方式来得到flag。 这篇博客主要讲通过劫持__free_hook或者__malloc_hook利用setcontext在libc或者heap上执行rop或者shellcode。 在free堆块的时候,rdi会指向堆块,在检测到__free_hook有值的情况…

shell语法保姆级教程

Shell脚本 建立一个sh脚本 touch 1.sh (新建脚本文件)vi 1.sh(编写文件内容)按 i 可以写入内容,按esc :wq退出并保存解释 1、创建脚本文件 2、脚本文件中第一行为指定脚本编译器:# !/bin/bash 最终调用的都是dash执行shell脚本命令: 1、./1.sh难道我们必须要修改权限才能执…

从0开始学uniapp——认识HBuilderX

为什么使用uniapp:可以多端运行,写好了这一套可以用在h5,安卓程序,小程序多端,很方便。1.百度搜HBuilderX,使用该编译器学习uniapp 2.新建一个默认项目 pages——用于存放页面,这里都是.vue后缀的页面, pages.json——用于存放路由pages数组里按例子添加即可,HBuilder…

Java中SPI机制原理解析

本文介绍了Java中SPI机制实现的大概原理以及SPI机制在常见的框架如JDBC的Driver加载,SLF4J日志门面实现中的使用。使用SPI机制前后的代码变化加载MySQL对JDBC的Driver接口实现 在未使用SPI机制之前,使用JDBC操作数据库的时候,一般会写如下的代码:// 通过这行代码手动加载My…

Transformers 框架 Pipeline 任务详解(六):填充蒙版(fill-mask)

本文介绍了Hugging Face Transformers框架中的fill-mask任务,涵盖其作用、应用场景如机器翻译和文本补全,以及配置方法。通过Python代码示例展示了如何使用预训练模型自动下载或本地加载来创建Pipeline并执行填空任务。此外,还提供了利用Gradio构建WebUI界面的指南,使用户能…

阿里发布多模态推理模型 QVQ-72B,视觉、语言能力双提升;OpenAI 正在研发人形机器人丨 RTE 开发者日报

开发者朋友们大家好:这里是 「RTE 开发者日报」 ,每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE(Real-Time Engagement) 领域内「有话题的 新闻 」、「有态度的 观点 」、「有意思的 数据 」、「有思考的 文章 」、「有看点的 会议 」,但内容仅代表编辑…

python多进程,通过内存共享来通信,使用进程锁来防止数据问题

代码:import multiprocessing import time 使用锁和multiprocessing.Value,multiprocessing.Array,multiprocessing.Manager().listdef worker1(shared_number1, lock):for _ in range(10):with lock:shared_number1.value += 1def worker2(shared_array1, lock):for i in…

Jenkins入门使用

Jenkins入门使用 1先安装jdk才能运行jenkins yum install -y java-1.8.0-openjdk.x86_642 安装jenkins,运行,进行端口绑定,启动jenkins docker search jenkins docker pull jenkins/jenkins docker run -d -u root -p 8080:8080 -p 50000:50000 -v /var/jenkins_home:/var/j…

Java 泛型详细解析

本文将带你详细解析 Java 泛型,了解泛型的原理、常见的使用方法以及泛型的局限性,让你对泛型有更深入的了解。泛型的定义 泛型类的定义 下面定义了一个泛型类 Pair,它有一个泛型参数 T。 public class Pair<T> {private T start;private T end; }实际使用的时候就可以…

javafx-请求篇

OkHttpClient 基本使用步骤如下构建客户端对象OkHttpClient 构建请求Request 生成Call对象 Call发起请求(同步/异步)import java.io.IOException; import okhttp3.Call; import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Req…