flink1.17 eventWindow不要配置processTrigger

理论上可以eventtime processtime混用,但是下面代码测试发现bug,输入一条数据会一直输出.

flink github无法提bug/问题. apache jira账户新建后竟然flink又需要一个账户,放弃

bug复现操作

idea运行代码后 往source kafka发送一条数据  

a,1,1690304400000

可以看到无限输出:

理论上时间语义不建议混用,但是在rich函数中的确可以做到混用且正常使用

问题复现代码

package com.yy.flinkWindowAndTriggerimport com.yy.flinkWindow.M1
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.time.Time.seconds
import org.apache.flink.streaming.api.windowing.triggers.{ContinuousProcessingTimeTrigger, CountTrigger, ProcessingTimeTrigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.joda.time.Secondsobject flinkEventWindowAndProcessTriggerBUGLearn {def main(args: Array[String]): Unit = {// flink 启动本地webuival conf = new Configurationconf.setInteger(RestOptions.PORT, 28080)//    val env = StreamExecutionEnvironment.getExecutionEnvironmentval env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)//    val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.configure(conf)/*kafka输入:a,1,1690304400000        //对应 2023-07-26 01:00:00 (无限输出)       //如果传入 a,1,1693037756000 对应:2023-08-26 16:15:56 (1条/s)a,1,7200000               // 1970-01-1 10:00:00*/val brokers = "172.18.105.147:9092"val source = KafkaSource.builder[String].setBootstrapServers(brokers).setTopics("t1").setGroupId("my-group-23asdf46").setStartingOffsets(OffsetsInitializer.latest())// .setDeserializer() // 参数: KafkaRecordDeserializationSchema.setDeserializer(new M1()).build()val ds1 = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")val s1 = ds1.map(_.split(",")).map(x => C1(x(0), x(1).toInt, x(2).toLong)) // key number 时间戳.assignTimestampsAndWatermarks(new OTAWatermarks(Time.seconds(0))).keyBy(_.f1).window(TumblingEventTimeWindows.of(seconds(10))).trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](seconds(10L))).reduce((x, y) => C1(x.f1, x.f2 + y.f2, 100L))s1.print()env.execute("KafkaNewSourceAPi")}// 乱序流class OTAWatermarks(time: Time) extends BoundedOutOfOrdernessTimestampExtractor[C1](time) {override def extractTimestamp(element: C1): Long = {element.f3}}// key num timestampcase class C1(f1: String, f2: Int, f3: Long)
}

-

-

maven pom

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkLocalDemo</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>FlinkLocalDemo</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.1</flink.version><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.8</scala.version></properties><dependencies><!-- https://mvnrepository.com/artifact/joda-time/joda-time --><dependency><groupId>joda-time</groupId><artifactId>joda-time</artifactId><version>2.12.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 --><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.33</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version>
<!--            <version>1.2.17</version>--></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- 引入flink1.13.0 scala2.12.12   --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><!-- Either... --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><!-- or... --><!--        下面几个是代码中写sql需要的包 四个中一个都不能少 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version>
<!--            <scope>test</scope>--></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!--  https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><!--        注意: flink-table-planner-loader 不能和 flink-table-planner_${scala.binary.version} 共存--><!--        <dependency>--><!--            <groupId>org.apache.flink</groupId>--><!--            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>--><!--            <version>${flink.version}</version>--><!--            <scope>provided</scope>--><!--        </dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version><scope>provided</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.11</version></dependency></dependencies><build><plugins><!-- 打jar插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution></executions><configuration><scalaVersion>${scala.version}</scalaVersion></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.5.5</version><configuration><!--这部分可有可无,加上的话则直接生成可运行jar包--><!--<archive>--><!--<manifest>--><!--<mainClass>${exec.mainClass}</mainClass>--><!--</manifest>--><!--</archive>--><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>11</source><target>11</target></configuration></plugin></plugins></build>
</project>

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/52991.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python的下载和安装步骤,python下载安装教程3.10.0

大家好&#xff0c;给大家分享一下python下载安装教程3.10.0&#xff0c;很多人还不知道这一点。下面详细解释一下。现在让我们来看看&#xff01; 第一步&#xff1a;下载Python安装包 在Python的官网 www.python.org 中找到最新版本的Python安装包&#xff0c;点击进行下载&a…

探究Spring事务:了解失效场景及应对策略

在现代软件开发中&#xff0c;数据的一致性和完整性是至关重要的。为了保证这些特性&#xff0c;Spring框架提供了强大的事务管理机制&#xff0c;让开发者能够更加自信地处理数据库操作。然而&#xff0c;事务并非银弹&#xff0c;存在一些失效的情景&#xff0c;本文将带您深…

AtcoderABC227场

A - Last CardA - Last Card 题目大意 一共 K 张卡片分发给 N 个人&#xff0c;这些人的编号为 1, 2, …, N 从第 A 个人开始&#xff0c;按照顺序依次将卡片发给以下人员&#xff1a;A, A1, A2, …, N, 1, 2, …问最后一个卡片将发给哪个人&#xff1f; 具体来说&#xff0c;…

2023年C++面试宝典

目录 第一章&#xff1a;C基础知识1.1 C语言起源与发展1.2 C的重要特点和优点1.3 C的数据类型和变量1.4 函数和命名空间1.5 运算符和表达式 第二章&#xff1a;面向对象编程2.1 类与对象的概念2.2 封装、继承和多态2.3 构造函数和析构函数2.4 静态成员和常量成员2.5 虚函数和纯…

34 从磁盘读取数据到内存的调试

前言 我们大多数的文件是存储在磁盘上面的 然后 我们通过 open read/write 相关 api 是控制的是 磁盘 和 内存 之间的数据交互 磁盘 到 内存, 或者 内存 到 磁盘 我们这里 来大致看一下 磁盘到 内存的这一个过程 调试读取磁盘数据到 page 的流程 这里的流程主要是包含…

【FAQ】EasyGBS平台通道显示在线,视频无法播放并报错400的排查

EasyGBS是基于国标GB28181协议的视频云服务平台&#xff0c;它可以支持国标协议的设备接入&#xff0c;在视频能力上能实现直播、录像存储、检索与回放、云台控制、告警上报、语音对讲、平台级联等功能&#xff0c;既能作为业务平台使用&#xff0c;也能作为能力层平台调用。 我…

装饰器模式(Decorator)

装饰器模式是一种结构型设计模式&#xff0c;用来动态地给一个对象增加一些额外的职责。就增加对象功能来说&#xff0c;装饰器模式比生成子类实现更为灵活。装饰器模式的别名为包装器(Wrapper)&#xff0c;与适配器模式的别名相同&#xff0c;但它们适用于不同的场合。 Decor…

【每天40分钟,我们一起用50天刷完 (剑指Offer)】第四十七天 47/50

专注 效率 记忆 预习 笔记 复习 做题 欢迎观看我的博客&#xff0c;如有问题交流&#xff0c;欢迎评论区留言&#xff0c;一定尽快回复&#xff01;&#xff08;大家可以去看我的专栏&#xff0c;是所有文章的目录&#xff09;   文章字体风格&#xff1a; 红色文字表示&#…

VGG卷积神经网络-笔记

VGG卷积神经网络-笔记 VGG是当前最流行的CNN模型之一&#xff0c; 2014年由Simonyan和Zisserman提出&#xff0c; 其命名来源于论文作者所在的实验室Visual Geometry Group。 测试结果为&#xff1a; 通过运行结果可以发现&#xff0c;在眼疾筛查数据集iChallenge-PM上使用VGG…

实例031 窗体中的滚动字幕

实例说明 普通窗体中的文字位置都是固定的&#xff0c;一些窗体中需要让文字动起来&#xff0c;例如一些广告性较强的界面中需要做一些滚动的字幕。本例实现了一个具有滚动字幕效果的窗体&#xff0c;运行本例&#xff0c;单击【演示】按钮&#xff0c;看到窗口中的文字开始滚…

java讲解Spring Boot配置文件级别 相互覆盖关系 解决一方不愿意给数据库密码 一方不愿意给源码时 数据库配置问题

前面 我们讲过Spring Boot 修改临时变量的方式 但另一个场景 就是 我们 在本地开发环境 用的是一个配置 但如果项目经理上线 他想改这些配置 怎么弄呢 特别是数据库之类的配置 很多线上是不太一样的 那么 我们先看一个比较基本的方法 在配置文件的同目录下创建一个目录 叫 con…

C语言笔记6

关于microsoft visual 的学习笔记 CtrlF5就是启动编译程序 先CtrlA进行全选&#xff0c;然后AitF8就自动的调节代码的格式 #include <stdio.h> #include <stdlib.h> int main() {//system启动程序(在一个程序中启动另外一个程序)//如果程序环境变量中找不到程序&am…