并行计算框架Polars、Dask的数据处理性能对比

在Pandas 2.0发布以后,我们发布过一些评测的文章,这次我们看看,除了Pandas以外,常用的两个都是为了大数据处理的并行数据框架的对比测试。

本文我们使用两个类似的脚本来执行提取、转换和加载(ETL)过程。

测试内容

这两个脚本主要功能包括:

从两个parquet 文件中提取数据,对于小型数据集,变量path1将为“yellow_tripdata/ yellow_tripdata_2014-01”,对于中等大小的数据集,变量path1将是“yellow_tripdata/yellow_tripdata”。对于大数据集,变量path1将是“yellow_tripdata/yellow_tripdata*.parquet”;

进行数据转换:a)连接两个DF,b)根据PULocationID计算行程距离的平均值,c)只选择某些条件的行,d)将步骤b的值四舍五入为2位小数,e)将列“trip_distance”重命名为“mean_trip_distance”,f)对列“mean_trip_distance”进行排序

将最终的结果保存到新的文件

脚本

1、Polars

数据加载读取

 def extraction():"""Extract two datasets from parquet files"""path1="yellow_tripdata/yellow_tripdata_2014-01.parquet"df_trips= pl_read_parquet(path1,)path2 = "taxi+_zone_lookup.parquet"df_zone = pl_read_parquet(path2,)return df_trips, df_zonedef pl_read_parquet(path, ):"""Converting parquet file into Polars dataframe"""df= pl.scan_parquet(path,)return df

转换函数

 def transformation(df_trips, df_zone):"""Proceed to several transformations"""df_trips= mean_test_speed_pl(df_trips, )df = df_trips.join(df_zone,how="inner", left_on="PULocationID", right_on="LocationID",)df = df.select(["Borough","Zone","trip_distance",])df = get_Queens_test_speed_pd(df)df = round_column(df, "trip_distance",2)df = rename_column(df, "trip_distance","mean_trip_distance")df = sort_by_columns_desc(df, "mean_trip_distance")return dfdef mean_test_speed_pl(df_pl,):"""Getting Mean per PULocationID"""df_pl = df_pl.groupby('PULocationID').agg(pl.col(["trip_distance",]).mean())return df_pldef get_Queens_test_speed_pd(df_pl):"""Only getting Borough in Queens"""df_pl = df_pl.filter(pl.col("Borough")=='Queens')return df_pldef round_column(df, column,to_round):"""Round numbers on columns"""df = df.with_columns(pl.col(column).round(to_round))return dfdef rename_column(df, column_old, column_new):"""Renaming columns"""df = df.rename({column_old: column_new})return dfdef sort_by_columns_desc(df, column):"""Sort by column"""df = df.sort(column, descending=True)return df

保存

 def loading_into_parquet(df_pl):"""Save dataframe in parquet"""df_pl.collect(streaming=True).write_parquet(f'yellow_tripdata_pl.parquet')

其他代码

 import polars as plimport timedef pl_read_parquet(path, ):"""Converting parquet file into Polars dataframe"""df= pl.scan_parquet(path,)return dfdef mean_test_speed_pl(df_pl,):"""Getting Mean per PULocationID"""df_pl = df_pl.groupby('PULocationID').agg(pl.col(["trip_distance",]).mean())return df_pldef get_Queens_test_speed_pd(df_pl):"""Only getting Borough in Queens"""df_pl = df_pl.filter(pl.col("Borough")=='Queens')return df_pldef round_column(df, column,to_round):"""Round numbers on columns"""df = df.with_columns(pl.col(column).round(to_round))return dfdef rename_column(df, column_old, column_new):"""Renaming columns"""df = df.rename({column_old: column_new})return dfdef sort_by_columns_desc(df, column):"""Sort by column"""df = df.sort(column, descending=True)return dfdef main():print(f'Starting ETL for Polars')start_time = time.perf_counter()print('Extracting...')df_trips, df_zone =extraction()end_extract=time.perf_counter() time_extract =end_extract- start_timeprint(f'Extraction Parquet end in {round(time_extract,5)} seconds')print('Transforming...')df = transformation(df_trips, df_zone)end_transform = time.perf_counter() time_transformation =time.perf_counter() - end_extractprint(f'Transformation end in {round(time_transformation,5)} seconds')print('Loading...')loading_into_parquet(df,)load_transformation =time.perf_counter() - end_transformprint(f'Loading end in {round(load_transformation,5)} seconds')print(f"End ETL for Polars in {str(time.perf_counter()-start_time)}")if __name__ == "__main__":main()

2、Dask

函数功能与上面一样,所以我们把代码整合在一起:

 import dask.dataframe as ddfrom dask.distributed import Clientimport timedef extraction():path1 = "yellow_tripdata/yellow_tripdata_2014-01.parquet"df_trips = dd.read_parquet(path1)path2 = "taxi+_zone_lookup.parquet"df_zone = dd.read_parquet(path2)return df_trips, df_zonedef transformation(df_trips, df_zone):df_trips = mean_test_speed_dask(df_trips)df = df_trips.merge(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")df = df[["Borough", "Zone", "trip_distance"]]df = get_Queens_test_speed_dask(df)df = round_column(df, "trip_distance", 2)df = rename_column(df, "trip_distance", "mean_trip_distance")df = sort_by_columns_desc(df, "mean_trip_distance")return dfdef loading_into_parquet(df_dask):df_dask.to_parquet("yellow_tripdata_dask.parquet", engine="fastparquet")def mean_test_speed_dask(df_dask):df_dask = df_dask.groupby("PULocationID").agg({"trip_distance": "mean"})return df_daskdef get_Queens_test_speed_dask(df_dask):df_dask = df_dask[df_dask["Borough"] == "Queens"]return df_daskdef round_column(df, column, to_round):df[column] = df[column].round(to_round)return dfdef rename_column(df, column_old, column_new):df = df.rename(columns={column_old: column_new})return dfdef sort_by_columns_desc(df, column):df = df.sort_values(column, ascending=False)return dfdef main():print("Starting ETL for Dask")start_time = time.perf_counter()client = Client()  # Start Dask Clientdf_trips, df_zone = extraction()end_extract = time.perf_counter()time_extract = end_extract - start_timeprint(f"Extraction Parquet end in {round(time_extract, 5)} seconds")print("Transforming...")df = transformation(df_trips, df_zone)end_transform = time.perf_counter()time_transformation = time.perf_counter() - end_extractprint(f"Transformation end in {round(time_transformation, 5)} seconds")print("Loading...")loading_into_parquet(df)load_transformation = time.perf_counter() - end_transformprint(f"Loading end in {round(load_transformation, 5)} seconds")print(f"End ETL for Dask in {str(time.perf_counter() - start_time)}")client.close()  # Close Dask Clientif __name__ == "__main__":main()

测试结果对比

1、小数据集

我们使用164 Mb的数据集,这样大小的数据集对我们来说比较小,在日常中也时非常常见的。

下面是每个库运行五次的结果:

Polars

Dask

2、中等数据集

我们使用1.1 Gb的数据集,这种类型的数据集是GB级别,虽然可以完整的加载到内存中,但是数据体量要比小数据集大很多。

Polars

Dask

3、大数据集

我们使用一个8gb的数据集,这样大的数据集可能一次性加载不到内存中,需要框架的处理。

Polars

Dask

总结

从结果中可以看出,Polars和Dask都可以使用惰性求值。所以读取和转换非常快,执行它们的时间几乎不随数据集大小而变化;

可以看到这两个库都非常擅长处理中等规模的数据集。

由于polar和Dask都是使用惰性运行的,所以下面展示了完整ETL的结果(平均运行5次)。

Polars在小型数据集和中型数据集的测试中都取得了胜利。但是,Dask在大型数据集上的平均时间性能为26秒。

这可能和Dask的并行计算优化有关,因为官方的文档说“Dask任务的运行速度比Spark ETL查询快三倍,并且使用更少的CPU资源”。

上面是测试使用的电脑配置,Dask在计算时占用的CPU更多,可以说并行性能更好。

https://avoid.overfit.cn/post/74128cd8803b43f2a51ca4ff4fed4a95

作者:Luís Oliveira

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/21902.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

我爱学QT--qt的网络编程

学习地址: QT网络编程之TCP通信_哔哩哔哩_bilibili QT网络编程有TCP和UDP。 TCP编程需要用到两个类:QTcpServer和QTcpSocket 本节课目标: 完成一个服务器和一个客户端 首先是经典的几步 先设计ui再设计逻辑实现 多看看写的文件理解吧

设计模式-简单工厂模式

文章目录 简单工厂设计模式什么是简单工厂?为什么使用简单工厂工厂模式代码实现简单工厂优缺点优点: 简单工厂设计模式 学习视频 什么是简单工厂? 简单工厂模式属于类的创建型模式,又叫做静态工厂方法模式。通过专门定义一个类来负责创建其他类的实…

vue中使用Pinia和Vuex详解

最具有争议的Pinia和Vuex那个更好? 我们使用Vue2的时候,Vuex作为一个状态管理工具在组件中使用方便了很多。Vue3推出后,虽然相对于Vue2很多东西都变了,但是核心的东西还是没有变的,比如说状态管理、路由等等。实际上&a…

Group, AnimationUpdate, Menu 的使用

1. Group 组堆栈布局的使用 1.1 实现 // 组堆栈 struct GroupBootcamp: View {var body: some View {VStack(spacing: 50) {Text("Hello, world!")Group() {Text("Hello, world!")Text("Hello, world!")}.font(.caption).foregroundColor(.gree…

智头条|第25届中国建博会(广州)成功举行,马斯克组建xAI公司

行业动态: 第25届中国建博会(广州)成功举行 7月8日至11日期间,2023中国建博会(广州)暨首届广州卫博会在广州如火如荼地进行。本届展会以“冠军企业首秀平台”为定位,以“建装理想家,服务新格局”为主题&a…

conda的使用

一、conda 1、为什么使用conda 在安装Python包的过程中,可能遇到依赖包的问题。例如,要安装numpy,需要先安装BLAS和LAPACK等库。在使用pip等包管理工具时,这些依赖包需要手动安装,操作起来可能比较繁琐。而conda是一个…

Redis的缓存问题

说起Redis的缓存,我们知道前端发出的请求到后端,后端先从Redis中查询,如果查询到了则直接返回,如果Redis中未查询到,就去数据库中查询,如果数据库中存在,则返回结果并且更新到Redis缓存当中&…

Redis_简介(1)

目录 Redis简介 Redis特性 Redis 优势 Redis应用场景 源码等资料获取方法 Redis简介 Redis是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。从2010年3月15日起,Redis的开发工作由…

leetcode 450. 删除二叉搜索树中的节点

2023.7.14 搜索二叉树相关的题一般都能用递归解决。 本体大致思路是:使用递归的方式,在树中查找目标节点,并根据节点的情况进行删除操作。如果目标节点是叶子节点,直接删除它;如果目标节点只有一个子树,将子…

mysql笔记

目录 1、root用户密码忘记 2、SQL的分类 2.1、DQL数据查询语言 前言 2.1.1、设置别名 2.1.2、去除重复行 2.1.3、空值参与运算 2.1.4、着重号 2.1.5、显示表结构 2.1.6、算数运算符 2.1.7、比较运算符 2.1.8、逻辑运算符 2.1.9、位运算符 2.1.10、 模糊查询 2.1.…

经典CNN(一):ResNet-50算法实战与解析

🍨 本文为🔗365天深度学习训练营中的学习记录博客🍖 原作者:K同学啊|接辅导、项目定制 1 ResNet理论 深度残差网络ResNet(deep residual network)在2015年由何凯明等提出,因为它简单与实用并存,随后很多研究…

cloud Alibab+nacos+gateway集成swaggerui,统一文档管理(注意点)

首先说明&#xff1a;本文只说整合注意点 效果图和功能参考链接 1.使用gateway访问nacos服务&#xff0c;503 在网关服务添加依赖即可解决 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign&…