ETL.NET 助力海量数据轻松处理
- 什么是 ETL & EtlT ?
- About ETL
- About EtlT
- 谈谈 ETL 作用
- ETL 对企业的作用
- ETL 对个人职业发展的作用
- ETL.NET 介绍
- ETL.NET 功能特点
- 1、它包含 SSIS 的所有转换和功能
- 2、开箱即用的功能
- 如何使用 ETL.NET ?
- ETL.NET 相关资源
- Paillave.EtlNet 系列 Nuget 包
- Examples 应用举例
- 1、创建控制台项目
- 2、添加依赖 nuget 包
- 3、应用案例代码编写
- 4、准备数据库表结构
- 5、准备数据源
- 验证测试
随着数字化转型的不断深入,数据在企业和个人生活中扮演着日益重要的角色。
在企业方面,数据可以帮助企业更好地了解客户需求、市场趋势和业务表现,从而更好地制定战略和决策。此外,数据还可以帮助企业优化业务流程、提高效率和降低成本。
在个人生活方面,数据也扮演着越来越重要的角色。例如,我们使用智能手机、智能手表等设备时,这些设备会收集我们的健康数据、位置数据等信息,帮助我们更好地管理自己的生活和健康。此外,社交媒体、电子商务等应用程序也会收集我们的数据,以便更好地为我们提供个性化的服务和推荐。
什么是 ETL & EtlT ?
About ETL
ETL
(抽取、转换和加载)是一种数据处理方法,它对于企业和个人来说都具有重要意义。
ETL
是一种数据处理方法,它由以下三个步骤组成:
- 抽取(Extraction):从多个数据源中提取数据。
- 转换(Transformation):对抽取的数据进行清洗、整合、转换和验证,以满足特定的需求。
- 加载(Loading):将经过转换的数据加载到目标系统或数据仓库中。
About EtlT
EtlT
它拆分了原有 ETL
和 ELT
的结构,并力求 实时
和 批量
统一在一起处理以满足实时数据仓库和 AI
应用的需求。
EtlT
由以下四个步骤组成:
- E(xtract) 抽取:从数据源角度来看,支持传统的线下数据库、传统文件、传统软件同时,还要支持新兴云上数据库、SaaS 软件 API 以及 Serverless 数据源的抽取;从数据抽取方式来看,需要支持实时 CDC(Change Data Capture)对数据库 Binlog 日志的解析,也要支持实时计算(例如 Kafka Streaming),同时也需要支持大批量数据读取(多线程分区读取、限流读取等)。
- t(ransform) 规范化:相对于 ETL 和 ELT,EtLT 多出了一个小 t,它的目标是数据规范化(Data Normalization)将复杂、异构的抽取出来数据源,快速地变为目标端可加载的结构化数据,同时,针对 CDC 实时加载 Binlog 进行拆分、过滤、字段格式变更,并支持批量和实时方式快速分发到最终 Load 阶段。
- L(oad) 加载:准确的说,加载阶段已经不是简单的数据加载,而是配合 Et 阶段,将数据源的数据结构的变更、数据内容的变更以适合数据目标端(Data Target)的形式快速、准确的加载到数据目标当中,其中,对于数据结构的变化要支持同源数据结构变更(Schema Evolution),数据加载也应该支持大批量加载(Bulk Load)、SaaS 加载(Reverse ETL)、JDBC 加载等。确保既支持实时数据和数据结构的变化,还要支持大批量数据快速加载。
- T(ransform) 转化:在云数据仓库、线下数据仓库或新数据联邦的环境下,完成业务逻辑的加工,通常使用 SQL 方式,实时或批量地将复杂业务逻辑准确、快速变为业务端或者 AI 端使用的数据。
在 EtLT
架构下,使用者人群也有了明确的分工:
- EtL 阶段:以数据工程师为主,他们将复杂异构的混合数据源,变为数据仓库或者数据联邦可加载的数据,放到数据存储当中,他们无需对企业指标计算规则有深入理解,但需要对各种源数据和非结构化数据变为结构化数据转化有深入理解。他们需要确保的是数据的及时性、数据源到结构化数据的准确性。
- T 阶段:以数据分析师、各业务部门数据
SQL
开发者、AI
工程师为主,他们深刻理解企业业务规则,可以将业务规则变为底层结构化数据上的SQL
语句进行分析统计,最终实现企业内部的数据分析和 AI 应用的实现,他们需要确保的是数据逻辑关系、数据质量以及最终数据结果满足业务需求。
通常情况下,在大数据的处理基本倾向于前面部分的 Etl 环节
,而后面的 T 环节
的数据处理倾向于 SQL
统计分析,也就是俗称的 最后一公里
。
谈谈 ETL 作用
ETL 对企业的作用
-
数据整合:企业通常有多个数据源,包括数据库、文件、应用程序等,
ETL
能够将这些分散的数据整合在一起,为企业提供全面且一致的数据视图。 -
数据清洗与质量控制:
ETL
可以清洗和验证数据,排除重复、不完整或不准确的数据,提高数据的质量和可靠性。 -
决策支持:通过将多个数据源中的数据整合起来,并进行转换和分析,
ETL
可以为企业提供准确的决策支持信息,帮助管理层做出更明智的决策。 -
业务流程优化:
ETL
可以将数据从不同系统中抽取出来,并进行转换和加载,实现数据在不同系统之间的流动,优化业务流程,提高企业的效率和竞争力。
ETL 对个人职业发展的作用
-
数据处理和分析能力:掌握
ETL
技术可以使个人具备处理和分析大规模数据的能力。在当今数据驱动的时代,数据处理和分析已成为许多职业领域的核心需求,如数据科学家、业务分析师、市场营销人员等。ETL的知识和技能使个人能够有效地抽取、转换和加载数据,为数据分析和洞察提供基础。 -
数据整合和管理能力:
ETL
能够将分散的数据源整合为一致的数据视图。掌握ETL技术的个人可以有效管理和整合不同来源的数据,提供准确和可靠的数据资源。这对于数据管理岗位、数据架构师、数据工程师等职业来说尤为重要。 -
决策支持和业务优化能力:
ETL
可以为企业提供准确的决策支持信息,并帮助优化业务流程。具备ETL技能的个人可以参与数据驱动的决策制定过程,通过数据抽取、转换和加载,为管理层提供准确的数据报告和分析结果,促进企业的业务发展和效率提升。 -
跨行业换工作的机会:
ETL
技能是一种跨行业的技能,可以应用于各种行业和领域。掌握ETL技术可以使个人具备在不同行业和领域中进行数据处理和分析的能力,扩展个人的职业发展机会。无论是金融、健康、零售还是制造业等,ETL
技能都是广泛需求的,因此具备这一技能可以为个人提供更多的就业和发展选择。
ETL.NET 介绍
.neter 人员大数据处理框架终于来了。
ETL.NET
是一个完全用 .NET
编写的开源框架,可用于多平台使用,并可直接集成到任何 .NET
应用程序中。可以毫不费力地实现快速的、低内存的和易于维护的数据处理,即使是百万数据也能轻松应对。所有用于规范化、更新插入、查找或连接的工具都大大减少了任何导入和转换目的的工作量。处理跟踪的所有内容,错误跟踪都是为开发人员自动完成的。
ETL.NET 功能特点
ETL.NET
是一组 .NET
库,允许将常规商业智能 ETL
功能嵌入到任何 .NET
应用程序中。
- .NET 支持:
ETL.NET
完全用.NET
编写,用于多平台使用,并可直接集成到任何.NET
应用程序中。 - 易于实施:
ETL.NET
的工作原理与SSIS
类似,ETL
进程要像Linq
查询一样用.NET
编写。 - 易于运行:简单明了的
.NET
,ELT.NET
运行时无需安装即可执行ETL
进程。
缺少什么?没有问题!
任何类型的扩展
都可以在瞬间实现,以创建新类型的数据源/目标或任何类型的运算符。ETL.NET
就是为此而设计的。
1、它包含 SSIS 的所有转换和功能
ETL.NET
提供了对任何数据源进行任何转换所需的每个运算符。ETL.NET
实现的运算符的灵感来自SQL
提供的运算符:Map、Join、Sort、Distinct、Lookup、Top、Pivot、Cross Apply、Union、Group By、Aggregate
等;- 数据源,如
Excel
,平面文件,如:csv、SQL Server、Xml、Entity Framework(实体框架)
等; - 通过自动过滤和保存功能跟踪整个活动;
Process
流程的参数化;
2、开箱即用的功能
ETL.NET
支持从多种数据类型和数据源中读取可写入数据,可以满足各种使用场景。
2.1、Read or write any file type and any data source.(读取或写入任何文件类型和任何数据源)
Native SQL server
Entity Framework
CSV
Excel
Bloomberg response files
Searchable PDF
XML
Anything .NET can read or write whatsoever
(任何.NET
可以读取或者写入的内容)
2.2、Read or write files on any source.(在任何源上读取或写入文件)
File system
FTP
SFTP
FTPS
Dropbox
eMail and MailBox
zip archives
Anything .NET can access whatsoever
(任何.NET
可以访问的内容)
如何使用 ETL.NET ?
ETL.NET
是一系列的类库,可以方便的通过 Nuget
包安装集成到任何 .NET
应用程序即可使用。
ETL.NET 相关资源
Github
项目地址,https://github.com/paillave/Etl.NetETL.NET
官网,https://paillave.github.io/Etl.Net/Nuget
包资源,https://www.nuget.org/packages/Paillave.EtlNet.Core
Paillave.EtlNet 系列 Nuget 包
Examples 应用举例
实验目标:提取特定格式
.zip
文件中所有的.csv
文件数据,并处理数据写入到指定数据库表中。
1、创建控制台项目
- 执行
.net cli
命令,创建ConsoleAppEtl
项目:
dotnet new console -o ConsoleAppEtl --no-https -f net8.0
- 查看创建控制台(
console
)项目更多帮助信息:
PS C:\Users\Jeffrey.Chai> dotnet new console -h
控制台应用 (C#)
作者: Microsoft
描述: 用于创建可在 Windows、Linux 和 macOS 上 .NET 上运行的命令行应用程序的项目用法:dotnet new console [options] [模板选项]选项:-n, --name <name> 正在创建的输出名称。如未指定名称,则使用输出目录的名称。-o, --output <output> 要放置生成的输出的位置。--dry-run 如果运行给定命令行将导致模板创建,则显示将发生情况的摘要。--force 强制生成内容 (即使它会更改现有文件)。--no-update-check 在实例化模板时,禁用对模板包更新的检查。--project <project> 应用于上下文评估的项目。-lang, --language <C#> 指定要实例化的模板语言。--type <project> 指定要实例化的模板类型。模板选项:-f, --framework <net6.0|net7.0|net8.0> 项目的目标框架。类型: choicenet8.0 目标 net8.0net7.0 目标 net7.0net6.0 目标 net6.0默认: net8.0--langVersion <langVersion> 在创建的项目文件中设置 LangVersion 属性类型: text--no-restore 如果指定,则在创建时跳过项目的自动还原。类型: bool默认: false--use-program-main 是否生成显式程序类和主方法,而不是顶级语句。类型: bool默认: false--aot 是否启用将项目以 native AOT 发布。类型: bool默认: false要查看有关其他模板语言(F#, VB)的帮助,请使用 --language 选项:dotnet new console -h --language F#
2、添加依赖 nuget 包
该项目中使用到的 nuget
包信息如下:
<Project Sdk="Microsoft.NET.Sdk"><PropertyGroup><OutputType>Exe</OutputType><TargetFramework>net8.0</TargetFramework><ImplicitUsings>enable</ImplicitUsings><Nullable>enable</Nullable></PropertyGroup><ItemGroup><PackageReference Include="Paillave.EtlNet.Core" Version="2.0.47" /><PackageReference Include="Paillave.EtlNet.FileSystem" Version="2.0.47" /><PackageReference Include="Paillave.EtlNet.SqlServer" Version="2.0.47" /><PackageReference Include="Paillave.EtlNet.TextFile" Version="2.0.47" /><PackageReference Include="Paillave.EtlNet.Zip" Version="2.0.47" /></ItemGroup><ItemGroup><Folder Include="Files\Input\" /></ItemGroup></Project>
3、应用案例代码编写
在编写 demo
代码之前,先在项目中添加一个文件夹:
Files\Input\
,用来模拟输入的数据源;
下面是一个 ETL.NET
处理数据的场景:
- 首先从文件夹中读取所有的
zip
文件。 - 解压
zip
,并读取其中的.csv
文件。 - 解析
.csv
文件内容,添加的Person
的集合中。 - 去除重复项(以
email
作为唯一依据),并写入到Sql Server
数据库中。
依据上面的处理流程,在 Program.cs
中编写如下代码:
using Paillave.Etl.FileSystem;
using Paillave.Etl.Zip;
using Paillave.Etl.TextFile;
using Paillave.Etl.SqlServer;
using System.Data.SqlClient;
using Paillave.Etl.Core;namespace ConsoleAppEtl;internal class Program
{static async Task Main(string[] args){Console.WriteLine("Hello, ETL.NET! https://paillave.github.io/");var processRunner = StreamProcessRunner.Create<string>(DefineProcess);processRunner.DebugNodeStream += (sender, e) => {/* PLACE A CONDITIONAL BREAKPOINT HERE FOR DEBUG ex: e.NodeName == "parse file" */Console.WriteLine($"NodeName = {e.NodeName},Count = {e.Count},HasError = {e.HasError},ToSequenceId = {e.ToSequenceId},FromSequenceId = {e.FromSequenceId}");};string inputFilesPath = @"C:\Users\Jeffrey.Chai\Desktop\test\ConsoleAppEtl\Files\Input";string connStr = "Data Source=.;User Id=sa;Password=123@etl.net;Initial Catalog=EtlTest;Encrypt=True;TrustServerCertificate=True;Pooling=true;Min Pool Size=1;Max Pool Size=10;";using (var cnx = new SqlConnection(connStr)){cnx.Open();var executionOptions = new ExecutionOptions<string>{Resolver = new SimpleDependencyResolver().Register(cnx)};var res = await processRunner.ExecuteAsync(config: inputFilesPath, options: executionOptions);if (res.Failed && res.ErrorTraceEvent != null){Console.WriteLine($"errors:{res.ErrorTraceEvent.Content.Type},{res.ErrorTraceEvent.Content.Level},{res.ErrorTraceEvent.Content.Message}");}Console.WriteLine(res.Failed ? "Failed" : "Succeeded");}}/// <summary>/// 定义处理流程/// </summary>/// <param name="contextStream"></param>private static void DefineProcess(ISingleStream<string> contextStream){contextStream.CrossApplyFolderFiles(name: "列出所有的 .zip 文件", pattern: "*.zip", recursive: true).CrossApplyZipFiles(name: "从 .zip 解压出 .csv 文件", pattern: "*.csv").CrossApplyTextFile(name: "解析 .csv 文件",args: FlatFileDefinition.Create(item => new Person{Email = item.ToColumn("email"),FirstName = item.ToColumn("first name"),LastName = item.ToColumn("last name"),DateOfBirth = item.ToDateColumn("date of birth", "yyyy-MM-dd"),Reputation = item.ToNumberColumn<int?>("reputation", ".")}).IsColumnSeparated(',')).Distinct("email 去重", item => item.Email).SqlServerSave("写入 mssql2022 数据库", o => o.ToTable("dbo.Person").SeekOn(p => p.Email).DoNotSave(p => p.Id)).Do("输出到控制台", item => Console.WriteLine($"fullname:{item.FirstName}-{item.LastName},email:{item.Email}"));}/// <summary>/// 数据库表实体模型/// </summary>private class Person{public int Id { get; set; }public string Email { get; set; }public string FirstName { get; set; }public string LastName { get; set; }public DateTime DateOfBirth { get; set; }public int? Reputation { get; set; }}
}
4、准备数据库表结构
4.1、创建数据库
创建数据库命名为 EtlTest
,此处我就使用 docker
容器化部署一个 mssql2022
,执行如下命令:
# 搜索镜像
docker search mssql
# 拉取镜像
docker pull mcr.microsoft.com/mssql/server:2022-latest# 运行容器 mssql2022
docker run -d --name mssql2022 --hostname mssql2022 \
-p 1433:1433 \
-e "ACCEPT_EULA=Y" \
-e "MSSQL_SA_PASSWORD=123@etl.net" \
-e "TZ=Asia/Shanghai" \
-e "MSSQL_PID=Developer" \
-e "MSSQL_COLLATION=Chinese_PRC_BIN" \
mcr.microsoft.com/mssql/server:2022-latest
4.2、测试数据库连接
数据库容器运行成功后,使用数据库工具 dbeaver-ce
连接测试,看数据库是否能够正常访问,如果正常访问即显示如下信息:
4.3、创建数据库表
依据上面的 Person.cs
实体模型,创建数据表 Person
,执行如下 sql
脚本:
-- mssql
CREATE TABLE "Person" ( "Id" INT identity(1,1) PRIMARY KEY NOT NULL, "Email" VARCHAR(32) NOT NULL, "FirstName" VARCHAR(32) NOT NULL, "LastName" VARCHAR(32) NOT NULL, "DateOfBirth" DateTime NOT NULL,"Reputation" INT NULL
);
此时该表的数据为空,改表主要用于从特定文件 .csv
提取数据,并写入表中保存。
5、准备数据源
这里我们准备的源数据是后缀为 .csv
类型的文件,并且压缩为 .zip
格式的文件夹。
Person.zip
文件存放入项目中 Files\Input
文件夹里面,该压缩文件中,存放了 3
个 .csv
的文件,用于模拟分批到处数据的情况。下面 3
个文件中,分别在每个文件中放入 10
条数据。
.csv
文件的内容格式类似如下:
"id","email","first name","last name","date of birth","reputation"
1,"123@qq.com",hu,pingan,"2023-12-12",10
2,"321@qq.com",ma,liuliu,"2023-11-12",5
3,"132@qq.com",zhan,xiaosan,"2023-12-15",2
验证测试
说明:
Person.zip
文件可以存放多个结构相同的.csv
文件,这里为了方便测试,模拟3
个文件即可。
经过上面的环节,我们已经准备好了项目测试的基础条件。接下来我们就运行项目,启动看下,能否把 Person.zip
文件中的 3
个 .csv
文件给解析出来,并提取到里面的数据写入到提前准备好的数据库表中。
执行完成,控制台输出信息如下:
Hello, ETL.NET! https://paillave.github.io/
fullname:项-栋,email:H6zQXwZ@gmail.com
fullname:谈-震,email:YbjDWRYLT@gmail.com
fullname:滑-超浩,email:NOtywnmL@gmail.com
fullname:慕容-伦,email:iqGQtl8Vf@gmail.com
fullname:司空-群,email:MvDxeaOs@gmail.com
fullname:壤驷-泰,email:OPcQBLZ@gmail.com
fullname:国-才,email:bMIhd6K@gmail.com
fullname:尤-建,email:OCrdarkYy@gmail.com
fullname:公西-毅,email:pYhCZZQ@gmail.com
fullname:邢-飞,email:8FDeFQ@gmail.com
fullname:昝-龙,email:Ybr1jWLv@gmail.com
fullname:麻-军,email:6hPwNdi@gmail.com
fullname:崔-朗,email:gdg2ghvZC@gmail.com
fullname:籍-清,email:y2fSVPoJ@gmail.com
fullname:缑-克,email:SOJ72Ih@gmail.com
fullname:赫连-广,email:WC4NLHXr@gmail.com
fullname:印-信,email:BsGL3fcV@gmail.com
fullname:扈-强,email:BO3P744@gmail.com
fullname:漆雕-波,email:LZA31yMtR@gmail.com
fullname:堵-山,email:kRIjZjmmA@gmail.com
fullname:左-民,email:mg02bO@gmail.com
fullname:乔-言若,email:tp4Nqq@gmail.com
fullname:郁-江,email:LwyEXTrAV@gmail.com
fullname:翟-新利,email:dgWodVMqn@gmail.com
fullname:褚-才,email:AFx57A8@gmail.com
fullname:郁-奇,email:KitTNOc5@gmail.com
fullname:寇-平,email:bwtG8Ipfr@gmail.com
fullname:须-厚,email:DLgJR4s@gmail.com
fullname:宓-奇,email:2wFB27f@gmail.com
fullname:窦-超浩,email:jbbmcenx@gmail.com
NodeName = 列出所有的 .zip 文件,Count = 1,HasError = False,ToSequenceId = 1,FromSequenceId = 1
NodeName = 从 .zip 解压出 .csv 文件,Count = 3,HasError = False,ToSequenceId = 5,FromSequenceId = 3
NodeName = 解析 .csv 文件,Count = 30,HasError = False,ToSequenceId = 123,FromSequenceId = 7
NodeName = email 去重,Count = 30,HasError = False,ToSequenceId = 124,FromSequenceId = 8
NodeName = 写入 mssql2022 数据库,Count = 30,HasError = False,ToSequenceId = 125,FromSequenceId = 9
NodeName = 输出到控制台,Count = 30,HasError = False,ToSequenceId = 126,FromSequenceId = 10
Succeeded
此时我再次查看数据库表信息,是否有把 .csv
文件的数据提取处理保存到指定的数据库表中。
SELECT Id, Email, FirstName, LastName, DateOfBirth, Reputation
FROM EtlTest.dbo.Person;
数据库表 Person
查询信息显示如下:
结论: Person.zip
文件中的 3
个 .csv
文件合计数据量 30
行,存入数据库表 EtlTest.dbo.Person
的数据符合预期。