Preface: The Trigger
- A device in the production environment suffered a major incident, and a key feature happened to be unavailable at the time. We urgently needed to pull the raw MQTT messages from production (persisted to disk as Parquet files with zstd compression) down to a local machine, then read and parse them.
This article focuses on reading Parquet files with Java on a local machine.
- Quite a few code samples found online for reading Parquet simply do not run, so it is worth recording what actually works; the Parquet format and the zstd algorithm still need further study later.
Overview: Parquet //TODO
Worked Example
Adding the Dependencies
```xml
<properties>
    <!-- 1.13.1 / 1.12.0 -->
    <parquet.version>1.13.1</parquet.version>
    <avro.version>1.10.2</avro.version>
    <!-- 3.2.1 / 2.7.3 -->
    <hadoop.version>3.2.1</hadoop.version>
    <!-- 1.5.0-1 / 1.5.5-5 / version conflict with the one pulled in by kafka-clients:1.4-xxx -->
    <zstd.version>1.5.0-1</zstd.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>${parquet.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-hadoop</artifactId>
        <version>${parquet.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-compress</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-compress</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- Zstd's RecyclingBufferPool is a memory-management mechanism that reuses buffers efficiently,
         cutting the overhead of allocating and freeing memory. The BufferPool interface in the Zstd
         library has several implementations; RecyclingBufferPool is a common one. -->
    <dependency>
        <groupId>com.github.luben</groupId>
        <artifactId>zstd-jni</artifactId>
        <!-- 1.5.5-5 / conflicts with kafka-clients -->
        <version>${zstd.version}</version>
    </dependency>
</dependencies>
```
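The XML comment above mentions zstd-jni's RecyclingBufferPool. As a side note, the sketch below shows how the pool plugs into the library's streaming API; it assumes the `ZstdOutputStreamNoFinalizer` / `ZstdInputStreamNoFinalizer` constructors that accept a `BufferPool` (present in recent zstd-jni releases) and is meant only as an illustration, not as part of the Parquet-reading demo:

```java
import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ZstdRecyclingBufferPoolDemo {
    public static void main(String[] args) throws IOException {
        byte[] original = "hello zstd".getBytes(StandardCharsets.UTF_8);

        // Compress: the shared RecyclingBufferPool lets the stream reuse its internal buffers
        // instead of allocating fresh ones for every stream instance.
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (ZstdOutputStreamNoFinalizer out =
                     new ZstdOutputStreamNoFinalizer(compressed, RecyclingBufferPool.INSTANCE)) {
            out.write(original);
        }

        // Decompress with the same pool.
        ByteArrayOutputStream restored = new ByteArrayOutputStream();
        try (ZstdInputStreamNoFinalizer in = new ZstdInputStreamNoFinalizer(
                new ByteArrayInputStream(compressed.toByteArray()), RecyclingBufferPool.INSTANCE)) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                restored.write(buf, 0, n);
            }
        }
        System.out.println(new String(restored.toByteArray(), StandardCharsets.UTF_8)); // hello zstd
    }
}
```

Sharing `RecyclingBufferPool.INSTANCE` across many short-lived streams is what avoids re-allocating the streams' internal buffers, which matters when a large number of small zstd frames are decompressed in a loop.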
ReadParquetFormatMQTTMessageRawDataDemo
```java
import com.xx.yy.common.utils.DatetimeUtil;
import com.xx.yy.common.utils.FileUtil;
import com.alibaba.fastjson2.JSON;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Read a Parquet file on the local machine
 * @reference-doc
 * [1] java读取parquet文件 - https://blog.csdn.net/letterss/article/details/131417952
 * [2] datax读取Parquet格式文件总列数 - https://blog.csdn.net/letterss/article/details/131189471
 */
@Slf4j
public class ReadParquetFormatMQTTMessageRawDataDemo {

    @SneakyThrows
    public static void main(String[] args) {
        String baseDir = "file:///E:/tmp_data/parquet_mqtt_messages/ods_raw_data_history/";
        String parquetFilePathStr = baseDir + "20081-XXXX-XXXXXXXXXX/e7db5c81e70131d55d4fb4a1752b90f2-1.parquet"; // "output.parquet"
        try {
            // The Parquet file path
            //Path parquetFilePath = new Path("C:\\\\Users\\\\Administrator\\\\Desktop\\\\07fe7433-99c2-41d8-a91d-27a77d99f690-0_4-8-0_20230510103931772.parquet");
            Path parquetFilePath = new Path(parquetFilePathStr);

            // Query the total column count, see: https://blog.csdn.net/letterss/article/details/131189471
            int allColumnsCount = getParquetAllColumnsCount(parquetFilePath);
            int columnIndexMax = allColumnsCount - 1;

            // Create a ParquetReader.Builder
            ParquetReader.Builder<Group> builder = ParquetReader.builder(new GroupReadSupport(), parquetFilePath);
            // Build the ParquetReader
            ParquetReader<Group> reader = builder.build();

            // Iterate over the records in the Parquet file
            Group record;
            List<Map<String, Object>> records = new ArrayList<>();
            while ((record = reader.read()) != null) {
                Map<String, Object> recordMap = new HashMap<>();
                // Per-record handling: walk every column by index and read its value as a string
                for (int i = 0; i <= columnIndexMax; i++) {
                    String fieldKey = record.getType().getType(i).getName(); // record.getType().getFieldName(i);
                    Object fieldValue = record.getValueToString(i, 0);
                    recordMap.put(fieldKey, fieldValue);
                    System.out.println(fieldValue);
                }
                records.add(recordMap);
                //writeMqttMessageRawDataToLocal(recordMap);
            }
            System.out.println(JSON.toJSONString(records));

            // Close the reader
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @SneakyThrows
    public static void writeMqttMessageRawDataToLocal(Map<String, Object> recordMap) {
        String targetBaseDir = "E:\\tmp_data\\batch_parse_raw_mqtt_messages\\";
        String hexMqttMessageRawData = (String) recordMap.get("raw_data");
        String deviceId = (String) recordMap.get("device_id");
        Long collectTime = Long.valueOf((String) recordMap.get("collect_time"));
        String mqttMessageRawDataFile = String.format("%s.%d(%s).hex-bin",
                deviceId, collectTime,
                DatetimeUtil.longToString(collectTime, DatetimeUtil.MILLISECOND_TIME_WITH_NUMBER_FORMAT));
        FileUtil.writeToFile(hexMqttMessageRawData, targetBaseDir + mqttMessageRawDataFile);
        //FileUtils.write();
    }

    /**
     * Get the total column count of a Parquet file
     * @reference-doc
     * [1] datax读取Parquet格式文件总列数 - https://blog.csdn.net/letterss/article/details/131189471
     */
    public static int getParquetAllColumnsCount(Path path) {
        int columnCount = -1;
        Configuration configuration = new Configuration();
        //Path path = new Path(parquetFilePath);
        try {
            ParquetMetadata metadata = ParquetFileReader.readFooter(configuration, path);
            List<ColumnChunkMetaData> columns = metadata.getBlocks().get(0).getColumns();
            columnCount = columns.size();
            System.out.println("Total column count: " + columnCount);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return columnCount;
    }
}
```
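Since parquet-avro is already on the classpath, the same kind of file can also be read as Avro `GenericRecord`s instead of example-API `Group`s, which gives field access by name rather than by column index. A minimal sketch, assuming the `AvroParquetReader` builder API of parquet-avro 1.13.x; the class name and file path below are placeholders, not from the production setup:

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ReadParquetAsAvroDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical local path; replace with an actual Parquet file
        Path path = new Path("file:///E:/tmp_data/parquet_mqtt_messages/sample.parquet");
        Configuration conf = new Configuration();

        // AvroParquetReader maps each Parquet row to an Avro GenericRecord,
        // so fields can be accessed by name without knowing the column index.
        try (ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(HadoopInputFile.fromPath(path, conf))
                .build()) {
            GenericRecord record;
            while ((record = reader.read()) != null) {
                System.out.println(record); // prints the record via Avro's JSON-like toString()
            }
        }
    }
}
```

Compared with `GroupReadSupport`, this avoids walking the schema by index, but it does require the file's Parquet schema to map cleanly onto an Avro schema.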
X References
- java读取parquet文件 - CSDN [referenced / recommended]
- datax读取Parquet格式文件总列数 - CSDN [referenced / recommended]
- Java读取本地Parquet文件 - CSDN
- Parquet 文件生成和读取 - CSDN
- org.apache.parquet.io.InputFile的S3实现? - Tencent Cloud
```java
Configuration conf = new Configuration();
conf.set(Constants.ENDPOINT, "https://s3.eu-central-1.amazonaws.com/");
conf.set(Constants.AWS_CREDENTIALS_PROVIDER, DefaultAWSCredentialsProviderChain.class.getName());
// maybe additional configuration properties depending on the credential provider

URI uri = URI.create("s3a://bucketname/path");
org.apache.hadoop.fs.Path path = new Path(uri);
ParquetFileReader pfr = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
```
- Parquet文件读写与合并小Parquet文件 - CSDN
- java 解析 parquet文件怎么打开 - 51CTO