背景
最近在整理了一下 spark对Parquet的写文件的过程,也是为了更好的理解和调优Spark相关的任务,
因为对于Spark来说,任何一个事情都不是独立的存在的,比如说parquet文件的rowgroup设置的大小对读写的影响,以及parquet写之前排序对读parquet的影响,以及向量化读取等等
本文基于Spark 3.5
分析
我们以FileSourceScanExec的doExecute方法
为切口进行分析:
protected override def doExecute(): RDD[InternalRow] = {val numOutputRows = longMetric("numOutputRows")if (needsUnsafeRowConversion) {inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>val toUnsafe = UnsafeProjection.create(schema)toUnsafe.initialize(index)iter.map { row =>numOutputRows += 1toUnsafe(row)}}} else {inputRDD.mapPartitionsInternal { iter =>iter.map { row =>numOutputRows += 1row}}}}
这里的needsUnsafeRowConversion
判断如果是ParquetSource
,且配置了spark.sql.parquet.enableVectorizedReader
为‘true’(默认就是true),则会进行unsafeRow的转换,当然这里的好处就是节约内存以及能够减少GC
对于inputRDD
来说,就是创建了读取parquet
的RDD:
具体的见:ParquetFileFormat.buildReaderWithPartitionValues
方法,涉及到的代码多,所以只解读关键的几个部分:
- fileRooter的读取
val fileFooter = if (enableVectorizedReader) {// When there are vectorized reads, we can avoid reading the footer twice by reading// all row groups in advance and filter row groups according to filters that require// push down (no need to read the footer metadata again).ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.WITH_ROW_GROUPS)} else {ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.SKIP_ROW_GROUPS)}
这里enableVectorizedReader
如果是true的话, fileFooter 只会得到所属Task的FileMetaData信息,其中只包括了所属Task的需要读取的parquet RowGroups,具体的数据流如下:
ParquetFooterReader.readFooter||\/
readFooter||\/
fileReader.getFooter||\/
readFooter(file, options, f, converter)||\/
converter.readParquetMetadata||\/
filter.accept(new MetadataFilterVisitor)
filter.accept(new MetadataFilterVisitor
就会根据对应的filter
类型进行不同的操作:
FileMetaDataAndRowGroupOffsetInfo fileMetaDataAndRowGroupInfo = filter.accept(new MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {@Overridepublic FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException {FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));}@Overridepublic FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException {FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));}@Overridepublic FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException {FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);// We must generate the map *before* filtering because it modifies `fileMetadata`.Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter);return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, rowGroupToRowIndexOffsetMap);}@Overridepublic FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException {FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);// We must generate the map *before* filtering because it modifies `fileMetadata`.Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter);return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, rowGroupToRowIndexOffsetMap);}});
-
如果是 ParquetFooterReader.SKIP_ROW_GROUPS ,则是走的
SkipMetadataFilter
这条filter,则只会拿出rowgroup的信息和rowgrups的的行数 -
如果是 enableVectorizedReader,也就是会走
RangeMetadataFilter
这个Filter,则会调用filterFileMetaDataByMidpoint
,该方法会根据Task分配的数据是否覆盖了Rowgroups的中点来纳入到该task的读取的数据中来,具体的可以见:Spark-读取Parquet-为什么task数量会多于Row Group的数量 -
vectorizedReader的创建
vectorizedReader.initialize(split, hadoopAttemptContext, Option.apply(fileFooter))logDebug(s"Appending $partitionSchema ${file.partitionValues}")vectorizedReader.initBatch(partitionSchema, file.partitionValues)if (returningBatch) {vectorizedReader.enableReturningBatches()}// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.iter.asInstanceOf[Iterator[InternalRow]]
-
vectorizedReader.initialize
重要的点是这个主要是涉及到parquet messageType到 ParquetColumn的转换
,主要是ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(configuration)这个的配置
-
vectorizedReader.initBatch
这里面主要涉及到了根据memMode
为OFF_HEAP
还是ON_HEAP
模式来构造不同的ColumnVector
,其中
如果是ON_HEAP
,则会创建OnHeapColumnVector
,用jvm数据的形式存储
如果是OFF_HEAP
,则会创建OffHeapColumnVector
,这里涉及到的对象都是都是用unsafe api来操作,这里涉及到一个有意思的点:Platform.putByte(null, data + rowId, value);Platform.putInt(null, data + 4L * rowId, value)
也就是说 无论是put什么 里面的第一个参数是为
null
,这个其实在Unsafed方法 putInt(Object o, long offset, int x)
类中有提到Fetches a value from a given Java variable. More specifically, fetches a field or array element within the given object o at the given offset, or (if o is null) from the memory address whose numerical value is the given offset.
也就是说如果传入的第一个参数为
null
,则会以offset
作为地址,而在OffHeapColumnVector中对应的put当法中涉及到的offset
就是data
这个变量会在
OffHeapColumnVector构造函数中的reserveInternal方法中赋值
,这其中涉及到unsafe.allocateMemory
方法会返回分配的内存地址
-
具体迭代获取
InternalRow
这里的迭代获取主要是通过vectorizedReader.getCurrentValue
方法实现的,也就是会返回columnarBatch
,但是这里的columnarBatch赋值
是通过
vectorizedReader.nextKeyValue
方法实现的,该方法会被RecordReaderIterator.hasNext
调用,vectorizedReader.nextKeyValue
的数据流如下:VectorizedParquetRecordReader.nextBatch||\/checkEndOfRowGroup => 初始化 PageReadStore pages = reader.readNextRowGroup(); || ||\/ \/columnReader.readBatch(num, leafCv.getValueVector() initColumnReader(pages, cv); // columnVectors 设置ParquetColumnVector 里面包括了rowgroup里的所有page||\/readPage||\/pageReader.readPage()||\/decompressor.decompress // 之类会进行解压
decompressor.decompress
中decompressor
是Chunk.readAllPages
中descriptor.metadata.getCodec()
传进来的,也就是从元数据里面读取的
具体的向量化的读取,细节比较多,包括批量读取definition levels
和repetition levels
等,这些读者自行分析注意:为什么
FileSourceScanExec中inputRDDs
返回的类型是RDD[InternalRow]
,而vectorizedReader.getCurrentValue
返回的类型是ColumnarBatch
也能运行,那是因为 我们在运行的时候,会有ColumnarToRow
,他最终调用的是FileSourceScanExec.doExecuteColumnar
,如下图:
-
且jvm
会对Iterator[InternalRow]
进行类型擦除,也就是说所有Iterator[InternalRow]在编译的时候会编译成Iterator[Object],会在运行时获取真正的类型
`FileScanRDD` 中的`compute方法` 最后有个
```iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack.```