Spark中python和jvm的通信杂谈--ArrowConverter

背景

要提起ArrowConverters,就得说起Arrow这个项目,该项目的初衷是加速进程间的数据交换,从目前的社区发展以及它的周边来看,其实是一个很不错的项目。
那为什么Spark要引入Arrow呢?其实还得从Pyspark中python和jvm的交互方式上说起,目前pyspark采用的py4j与spark jvm进行交互,而数据的交换采用的是jvmpython两个进程间的数据交换(感兴趣的同学可以参考PySpark架构),这个时候引进Arrow恰到好处。

闲说杂谈

spark具体采用的是Arrow IPC,
IPC中用到了flatbuffers这种高效获取序列化数据的组件,再加上IPC采用的是Java NIO的ByteBuffer零拷贝的方式以及RecordBatch列批的方式,大大提升了进程间的数据交换效率。关于NIO的零拷贝参考NIO效率高的原理之零拷贝与直接内存映射

具体细节

直接到ArrowConverters的类中:
主要看两个方法:toBatchIteratorfromBatchIterator

  • ArrowConverters.toBatchIterator
  private[sql] def toBatchIterator(rowIter: Iterator[InternalRow],schema: StructType,maxRecordsPerBatch: Long,timeZoneId: String,context: TaskContext): ArrowBatchIterator = {new ArrowBatchIterator(rowIter, schema, maxRecordsPerBatch, timeZoneId, context)}

这个主要是把spark内部的InternalRow转换为ArrowRecordBatches,方法直接就是返回ArrowBatchIterator类型(Iterator[Array[Byte]]类型)的迭代器:

  • ArrowConverters.fromBatchIterator
  private[sql] def fromBatchIterator(arrowBatchIter: Iterator[Array[Byte]],schema: StructType,timeZoneId: String,context: TaskContext): Iterator[InternalRow] = new InternalRowIteratorWithoutSchema(arrowBatchIter, schema, timeZoneId, context)

这个主要是把序列化的ArrowRecordBatche转换为Spark内部的InternalRow,这里也是直接返回了InternalRowIteratorWithoutSchema类型的迭代器,这里就涉及到了内存的零拷贝,具体的方法如下:

    override def nextBatch(): (Iterator[InternalRow], StructType) = {val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)val root = VectorSchemaRoot.create(arrowSchema, allocator)resources.append(root)val arrowRecordBatch = ArrowConverters.loadBatch(arrowBatchIter.next(), allocator)val vectorLoader = new VectorLoader(root)vectorLoader.load(arrowRecordBatch)arrowRecordBatch.close()(vectorSchemaRootToIter(root), schema)}

其中涉及的调用链如下:

ArrowConverters.loadBatch||\/
MessageSerializer.deserializeRecordBatch||\/
readMessageBody||\/
ReadChannel.readFully||\/
buffer.nioBuffer||\/
getDirectBuffer

最后的getDirectBuffer直接返回的是DirectByteBuffer直接内存,这样可以避免了JVM内存到native内存的数据拷贝,尤其是在大数据场景下,提升的效率更加明显,且减少了用户态和内核态的切换次数。

  • 怎么运用到python与spark jvm的交互中
    调用网上的Pyspark的架构图
    在这里插入图片描述

    参考具体conversion.py中部分代码如下:

    jrdd = self._sc._serialize_to_jvm(arrow_data, ser, reader_func, create_RDD_server)
    jdf = self._jvm.PythonSQLUtils.toDataFrame(jrdd, schema.json(), jsqlContext)
    

    主要在self._jvm.PythonSQLUtils.toDataFrame这个方法中,python调用spark中方法,把序列化的*Iterator[Array[Byte]]*传给jvm执行,具体的细节,读者可以自行参考源代码.

其他

在最新发布的Spark-3.4.0中有一项SPIP,也是采用了Arrow IPC作为数据传输的格式。
当然Arrow Flight SQL也将是一个很好的技术点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/653.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

购物车业务

一、分析购物车vo (1)添加成功页 public class CartItemVo implements Serializable {/*** 商品id*/private Long skuId;/*** 是否选中*/private Boolean check true;/*** 商品标题*/private String title;/*** 商品图片*/private String image;/***…

【Docker】Docker运行时间长,空间不足no space left on device: unknown

空间不足no space left on device: unknown问题解决 1.执行出错2.解决方法3.dump文件是否可以删除 1.执行出错 在运行 docker restart 容器Id查看磁盘空间占用 df -h2.解决方法 这个问题是由与 /run 的空间使用完了,清理/run的空间,经过查找使用最大的是 /run/u…

Windows服务启动exe无界面终极解决方案

1、前言 我这个方案(C#操作)是彻底解决【从Windows服务启动程序exe,程序无界面】问题的终极解决方案,终极方案,绝对的终极方案,本来打算收钱的,还是算了,你们也不容易,关…

【前端工程化】深入浅出vite(二)--vue3全家桶+ts构建后管系统

安装基础包 npm create vitelatest # 这里选择的是VueTypescript的组合 cd vue-admin npm install# 先安装基础包 npm install vue-router4 npm i pinia npm i axios npm install sass --save-dev npm install element-plus --save npm install element-plus/icons-vue npm in…

如何访问NetApp E系列存储的CLI命令行

NetApp存储的E系列(e-series)是收购LSI存储而来的,所以这个产品的install base,也就是安装量其实是很大的,因为早期LSI的商业模式就是OEM,给很多的IT公司做过OEM,比较典型的就是IBM的早期的DS存…

浅谈电瓶车充电桩运营方案 安科瑞 许敏

1. 概述 电动车火灾事故频频发生,毫不起眼的电动车屡次引发夺命大火,电动车已然成为火灾“重灾区”。为预防和遏制电动自行车火灾事故发生,三令五申各种政策,为此安委会曾出台《电动自行车集中停放和充电治理方案》。 大部分充电过…

MySQL日志管理、备份与恢复

文章目录 MySQL日志管理、备份与恢复1 MySQL日志管理1.1 日志的分类1.2 日志的配置1.3 日志查询1.3.1 查看通用查询日志是否开启1.3.2 查看二进制日志是否开启1.3.3 查看慢查询日志功能是否开启1.3.4 查看慢查询时间设置1.3.5 在数据库中设置开启慢查询的方法 2 数据备份2.1 数…

freemarker 使用word模板赋值

1. 引包<dependency><groupId>org.freemarker</groupId><artifactId>freemarker</artifactId><version>2.3.28</version></dependency>word文档工具类import freemarker.template.Configuration; import freemarker.template.…

数据库实验-图书销售管理系统数据库安全管理

一、实验二&#xff1a;图书销售管理系统数据库安全管理 三、实验目的 了解该DBMS系统对数据库管理的内容与方法&#xff0c;特别是理解数据库安全机制和作用&#xff0c;以及PostgreSQL数据库角色管理、用户管理、权限管理的基本方法&#xff0c;培养数据库管理能力。在图书…

Mysql(Linux数据库或者在Navicate中)

Mysql数据库组成 服务端:主要存储数据,并接收用户发过来的SQL语句,并执行结果返回给客户端 客户端:下发用户要执行的sql语句,并显示服务器返回的执行结果 命令行数据库连接方式 mysql -h 数据库 IP -P 端口号 -u 数据库登录用户名 -p 数据库登录密码 -h不加表示为本机,-P不…

Java-String、StringBuffer、StringBuilder区别及相关面试题

目录 一、引言二、String类的基本介绍2.1 创建String对象2.2 字符串的拼接和连接2.3 字符串的比较2.4 字符串的截取和替换2.5 字符串的查找和匹配2.6 创建格式化字符串API文档 三、StringBuffer类的基本介绍3.1 创建StringBuffer对象3.2 字符串的拼接和连接3.3 字符串的插入和删…

简单易用多git服务器ssh密钥配置管理

文章目录 前言一、什么是ssh-key二、配置步骤添加ssh-key配置多ssh-key 总结 前言 快速理解如何配置管理多个git服务器的ssh&#xff0c;当我们有多个git帐号时会涉及如何管理不同的remote使用不同的git账户登陆推送 当前repo的origin remote是github&#xff0c;我们在推送时…