数据工程中的单元测试完全指南

在数据工程领域中,经常被忽视的一项实践是单元测试。许多人可能认为单元测试仅仅是一种软件开发方法论,但事实远非如此。随着我们努力构建稳健、无错误的数据流水线和SQL数据模型,单元测试在数据工程中的价值变得越来越清晰。

本文带你深入探索如何将这些成熟的软件工程实践应用到数据工程中。

1 单元测试的重要性

在数据工程的背景下,采用单元测试可以确保您的数据和业务逻辑的准确性,进而产出高质量的数据,获得您的数据分析师、科学家和决策者对数据的信任。

2 单元测试数据流水线

数据流水线通常涉及复杂的数据抽取、转换和加载(ETL)操作序列,出错的可能性很大。为了对这些操作进行单元测试,我们将流水线拆分为单个组件,并对每个组件进行独立验证。

以一个简单的流水线为例,该流水线从CSV文件中提取数据,通过清除空值来转换数据,然后将其加载到数据库中。以下是使用pandas的基于Python的示例:

import pandas as pdfrom sqlalchemy import create_engine# 加载CSV文件的函数def load_data(file_name):data = pd.read_csv(file_name)return data# 清理数据的函数def clean_data(data):data = data.dropna()return data# 将数据保存到SQL数据库的函数def save_data(data, db_string, table_name):engine = create_engine(db_string)data.to_sql(table_name, engine, if_exists='replace')# 运行数据流水线data = load_data('data.csv')data = clean_data(data)save_data(data, 'sqlite:///database.db', 'my_table')

为了对这个流水线进行单元测试,我们使用像pytest这样的库为每个函数编写单独的测试。

在这个示例中,有三个主要的函数:load_data、clean_data和save_data。我们会为每个函数编写测试。对于load_data和save_data,需要设置一个临时的CSV文件和SQLite数据库,可以使用pytest库的fixture功能来实现。

import osimport pandas as pdimport pytestfrom sqlalchemy import create_engine, inspect# 使用pytest fixture来设置临时的CSV文件和SQLite数据库@pytest.fixturedef csv_file(tmp_path):data = pd.DataFrame({'name': ['John', 'Jane', 'Doe'],'age': [34, None, 56]  # Jane的年龄缺失})file_path = tmp_path / "data.csv"data.to_csv(file_path, index=False)return file_path@pytest.fixturedef sqlite_db(tmp_path):file_path = tmp_path / "database.db"return 'sqlite:///' + str(file_path)def test_load_data(csv_file):data = load_data(csv_file)assert 'name' in data.columnsassert 'age' in data.columnsassert len(data) == 3def test_clean_data(csv_file):data = load_data(csv_file)data = clean_data(data)assert data['age'].isna().sum() == 0assert len(data) == 2  # Jane的记录应该被删除def test_save_data(csv_file, sqlite_db):data = load_data(csv_file)data = clean_data(data)save_data(data, sqlite_db, 'my_table')# 检查数据是否保存正确engine = create_engine(sqlite_db)inspector = inspect(engine)tables = inspector.get_table_names()assert 'my_table' in tablesloaded_data = pd.read_sql('my_table', engine)assert len(loaded_data) == 2  # 只应该存在John和Doe的记录

这里是另一个例子:假设您有一个从CSV文件中加载数据并将其中的“日期”列从字符串转换为日期时间的流水线:

def convert_date(data, date_column):data[date_column] = pd.to_datetime(data[date_column])return data

为上述函数编写的单元测试将传入具有已知日期字符串格式的DataFrame。然后,它将验证函数是否正确将日期转换为日期时间对象,并且它是否适当处理无效格式。

我们为上述场景编写一个单元测试。该测试首先使用有效日期检查函数,断言输出DataFrame中的“date”列确实是datetime类型,并且值与预期相符。然后,它检查在给出无效日期时,函数是否正确引发了ValueError。

import pandas as pdimport pytestdef test_convert_date():# 使用有效日期进行测试test_data = pd.DataFrame({'date': ['2021-01-01', '2021-01-02']})converted_data = convert_date(test_data.copy(), 'date')assert pd.api.types.is_datetime64_any_dtype(converted_data['date'])assert converted_data.loc[0, 'date'] == pd.Timestamp('2021-01-01')assert converted_data.loc[1, 'date'] == pd.Timestamp('2021-01-02')# 使用无效日期进行测试test_data = pd.DataFrame({'date': ['2021-13-01']  # 这个日期是无效的,因为没有第13个月})with pytest.raises(ValueError):convert_date(test_data, 'date')

以下是最后一个例子:假设您有一个加载数据并进行聚合的流水线,计算每个地区的总销售额:

def aggregate_sales(data):aggregated = data.groupby('region').sales.sum().reset_index()return aggregated

为该函数编写的单元测试将向其传递具有各个地区销售数据的DataFrame。测试将验证函数是否正确计算每个地区的总销售额。

我们为该函数编写一个单元测试。在这个测试中,我们首先向aggregate_sales函数传递一个具有已知销售数据的DataFrame,并检查它是否正确聚合了销售额。然后,向其传递一个没有销售数据的DataFrame,并检查它是否正确将这些销售额聚合为0。这样可以确保函数正确处理典型情况和边缘情况。

以下是使用pytest库为aggregate_sales函数编写单元测试的示例:

import pandas as pdimport pytestdef test_aggregate_sales():# 各个地区的销售数据test_data = pd.DataFrame({'region': ['North', 'North', 'South', 'South', 'East', 'East', 'West', 'West'],'sales': [100, 200, 300, 400, 500, 600, 700, 800]})aggregated = aggregate_sales(test_data)assert aggregated.loc[aggregated['region'] == 'North', 'sales'].values[0] == 300assert aggregated.loc[aggregated['region'] == 'South', 'sales'].values[0] == 700assert aggregated.loc[aggregated['region'] == 'East', 'sales'].values[0] == 1100assert aggregated.loc[aggregated['region'] == 'West', 'sales'].values[0] == 1500# 没有销售数据的测试test_data = pd.DataFrame({'region': ['North', 'South', 'East', 'West'],'sales': [0, 0, 0, 0]})aggregated = aggregate_sales(test_data)assert aggregated.loc[aggregated['region'] == 'North', 'sales'].values[0] == 0assert aggregated.loc[aggregated['region'] == 'South', 'sales'].values[0] == 0assert aggregated.loc[aggregated['region'] == 'East', 'sales'].values[0] == 0assert aggregated.loc[aggregated['region'] == 'West', 'sales'].values[0] == 0

感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!有需要的小伙伴可以点击下方小卡片领取 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/115012.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

YashanDB荣获“鼎新杯”数字化转型应用奖项

近日,深算院YashanDB 团队与深燃集团联合共建的深圳燃气集团数据库国产化建设项目,荣获第二届“鼎新杯”数字化转型应用大赛信息技术应用创新赛道二等奖!此次获奖,彰显了崖山数据库系统YashanDB自主领先的国产数据库技术优势和优秀…

阿里云服务器经济型e实例租用价格和CPU性能测评

阿里云服务器ECS推出经济型e系列,经济型e实例是阿里云面向个人开发者、学生、小微企业,在中小型网站建设、开发测试、轻量级应用等场景推出的全新入门级云服务器,CPU采用Intel Xeon Platinum架构处理器,支持1:1、1:2、1:4多种处理…

安装gpu版本的paddle

安装gpu版本的paddle python -m pip install paddlepaddle-gpu2.3.2.post111 -f https://www.paddlepaddle.org.cn/whl/windows/mkl/avx/stable.html以上支持cuda11.1版本 其他需求可查阅文档在这里

python 串口发送图片给arduino

python 代码 import serial import threading import time from PIL import Image from PIL import ImageSequence## im.show()##print(img) ## ####img b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00…

mySQL中查询统计俩个表相加COUNT之合

最近工作中需要用到查询语句结果为统计当前表和历史表加起来的数据之和需要把俩条数据的合加起来返回给我们: select count() from work_order_history where valid true and and state ‘8’ and create_time like ‘%20230901%’; 返回结果如下 另一条SQL如下&…

七天学会C语言-第六天(指针)

1.指针变量与普通变量 指针变量与普通变量是C语言中的两种不同类型的变量,它们有一些重要的区别和联系。 普通变量是一种存储数据的容器,可以直接存储和访问数据的值。: int num 10; // 定义一个整数型普通变量num,赋值为10在例…

【Proteus仿真】【STM32单片机】STM32脉搏血氧仪

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 系统运行后,LCD1604液晶显示心率、血氧和温度、时间日期; 如果心率、血氧超限则报警;蓝牙实时传输数据; 二、软件设计 /* 作者:嗨小易&#xf…

24个Docker常见问题处理技巧

1.Docker 迁移存储目录 默认情况系统会将 Docker 容器存放在 var/lib/docker 目录下 [问题起因] 今天通过监控系统,发现公司其中一台服务器的磁盘快慢,随即上去看了下,发现 /var/lib/docker这个目录特别大。 由上述原因,我们都知…

模式分类与“组件协作模式”

1. GOF-23 模式分类 从目的来看: 创建型(Creational)模式:将对象的部分创建工作延迟到子类或者其他对象,从而应对需求变化为对象创建时具体类型实现引来的冲击。结构型(Structural)模式&#…

Vue中如何进行跨域处理

Vue中的跨域请求处理:解决前端开发中的常见问题 跨域请求是前端开发中常见的问题之一。Vue.js是一款流行的前端框架,如何在Vue中处理跨域请求是每个Vue开发者都需要了解的重要课题。本文将深入探讨什么是跨域请求,为什么它会出现&#xff0c…

linux下特定usb设备的权限设置

文章目录 背景查找资料解决方案 背景 目前我在Ubuntu下使用一个USB热成像摄像头,通过调用它的sdk进行图像的采集以及获取对应像素点的温度。假设我现在的测试程序名称为MyApp。 当我用下面的命令运行时,程序是正常运行且能从热成像仪采集图像 sudo ./M…

华为OD机试 - 特异性双端队列(Java 2023 B卷 100分)

目录 专栏导读一、题目描述二、输入描述三、输出描述四、Java算法源码五、效果展示1、输入2、输出 华为OD机试 2023B卷题库疯狂收录中,刷题点这里 专栏导读 本专栏收录于《华为OD机试(JAVA)真题(A卷B卷)》。 刷的越多…