目录
1、功能说明
2、API使用说明
3、代码示例
1、功能说明
从Flink1.1开始提供了DataGen连接器,它提供了Source类的实现(可并行的源算子),用来生成测试数据,在本地开发或者无法访问外部系统(如kafka)时,它就会非常有用。
DataGen连接器是内置的,不需要额外的依赖项。
2、API使用说明
方法定义:
public DataGeneratorSource(DataGenerator<T> generator, long rowsPerSecond, @Nullable Long numberOfRows)参数说明:DataGenerator<T> generator : 指定数据生成器对象long rowsPerSecond : 指定数据发射速率(每秒发射的记录数),默认值为Long.MAX_VALUE@Nullable Long numberOfRows : 指定指定输出数据的总行数(为null时,表示一直输出)关于DataGenerator类
public interface DataGenerator<T> extends Serializable, Iterator<T>功能说明:继承了Iterator,利用迭代器来构造测试数据
3、代码示例
Flink版本说明:flink_1.13.0、scala_2.12
定义User类:
package com.baidu.beancase class User(id: Long, name: String)
测试代码:
test("DataGen 连接器") {// 1. 获取流执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(4)// 自定义 DataGenerator实现类(用来随机生成User对象)val userGenerator = new DataGenerator[User]() {// 定义随机数数据生成器var generator: RandomDataGenerator = _// 初始化数据生成器override def open(name: String, context: FunctionInitializationContext, runtimeContext: RuntimeContext): Unit = {generator = new RandomDataGenerator}// 判断迭代器是否有值override def hasNext: Boolean = true// 生成随机字符串,并返回override def next(): User = {User(generator.nextLong(1, 99) // 生成1~99区间的随机整数, generator.nextHexString(4) // 生成4位字符串)}}// 自定义字符串数据生成器val stringGenerator = new DataGenerator[String]() {// 定义随机数数据生成器var generator: RandomDataGenerator = _// 初始化数据生成器override def open(name: String, context: FunctionInitializationContext, runtimeContext: RuntimeContext): Unit = {generator = new RandomDataGenerator}// 是否有下一个值override def hasNext: Boolean = true// 生成随机字符串,并返回override def next(): String = generator.nextHexString(3)}val dataGenSource = new DataGeneratorSource(userGenerator // 指定数据生成器, 2L // 指定发射速率(每秒发射的记录数), null // 指定输出数据的总行数(为null时,表示一直输出))// 将DataGeneratorSource做为数据源val ds = env.addSource(dataGenSource)println(s"并行度: ${ds.parallelism}")// 打印DataStreamds.print()// 出发程序执行env.execute()}
执行结果: