Paimon lookup store 实现

news/2025/1/20 5:55:00/文章来源:https://www.cnblogs.com/Aitozi/p/18514737

Lookup Store 主要用于 Paimon 中的 Lookup Compaction 以及 Lookup join 的场景. 会将远程的列存文件在本地转化为 KV 查找的格式.

Hash

https://github.com/linkedin/PalDB

Sort

https://github.com/dain/leveldb
https://github.com/apache/paimon/pull/3770

Pasted image 20241029081723
整体文件结构:

Pasted image 20241029230800|182

相比于 Hash file 的优势

  • 一次写入, 避免了文件merge
  • 顺序写入, 保持原先的 key 的顺序, 后续如果按照 key 的顺序查找, 可提升缓存效率

SortLookupStoreWriter

SortLookupStoreWriter#put

put

@Override
public void put(byte[] key, byte[] value) throws IOException {dataBlockWriter.add(key, value);if (bloomFilter != null) {bloomFilter.addHash(MurmurHashUtils.hashBytes(key));}lastKey = key;// 当BlockWriter写入达到一定阈值, 默认是 cache-page-size=64kb.if (dataBlockWriter.memory() > blockSize) {flush();}recordCount++;
}

flush

private void flush() throws IOException {  if (dataBlockWriter.size() == 0) {  return;  }  // 将data block写入数据文件, 并记录对应的position和长度BlockHandle blockHandle = writeBlock(dataBlockWriter);  MemorySlice handleEncoding = writeBlockHandle(blockHandle);// 将BlockHandle 写入index writer, 这也通过是一个BlockWriter写的indexBlockWriter.add(lastKey, handleEncoding.copyBytes());  
}

writeBlock

private BlockHandle writeBlock(BlockWriter blockWriter) throws IOException {// close the block// 获取block的完整数组, 此时blockWriter中的数组并不会被释放, 而是会继续复用MemorySlice block = blockWriter.finish();totalUncompressedSize += block.length();// attempt to compress the blockBlockCompressionType blockCompressionType = BlockCompressionType.NONE;if (blockCompressor != null) {int maxCompressedSize = blockCompressor.getMaxCompressedSize(block.length());byte[] compressed = allocateReuseBytes(maxCompressedSize + 5);int offset = encodeInt(compressed, 0, block.length());int compressedSize =offset+ blockCompressor.compress(block.getHeapMemory(),block.offset(),block.length(),compressed,offset);// Don't use the compressed data if compressed less than 12.5%,if (compressedSize < block.length() - (block.length() / 8)) {block = new MemorySlice(MemorySegment.wrap(compressed), 0, compressedSize);blockCompressionType = this.compressionType;}}totalCompressedSize += block.length();// create block trailer// 每一块block会有一个trailer, 记录压缩类型和crc32校验码BlockTrailer blockTrailer =new BlockTrailer(blockCompressionType, crc32c(block, blockCompressionType));MemorySlice trailer = BlockTrailer.writeBlockTrailer(blockTrailer);// create a handle to this block// BlockHandle 记录了每个block的其实position和长度BlockHandle blockHandle = new BlockHandle(position, block.length());// write data// 将数据追加写入磁盘文件writeSlice(block);// write trailer: 5 bytes// 写出trailerwriteSlice(trailer);// clean up stateblockWriter.reset();return blockHandle;
}

close

public LookupStoreFactory.Context close() throws IOException {// flush current data blockflush();LOG.info("Number of record: {}", recordCount);// write bloom filter@Nullable BloomFilterHandle bloomFilterHandle = null;if (bloomFilter != null) {MemorySegment buffer = bloomFilter.getBuffer();bloomFilterHandle =new BloomFilterHandle(position, buffer.size(), bloomFilter.expectedEntries());writeSlice(MemorySlice.wrap(buffer));LOG.info("Bloom filter size: {} bytes", bloomFilter.getBuffer().size());}// write index block// 将index数据写出至文件BlockHandle indexBlockHandle = writeBlock(indexBlockWriter);// write footer// Footer 记录bloomfiler + indexFooter footer = new Footer(bloomFilterHandle, indexBlockHandle);MemorySlice footerEncoding = Footer.writeFooter(footer);writeSlice(footerEncoding);// 最后关闭文件// close filefileOutputStream.close();LOG.info("totalUncompressedSize: {}", MemorySize.ofBytes(totalUncompressedSize));LOG.info("totalCompressedSize: {}", MemorySize.ofBytes(totalCompressedSize));return new SortContext(position);
}

BlockWriter

add

public void add(byte[] key, byte[] value) {int startPosition = block.size();// 写入key长度block.writeVarLenInt(key.length);// 写入keyblock.writeBytes(key);// 写入value长度block.writeVarLenInt(value.length);// 写入valueblock.writeBytes(value);int endPosition = block.size();// 使用一个int数组记录每个KV pair的起始位置作为索引positions.add(startPosition);// 是否对齐. 是否对齐取决于每个KV对的长度是否一样if (aligned) {int currentSize = endPosition - startPosition;if (alignedSize == 0) {alignedSize = currentSize;} else {aligned = alignedSize == currentSize;}}
}
  • 这里的 block 对应于一块可扩容的 MemorySegment, 也就是 byte[] , 当写入长度超过当前数组的长度时, 就会扩容

finish

public MemorySlice finish() throws IOException {if (positions.isEmpty()) {throw new IllegalStateException();}// 当通过BlockWriter写出的数据长度都是对齐的时, 就不需要记录各个Position的index了, 只需要记录一个对齐长度, 读取时自己可以计算.if (aligned) {block.writeInt(alignedSize);} else {for (int i = 0; i < positions.size(); i++) {block.writeInt(positions.get(i));}block.writeInt(positions.size());}block.writeByte(aligned ? ALIGNED.toByte() : UNALIGNED.toByte());return block.toSlice();
}

小结

整个文件的写出过程非常简单, 就是按 block 写出, 并且记录每个 block 的位置, 作为 index.

SortLookupStoreReader

读取的过程, 主要就是为了查找 key 是否存在, 以及对应的 value 或者对应的行号.

public byte[] lookup(byte[] key) throws IOException {// 先通过bloomfilter提前进行判断if (bloomFilter != null && !bloomFilter.testHash(MurmurHashUtils.hashBytes(key))) {return null;}MemorySlice keySlice = MemorySlice.wrap(key);// seek the index to the block containing the keyindexBlockIterator.seekTo(keySlice);// if indexIterator does not have a next, it means the key does not exist in this iteratorif (indexBlockIterator.hasNext()) {// seek the current iterator to the key// 根据从index block中读取到的key value的位置(BlockHandle), 读取对应的value blockBlockIterator current = getNextBlock();// 在value的iterator中再次二分查找寻找对应block中是否存在match的key, 如果存在则返回对应的数据if (current.seekTo(keySlice)) {return current.next().getValue().copyBytes();}}return null;
}
  • 查找一次 key 会经历两次二分查找(index + value).

BlockReader

// 从block创建一个iterator
public BlockIterator iterator() {BlockAlignedType alignedType =BlockAlignedType.fromByte(block.readByte(block.length() - 1));int intValue = block.readInt(block.length() - 5);if (alignedType == ALIGNED) {return new AlignedIterator(block.slice(0, block.length() - 5), intValue, comparator);} else {int indexLength = intValue * 4;int indexOffset = block.length() - 5 - indexLength;MemorySlice data = block.slice(0, indexOffset);MemorySlice index = block.slice(indexOffset, indexLength);return new UnalignedIterator(data, index, comparator);}
}

SliceCompartor

这里面传入了 keyComparator, 用于进行 key 的比较. 用于在 index 中进行二分查找. 这里的比较并不是直接基于原始的数据, 而是基于 MemorySlice 进行排序.

比较的过程会将 key 的各个字段从 MemorySegment 中读取反序列化出来, cast 成 Comparable 进行比较.

public SliceComparator(RowType rowType) {int bitSetInBytes = calculateBitSetInBytes(rowType.getFieldCount());this.reader1 = new RowReader(bitSetInBytes);this.reader2 = new RowReader(bitSetInBytes);this.fieldReaders = new FieldReader[rowType.getFieldCount()];for (int i = 0; i < rowType.getFieldCount(); i++) {fieldReaders[i] = createFieldReader(rowType.getTypeAt(i));}
}@Override
public int compare(MemorySlice slice1, MemorySlice slice2) {reader1.pointTo(slice1.segment(), slice1.offset());reader2.pointTo(slice2.segment(), slice2.offset());for (int i = 0; i < fieldReaders.length; i++) {boolean isNull1 = reader1.isNullAt(i);boolean isNull2 = reader2.isNullAt(i);if (!isNull1 || !isNull2) {if (isNull1) {return -1;} else if (isNull2) {return 1;} else {FieldReader fieldReader = fieldReaders[i];Object o1 = fieldReader.readField(reader1, i);Object o2 = fieldReader.readField(reader2, i);@SuppressWarnings({"unchecked", "rawtypes"})int comp = ((Comparable) o1).compareTo(o2);if (comp != 0) {return comp;}}}}return 0;
}

查找的实现就是二分查找的过程, 因为写入的 key 是有序写入的.

public boolean seekTo(MemorySlice targetKey) {int left = 0;int right = recordCount - 1;while (left <= right) {int mid = left + (right - left) / 2;// 对于aligned iterator, 就直接seek record * recordSize// 对于unaligned iterator, 就根据writer写入的索引表来跳转seekTo(mid);// 读取一条key value pairBlockEntry midEntry = readEntry();int compare = comparator.compare(midEntry.getKey(), targetKey);if (compare == 0) {polled = midEntry;return true;} else if (compare > 0) {polled = midEntry;right = mid - 1;} else {left = mid + 1;}}return false;
}

小结

查找过程

  • 先过一遍 bloom filter
  • index 索引查找对应 key 的 block handle
  • 根据第二步的 handle, 读取对应的 block, 在 block 中查找对应的 key value.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/823937.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

研究生如何利用 ChatGPT 帮助开展日常科研工作

研究生可以利用 ChatGPT 帮助开展日常科研工作:1. 文献综述与资料查找;2. 论文写作与润色;3. 问题解答与研究思路拓展;4. 实验设计与数据分析;5. 科研计划和进度管理;6. 学术交流和论文审阅。研究生常常需要面对海量文献,ChatGPT 可以成为文献综述的得力助手。1. 文献综…

13.Java的IO流

文件 概念文件:保存数据的地方。 文件流:文件在程序中是以流的形式来操作的。 流:数据在数据源(文件)和程序(内存)之间经历的路径。输入流:数据从数据源(文件)到程序(内存)的路径。 输出流:数据从程序(内存)到数据源(文件)的路径。常用操作构造方法方法 说明F…

论文词汇积累-铁路专业术语如车务段等(在进行小论文翻译的过程中遇到的问题,搜集整理一下)

一、铁路专用英语 专业词汇中英对照翻译 来源:https://blog.csdn.net/weixin_44304362/article/details/108827567 铁路工程词汇 线路工程 railway line engineering 铁路勘测 ;铁道勘测 railway reconnaissance 铁路选线 ;铁道选线 railway route selection;railway location…

考研打卡(2)

开局(2) 开始时间 2024-10-29 19:21:57 结束时间 2024-10-29 23:32:52呜呜,昨天被老师骂了数据结构能说明快速排序是不稳定的排序方法的一组关键字序列是____(暨南大学2011) A (10,20,30,40,50) B (50,40,30,20,10) C (20,20,30,10,40) D (20,40,30,30,10)C (20,…

vue2基础组件通信案例练习:把案例Todo-list改写成本地缓存

vue2基础组件通信案例练习:把案例Todo-list改写成本地缓存@目录概述前端代码本人其他相关文章链接 概述前面文章案例已经练习了父子组件之间的通信,这一节讲述如何把todos数组放进本地缓存中,因为实际开发场景中频繁查询的数据很有可能会用到本地缓存技术。思考:如何改成使…

Webstorm 2024 安装使用 (附加永久激活码、补丁)

下载安装第二步,安装完成之后,下载补丁 下载地址(里面包含激活码)完成,之后输入激活码免责声明:本文中的资源均来自互联网,仅供个人学习和交流使用,严禁用于商业行为,下载后请在24小时内从电脑中彻底删除。对于因非法使用而引起的版权争议,与作者无关。所有资源仅供学习…

高级语言程序设计课程第五次个人作业

这个作业属于哪个课程:https://edu.cnblogs.com/campus/fzu/2024C/ 这个作业要求在哪里: https://edu.cnblogs.com/campus/fzu/2024C/homework/13298 学号:<102400229> 姓名:<杨灿> 书本第8章8.11编程练习题目中的第1题 没有问题书本第8章8.11编程练习题目中的…

代码生产力提高100倍,Claude-3.5 +Cline 打造超强代码智能体!小白也能开发各种app!

嘿,各位小伙伴们。 今天,带大家走进神奇的 AI 世界,一起探索强大的工具和技术。 最近,Anthropic 发布了全新的 Claude-3.5-sonnet 模型,这可是 Claude-3.5-sonnet 模型的升级版哦!这款最新的模型在多方面的能力都有了显著提升,尤其是在编程方面。已经完全超越 GPT 模型,…

在线协作产品有哪些

在线协作产品主要有以下四类:一、通信工具,如Slack、Microsoft Teams、Zoom;二、文件共享与协作,如Google Workspace、Dropbox、Microsoft OneDrive;三、项目管理与任务追踪,如Trello、Asana、JIRA;四、设计与创作协作,如Figma、Adobe Creative Cloud、Canva。通信工具…

AEER-Applied Ecology and Environmental Research

生态环境、生物地理学、动物学、植物学、古生物学、生物计量学、生物数学和定量的生态学或多学科农业研究。@目录一、征稿简介二、重要信息三、服务简述四、投稿须知 一、征稿简介二、重要信息期刊官网:https://ais.cn/u/3eEJNv三、服务简述 生态环境、生物地理学、动物学、植…

学习笔记(十二):ArkUi-相对布局 (RelativeContainer)

基本概念锚点:通过锚点设置当前元素基于哪个元素确定位置。对齐方式:通过对齐方式,设置当前元素是基于锚点的上中下对齐,还是基于锚点的左中右对齐。锚点设置 锚点设置是指设置子元素相对于父元素或兄弟元素的位置依赖关系。 在水平方向上,可以设置left、middle、right的锚…

Dingdone和Apicloud开发出的APP的区别在哪里

Dingdone和Apicloud是两个流行的移动应用开发平台,它们在许多方面具有不同的特点和优势。本文将详细探讨:1、开发环境和工具集的差异;2、编程语言和框架支持的对比;3、开发效率和灵活性的区别;4、社区支持和资源的差异。例如,Dingdone可能更专注于提供快速开发的解决方案…