Flink 源算子之 DataGeneratorSource DataGenerator

目录

1、功能说明

2、API使用说明

3、代码示例


1、功能说明

        从Flink1.1开始提供了DataGen连接器,它提供了Source类的实现(可并行的源算子),用来生成测试数据,在本地开发或者无法访问外部系统(如kafka)时,它就会非常有用。

        DataGen连接器是内置的,不需要额外的依赖项。


2、API使用说明

方法定义:
public DataGeneratorSource(DataGenerator<T> generator, long rowsPerSecond, @Nullable Long numberOfRows)参数说明:DataGenerator<T> generator     :  指定数据生成器对象long rowsPerSecond             :  指定数据发射速率(每秒发射的记录数),默认值为Long.MAX_VALUE@Nullable Long numberOfRows    :  指定指定输出数据的总行数(为null时,表示一直输出)关于DataGenerator类
public interface DataGenerator<T> extends Serializable, Iterator<T>功能说明:继承了Iterator,利用迭代器来构造测试数据 

3、代码示例

Flink版本说明:flink_1.13.0、scala_2.12

定义User类:

package com.baidu.beancase class User(id: Long, name: String)

测试代码:

  test("DataGen 连接器") {// 1. 获取流执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(4)// 自定义 DataGenerator实现类(用来随机生成User对象)val userGenerator = new DataGenerator[User]() {// 定义随机数数据生成器var generator: RandomDataGenerator = _// 初始化数据生成器override def open(name: String, context: FunctionInitializationContext, runtimeContext: RuntimeContext): Unit = {generator = new RandomDataGenerator}// 判断迭代器是否有值override def hasNext: Boolean = true// 生成随机字符串,并返回override def next(): User = {User(generator.nextLong(1, 99) // 生成1~99区间的随机整数, generator.nextHexString(4) // 生成4位字符串)}}// 自定义字符串数据生成器val stringGenerator = new DataGenerator[String]() {// 定义随机数数据生成器var generator: RandomDataGenerator = _// 初始化数据生成器override def open(name: String, context: FunctionInitializationContext, runtimeContext: RuntimeContext): Unit = {generator = new RandomDataGenerator}// 是否有下一个值override def hasNext: Boolean = true// 生成随机字符串,并返回override def next(): String = generator.nextHexString(3)}val dataGenSource = new DataGeneratorSource(userGenerator // 指定数据生成器, 2L // 指定发射速率(每秒发射的记录数), null // 指定输出数据的总行数(为null时,表示一直输出))// 将DataGeneratorSource做为数据源val ds = env.addSource(dataGenSource)println(s"并行度: ${ds.parallelism}")// 打印DataStreamds.print()// 出发程序执行env.execute()}

执行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/194.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql学习

docker创建mysql version: "2" services:mysql:container_name: mysql_simpleimage: mysql:5.7#env_file:#- ../env/mysql.envenvironment:MYSQL_ROOT_PASSWORD: "123456"MYSQL_USER: zhaoMYSQL_PASS: zhaovolumes:# 如果没有data文件夹会自动创建- ./dat…

Docker介绍

目录​​​​​​​ 一、Docker是什么&#xff1f; 二、Docker只能使用Linux吗&#xff1f; 三、为什么要使用Docker&#xff1f; 四、镜像和容器 五、关闭SELINUX 一、Docker是什么&#xff1f; Docker是轻量级的虚拟机产品&#xff0c;我们在使用Docker的时候&#xff…

05 proxy代理、组件间的通信

React全家桶 一、脚手架配置代理(proxy)的方式 CORS: 请求url:http://www.baidu.com 发送url:http://www.jd.com response.setHeader(Access-Control-Allow-Origin,*);通过express快速搭建一个服务 创建一个图书组件 import React, { useEffect } from react import axio…

使用OpenXML库替换docx文档(Word文档)中的特定字段

在批量生成Word文档的应用中&#xff0c;最常见的需求莫过于替换掉文档中的特定字段以生成新的文档。利用OpenXML库可轻松实现这一需求。 不完善版本 首先放出最简单然而有bug的版本&#xff1a; using DocumentFormat.OpenXml.Packaging; using DocumentFormat.OpenXml.Wor…

Spring Boot 中的事务回滚规则

Spring Boot 中的事务回滚规则 在应用程序中&#xff0c;事务管理是一个重要的概念。事务是指一系列的操作&#xff0c;这些操作要么全部成功&#xff0c;要么全部失败。在Spring Boot中&#xff0c;我们可以使用事务管理器来管理事务。在使用事务管理器的时候&#xff0c;一个…

【Oracle】springboot连接Oracle 集成mybatis、druid

目录 项目结构与库表数据pom.xmlapplication.yml实体类Mappercontroller接口测试 基于spring-boot 2.7.11&#xff0c;连接Oracle 11g 仅做一个简单的示例 特别说明&#xff08;不一定正确&#xff0c;还请指正&#xff09;&#xff1a;我Oracle也不熟&#xff0c;但据我观察发…

Grafana任意文件读取漏洞(CVE-2021-43798)

Grafana任意文件读取漏洞&#xff08;CVE-2021-43798&#xff09; 一、漏洞描述 Grafana是一个跨平台、开源的数据可视化网络应用程序平台。用户配置连接的数据源之后&#xff0c;Grafana可以在网络浏览器里显示数据图表和警告。 二、漏洞影响范围 影响版本&#xff1a; Gr…

【资料分享】全志科技T507-H评估板规格书(4核ARM Cortex-A53,主频1.416GHz)

1 评估板简介 创龙科技TLT507-EVM是一款基于全志科技T507-H处理器设计的4核ARM Cortex-A53国产工业评估板&#xff0c;主频高达1.416GHz&#xff0c;由核心板和评估底板组成。核心板CPU、ROM、RAM、电源、晶振等所有器件均采用国产工业级方案&#xff0c;国产化率100%。同时&a…

ElasticSearch 8.0+ 版本Windows系统启动

下载地址&#xff1a;https://www.elastic.co/cn/downloads/past-releases/winlogbeat-8-8-1 解压\elasticsearch\elasticsearch-8.5.1 进入bin目录&#xff0c;启动elasticsearch.bat 问题1&#xff1a; warning: ignoring JAVA_HOMED:\jdk1.8.0_271; using bundled JDK J…

FlutterUnit 已上架 iOS,暗色模式全面支持

theme: cyanosis 一、FlutterUnit 的全平台支持 FlutterUnit 是我的一个开源项目&#xff0c;基于 Flutter 构建的一个 全平台 应用程序。现在很荣幸地宣布: FlutterUnit 已经上架 iOS 的 App Store &#xff0c;自此主流的几大平台均已提供体验。 项目地址: https://github.co…

普通人如何居家办公实现网上赚钱?分享五种互联网赚钱的副业项目

科思创业汇 大家好&#xff0c;这里是科思创业汇&#xff0c;一个轻资产创业孵化平台。赚钱的方式有很多种&#xff0c;我希望在科思创业汇能够给你带来最快乐的那一种&#xff01; 网上赚钱&#xff0c;主要是利用各种信息差异从网上获取收入。 近年来&#xff0c;随着互联网…

分布式存储Ceph的部署及应用(创建MDS、RBD、RGW 接口)

系列文章目录 文章目录 系列文章目录一、1.存储基础2. 单机存储的问题3. 分布式存储&#xff08;软件定义的存储 SDS&#xff09; 二 Ceph1.Ceph 简介2. Ceph 数据的存储过程 总结 一、 1.存储基础 1.1 单机存储设备 ●DAS&#xff08;直接附加存储&#xff0c;是直接接到计算…