Spark中读parquet文件是怎么实现的

背景

最近在整理了一下 spark对Parquet的写文件的过程,也是为了更好的理解和调优Spark相关的任务,
因为对于Spark来说,任何一个事情都不是独立的存在的,比如说parquet文件的rowgroup设置的大小对读写的影响,以及parquet写之前排序对读parquet的影响,以及向量化读取等等
本文基于Spark 3.5

分析

我们以FileSourceScanExec的doExecute方法 为切口进行分析:

  protected override def doExecute(): RDD[InternalRow] = {val numOutputRows = longMetric("numOutputRows")if (needsUnsafeRowConversion) {inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>val toUnsafe = UnsafeProjection.create(schema)toUnsafe.initialize(index)iter.map { row =>numOutputRows += 1toUnsafe(row)}}} else {inputRDD.mapPartitionsInternal { iter =>iter.map { row =>numOutputRows += 1row}}}}

这里的needsUnsafeRowConversion判断如果是ParquetSource,且配置了spark.sql.parquet.enableVectorizedReader为‘true’(默认就是true),则会进行unsafeRow的转换,当然这里的好处就是节约内存以及能够减少GC
对于inputRDD来说,就是创建了读取parquet的RDD:
具体的见:ParquetFileFormat.buildReaderWithPartitionValues方法,涉及到的代码多,所以只解读关键的几个部分:

  • fileRooter的读取
        val fileFooter = if (enableVectorizedReader) {// When there are vectorized reads, we can avoid reading the footer twice by reading// all row groups in advance and filter row groups according to filters that require// push down (no need to read the footer metadata again).ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.WITH_ROW_GROUPS)} else {ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.SKIP_ROW_GROUPS)}
    

这里enableVectorizedReader如果是true的话, fileFooter 只会得到所属Task的FileMetaData信息,其中只包括了所属Task的需要读取的parquet RowGroups,具体的数据流如下:

ParquetFooterReader.readFooter||\/
readFooter||\/
fileReader.getFooter||\/
readFooter(file, options, f, converter)||\/
converter.readParquetMetadata||\/
filter.accept(new MetadataFilterVisitor)

filter.accept(new MetadataFilterVisitor就会根据对应的filter类型进行不同的操作:

FileMetaDataAndRowGroupOffsetInfo fileMetaDataAndRowGroupInfo = filter.accept(new MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {@Overridepublic FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException {FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));}@Overridepublic FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException {FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));}@Overridepublic FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException {FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);// We must generate the map *before* filtering because it modifies `fileMetadata`.Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter);return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, rowGroupToRowIndexOffsetMap);}@Overridepublic FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException {FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);// We must generate the map *before* filtering because it modifies `fileMetadata`.Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter);return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, rowGroupToRowIndexOffsetMap);}});
  • 如果是 ParquetFooterReader.SKIP_ROW_GROUPS ,则是走的SkipMetadataFilter这条filter,则只会拿出rowgroup的信息和rowgrups的的行数

  • 如果是 enableVectorizedReader,也就是会走 RangeMetadataFilter这个Filter,则会调用filterFileMetaDataByMidpoint,该方法会根据Task分配的数据是否覆盖了Rowgroups的中点来纳入到该task的读取的数据中来,具体的可以见:Spark-读取Parquet-为什么task数量会多于Row Group的数量

  • vectorizedReader的创建

            vectorizedReader.initialize(split, hadoopAttemptContext, Option.apply(fileFooter))logDebug(s"Appending $partitionSchema ${file.partitionValues}")vectorizedReader.initBatch(partitionSchema, file.partitionValues)if (returningBatch) {vectorizedReader.enableReturningBatches()}// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.iter.asInstanceOf[Iterator[InternalRow]]
    
    • vectorizedReader.initialize
      重要的点是这个主要是涉及到 parquet messageType到 ParquetColumn的转换,主要是ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(configuration)这个的配置

    • vectorizedReader.initBatch
      这里面主要涉及到了根据memModeOFF_HEAP还是ON_HEAP模式来构造不同的ColumnVector,其中
      如果是ON_HEAP,则会创建OnHeapColumnVector,用jvm数据的形式存储
      如果是OFF_HEAP,则会创建OffHeapColumnVector,这里涉及到的对象都是都是用unsafe api来操作,这里涉及到一个有意思的点:

       Platform.putByte(null, data + rowId, value);Platform.putInt(null, data + 4L * rowId, value)
      

      也就是说 无论是put什么 里面的第一个参数是为null,这个其实在Unsafed方法 putInt(Object o, long offset, int x)类中有提到

       Fetches a value from a given Java variable. More specifically, fetches a field or array element within the given object o at the given offset, or (if o is null) from the memory address whose numerical value is the given offset.
      

      也就是说如果传入的第一个参数为null,则会以offset作为地址,而在OffHeapColumnVector中对应的put当法中涉及到的offset就是data这个变量会在
      OffHeapColumnVector构造函数中的reserveInternal方法中赋值,这其中涉及到unsafe.allocateMemory方法会返回分配的内存地址

    • 具体迭代获取InternalRow
      这里的迭代获取主要是通过 vectorizedReader.getCurrentValue方法实现的,也就是会返回columnarBatch,但是这里的columnarBatch赋值是通过
      vectorizedReader.nextKeyValue方法实现的,该方法会被RecordReaderIterator.hasNext调用,vectorizedReader.nextKeyValue的数据流如下:

       VectorizedParquetRecordReader.nextBatch||\/checkEndOfRowGroup    =>                               初始化  PageReadStore pages = reader.readNextRowGroup(); ||                                                                         ||\/                                                                         \/columnReader.readBatch(num, leafCv.getValueVector()   initColumnReader(pages, cv); // columnVectors 设置ParquetColumnVector 里面包括了rowgroup里的所有page||\/readPage||\/pageReader.readPage()||\/decompressor.decompress //  之类会进行解压

      decompressor.decompressdecompressorChunk.readAllPagesdescriptor.metadata.getCodec()传进来的,也就是从元数据里面读取的
      具体的向量化的读取,细节比较多,包括批量读取definition levelsrepetition levels等,这些读者自行分析

      注意:为什么 FileSourceScanExec中inputRDDs返回的类型是RDD[InternalRow] ,而vectorizedReader.getCurrentValue返回的类型是ColumnarBatch 也能运行,那是因为 我们在运行的时候,会有ColumnarToRow,他最终调用的是FileSourceScanExec.doExecuteColumnar,如下图:
      在这里插入图片描述

jvm会对Iterator[InternalRow]进行类型擦除,也就是说所有Iterator[InternalRow]在编译的时候会编译成Iterator[Object],会在运行时获取真正的类型

`FileScanRDD` 中的`compute方法` 最后有个
```iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack.```

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/521018.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

106短信平台群发短信教育行业效果如何?

106短信平台在教育行业中的群发短信效果是显著的&#xff0c;主要体现在以下几个方面&#xff1a; 首先&#xff0c;106短信平台为教育行业提供了一种高效、精准的信息传播方式。教育机构可以通过平台&#xff0c;将课程安排、考试通知、学费扣费等信息及时、准确地传达给学生和…

《手把手教你》系列技巧篇(二十四)-java+ selenium自动化测试-三大延时等待(详细教程)

1.简介 前边讲解完八大元素定位大法&#xff0c;今天宏哥讲解和分享一下三大延时等待。宏哥这里简称“三等八定”。很多人在群里问&#xff0c;这个下拉框定位不到、那个弹出框定位不到…各种定位不到&#xff0c;其实大多数情况下就是两种问题&#xff1a;1. 有frame&#xf…

【机器学习】详解正则化思想

我们的生活当中真正有意义或者有价值的部分可以概括为两句话&#xff1a;一句话是&#xff1a;弄清楚某个东西是怎么一回事&#xff0c;另一句话是&#xff0c;弄清楚某个东西是怎么一回事。头一句话&#xff0c;我们弄清楚的那个东西对于我们而言是未知的&#xff0c;但是已经…

Java多线程循环打印数字和字母问题

前提要求&#xff1a;两个线程&#xff0c;一个线程打印 1~52&#xff0c;另一个线程打印字母A-Z。打印顺序为12A34B56C…… 直接上代码 public class ThreadTest {public static void main(String[] args) {PrintTask printTask new PrintTask();Thread numberThread new T…

AIX系统上卸载软件

AIX上卸载软件采用smitty命令 选择Software Installation and Maintenance 选择Software Manintenance and Utilities 选择Remove Installed Software F4列出软件列表 选择要卸载的软件&#xff0c;PREVIEW only选择no Enter Enter 卸载完成

M1电脑 Xcode15升级遇到的问题

遇到四个问题 一、模拟器下载经常报错。 二、Xcode15报错: SDK does not contain libarclite 三、报错coreAudioTypes not found 四、xcode模拟器运行一次下次必定死机 一、模拟器下载经常报错。 可以https://developer.apple.com/download/all/?qios 下载最新的模拟器&…

【学习资源】对比说明三个通过作者查找文献数据库(一)

最近博主在阅读相关文献的时候&#xff0c;想针对一些作者的科研文献做一个详细的了解&#xff0c;于是涉及到“如何已知作者与其所在单位&#xff0c;查找其研究成果”的问题&#xff0c;博主尝试了在Google Scholar、Web of Science、CRS核心论文库这三个地方通过作者查找文献…

模块自动导入

看的短视频&#xff0c;自己试了下&#xff0c;发现挺好用的&#xff1a;模块自动导入【渡一教育】_哔哩哔哩_bilibili 1. 安装插件 npm i unplugin-auto-import 2. 在vite配置文件&#xff08;vite.config.ts&#xff09;中进行配置&#xff0c; 配置完场后&#xff0c;需要重…

MS9708/MS9710/MS9714

产品简述 MS9708/MS9710/MS9714 是一个 8-Bit/10-Bit/14-Bit 高速、低功耗 D/A 转换器。当采样速率达到 125MSPS 时&#xff0c; MS9708/MS9710/MS9714 也能提供优越的 AC 和 DC 性能。 MS9708/MS9710/MS9714 的正常工作电压范围为 2.7V 到 5.5V &#xff0c;…

3/7—21. 合并两个有序链表

代码实现&#xff1a; 方法1&#xff1a;递归 ---->难点 /*** Definition for singly-linked list.* struct ListNode {* int val;* struct ListNode *next;* };*/ struct ListNode* mergeTwoLists(struct ListNode *list1, struct ListNode *list2) {/*1.如果l1为…

“空气清新剂”成网红!6.29美金,单周热销近5万单!

草莽时期的“造富神话”或许不再&#xff0c;但TikTok电商仍然生机勃勃&#xff0c;新的爆款、新的营销还在出现。 鉴于美国汽车保有量的增长&#xff0c;车内装饰和汽车用品的需求也相应上升&#xff0c;而TikTok在这一趋势中扮演着至关重要的引导角色&#xff0c;越来越多[汽…

各大厂商常用的弱口令集合

Oms呼叫中心 KXTsoft2010 Glodon控制台 admin TRENDnet趋势网络摄像头 admin/admin MOBOTIX-视频监控 admin/meinsm 思科Cisco 没有原始密码, 第一次登录时需要创建 DRS admin/1234 Honeywell admin/1234 安迅士Axis root/pass, 新安迅士摄像头在用户第一登录时要求创建…