Trying out Flink 1.17 on JDK 17


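Preface

I had barely figured out the older versions when Flink 1.17 arrived. Still, one has to keep looking forward.

Per the official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/try-flink/local_installation/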


Flink runs on all UNIX-like environments, i.e. Linux, Mac OS X, and Cygwin (for Windows). You need to have Java 11 installed

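So JDK 1.8 is no longer the way to go. Flink has deprecated Java 8 since 1.15 and recommends Java 11, and a big attraction of the newer JDKs is ZGC, a collector with far shorter pauses than G1. A 15% drop in throughput can be offset by adding roughly 20% more machines (1/0.85 ≈ 1.18), and a problem that a bounded amount of money can solve is not much of a problem. But if an older GC hits a stop-the-world pause of several seconds, Flink's sub-second real-time response goes out the window. ZGC was designed to keep pauses in the range of about 10 ms even on multi-terabyte heaps, which suits Flink well. ZGC became production-ready (GA) in JDK 15 and is enabled with -XX:+UseZGC. After 1.8 and 11, the next LTS release that Oracle promotes is 17, so JDK 17 is the future. Others have run Flink 1.15 on JDK 17 in production for a long time without any earth-shattering incident, so stability should be acceptable.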
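To check which collector a JVM actually ends up with after adding -XX:+UseZGC, a few lines of plain Java are enough. This is only a minimal sketch and not part of the original project; the class name GcCheck is made up for illustration, and the exact collector names depend on the JDK build.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper class (not from the original post).
class GcCheck {

    // Returns the names of the garbage collectors registered by the running JVM.
    static List<String> collectorNames() {
        return ManagementFactory.getGarbageCollectorMXBeans().stream()
                .map(GarbageCollectorMXBean::getName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // With -XX:+UseZGC the names are expected to mention ZGC.
        System.out.println("Java " + Runtime.version() + ", collectors: " + collectorNames());
    }
}
```

On JDK 11 or later this can be run straight from the source file, for example java -XX:+UseZGC GcCheck.java, and compared with a run without the flag.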

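On a Hive cluster, however, Hive compatibility forces you to stay on JDK 1.8: https://lizhiyong.blog.csdn.net/article/details/130799342

I have already been bitten by this; in that situation it is better to settle for an older Flink version.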

Deploying JDK 17

Go to the Oracle website: https://www.oracle.com/java/technologies/downloads/#jdk17-windows


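Windows 10 users can download the Windows installer from that page and click Next all the way through; there is nothing special to configure.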

C:\Users\zhiyong>java -version
java version "17.0.7" 2023-04-18 LTS
Java(TM) SE Runtime Environment (build 17.0.7+8-LTS-224)
Java HotSpot(TM) 64-Bit Server VM (build 17.0.7+8-LTS-224, mixed mode, sharing)

C:\Users\zhiyong>

Press Win+R and open CMD; if java -version prints the output above, JDK 17 has been deployed successfully.
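The same check can be done from inside a program, which is handy when the IDE and the command line point at different JDKs. The following is a minimal sketch; the class name JdkVersionCheck and the helper isAtLeast are made up for illustration.

```java
// Hypothetical helper class (not from the original post).
class JdkVersionCheck {

    // True when the running feature version (e.g. 17) meets the required one.
    static boolean isAtLeast(int runningFeature, int requiredFeature) {
        return runningFeature >= requiredFeature;
    }

    public static void main(String[] args) {
        // Runtime.version().feature() is available since Java 10.
        int feature = Runtime.version().feature();
        System.out.println("Running on Java feature version " + feature);
        if (!isAtLeast(feature, 17)) {
            System.out.println("This project is compiled for Java 17; please switch the JDK.");
        }
    }
}
```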

Linux users can download the matching Linux package from the same page.

On Ubuntu it is even simpler, so I will not go into detail.

Project configuration for Flink 1.17

As described in the official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/configuration/overview/


All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of either the DataStream and/or Table API.

See FLIP-265 Deprecate and remove Scala API support

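Clearly the move away from Scala that began in Flink 1.15 is thorough. Before long it will no longer be possible to write Flink jobs in Scala, so Scala users may want to move to Spark sooner rather than later; Flink welcomes Java users. Flink jobs written in Scala should also be ported to Java as soon as possible, otherwise Flink 2.0 may drop Scala entirely.

So there is no point agonizing over Scala for Flink jobs any more. Avoid it where you can.

Flink POM dependencies

The three core dependencies of a Flink project are: flink-streaming-java, flink-table-api-java, and flink-table-api-java-bridge.

Connectors and other dependencies are added as needed. The Table Planner, for example, is a must for the SQL crowd but not necessarily for Java developers. Trimming unnecessary dependencies is what keeps the fat JAR small.

You can look them up in the Maven repository index: https://mvnrepository.com/

Times change, and many old dependencies stopped being updated long ago. One example is the artifact I used to love for writing to HDFS from Flink: https://mvnrepository.com/artifact/org.apache.flink/flink-connector-filesystem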


It has had no updates since Flink 1.11 (that is, since the end of 2021). So keep up with the times!

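Versions

<properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <!-- Note: scala.version holds the binary suffix used in artifact IDs, e.g. flink-table-planner_2.12 -->
    <scala.version>2.12</scala.version>
    <scala.binary.version>2.12.12</scala.binary.version>
    <flink.version>1.17.0</flink.version>
    <encoding>UTF-8</encoding>
</properties>

Although Flink is abandoning Scala, the basic big-data stack still cannot do without the Scala runtime, so keep it for now.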

Core dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>test</scope> -->
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>test</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>test</scope> -->
    </dependency>
</dependencies>

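Compared with the earlier Flink 1.14 setup: https://lizhiyong.blog.csdn.net/article/details/124161096

Many dependencies in Flink 1.17 no longer carry a Scala version suffix.

Packaging

These days Flink jobs generally run on YARN or K8S.

In multi-tenant resource isolation, multi-environment isolation, and dynamic scaling, YARN is clearly the weaker option; before long it will be washed up on the beach by the newer wave, K8S, just like the already defunct Mesos.

The layer underneath all sorts of SaaS services is of course K8S as well. On K8S, building a fat JAR and then building and deploying an image from it takes a lot of pressure off operations.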

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- Replace this with the main class of the fat JAR to build -->
                                <mainClass>com.zhiyong.FlinkTest1</mainClass>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

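The main thing to change is the mainClass element above: set it to the main class of the fat JAR. Flink jobs are usually written Java SE style, with one main method per job. Web developers used to Spring should not overthink this.

Every build needs this edited by hand. Hmm.

Stateless application

See this earlier post on Flink 1.14: https://lizhiyong.blog.csdn.net/article/details/123649447

The changes from 1.14 to 1.17 are somewhat smaller than those from 1.7 to 1.14.

Let's write a simple stateless application. First, define an object: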

package com.zhiyong.common;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

/**
 * @program: zhiyong_flink17
 * @description: vehicle speed log record
 * @author: zhiyong
 * @create: 2023-05-04 22:55
 **/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class CarLog {
    private String carCode;      // vehicle code
    private String vin;          // vehicle identification number
    private int speed;           // speed in km/h
    private long logTime;        // record timestamp in ms
    private String gpsLongitude; // GPS longitude
    private String gpsLatitude;  // GPS latitude
}

To keep things simple, Kafka is left out and a mock data source is written directly:

package com.zhiyong.common;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

/**
 * @program: zhiyong_flink17
 * @description: vehicle speed source that emits 1 record per second
 * @author: zhiyong
 * @create: 2023-05-04 22:49
 **/
public class CarSpeedSource1ps implements SourceFunction<CarLog> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean needRun = true;

    @Override
    public void run(SourceContext<CarLog> sourceContext) throws Exception {
        Random random = new Random();
        // the car code is picked once per running source instance
        String carCode = "car_" + random.nextInt(5);
        while (needRun) {
            // emit a fresh object each time instead of mutating one shared instance
            CarLog carLog = new CarLog();
            carLog.setCarCode(carCode);
            carLog.setLogTime(System.currentTimeMillis() - 50 - random.nextInt(500));
            carLog.setSpeed(random.nextInt(150)); // 0..149 km/h
            sourceContext.collect(carLog);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        needRun = false;
    }
}

A quick demonstration:

package com.zhiyong;

import com.zhiyong.common.CarLog;
import com.zhiyong.common.CarSpeedSource1ps;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @program: zhiyong_flink17
 * @description: Flink environment check
 * @author: zhiyong
 * @create: 2023-05-04 20:53
 **/
public class FlinkTest1 {
    public static void main(String[] args) throws Exception {
        System.out.println("JDK17");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // avoids the "insufficient network resources" error

        CarSpeedSource1ps carSpeedSource1ps = new CarSpeedSource1ps();
        DataStreamSource<CarLog> source1 = env.addSource(carSpeedSource1ps);
        DataStreamSource<CarLog> source2 = env.addSource(carSpeedSource1ps);

        source1.print("source1=>>>");
        source2.print("source2=>>>");

        env.execute();
    }
}

And it fails right away. A trap.

The error

JDK17
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @7e0e6aa2
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
    at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
    at java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:106)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2317)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:202)
    at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1244)
    at org.apache.flink.streaming.api.datastream.DataStream.print(DataStream.java:970)
    at com.zhiyong.FlinkTest1.main(FlinkTest1.java:28)

Process finished with exit code 1

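Clearly this is a Java version issue: reflection fails. JDK 17 strongly encapsulates JDK internals by default, so Flink's ClosureCleaner can no longer call setAccessible on fields such as java.lang.String.value unless the package is explicitly opened.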
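The failure can be reproduced without Flink at all. The sketch below attempts the same kind of reflective access that appears in the stack trace and also asks the module system whether java.lang is open to the calling code. The class name OpensCheck is made up for illustration; this is not how Flink itself performs the check.

```java
import java.lang.reflect.Field;
import java.lang.reflect.InaccessibleObjectException;

// Hypothetical helper class (not from the original post).
class OpensCheck {

    // True if java.lang is opened to this class's module, e.g. via --add-opens.
    static boolean javaLangIsOpen() {
        return String.class.getModule().isOpen("java.lang", OpensCheck.class.getModule());
    }

    // Tries to make the private field String.value accessible, like the failing call in the trace.
    static boolean canReachStringValue() {
        try {
            Field value = String.class.getDeclaredField("value");
            value.setAccessible(true);
            return true;
        } catch (InaccessibleObjectException e) {
            return false;
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException("String.value not found on this JDK", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("java.lang open to this module: " + javaLangIsOpen());
        System.out.println("String.value reachable:        " + canReachStringValue());
    }
}
```

Running it once without any flags and once with --add-opens=java.base/java.lang=ALL-UNNAMED should show both values flipping together on JDK 17.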

In this case, specify the following VM options in IDEA:

--add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.math=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.rmi/sun.rmi.transport=ALL-UNNAMED


Now it runs normally:

JDK17
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
source2=>>>> CarLog(carCode=car_4, vin=null, speed=100, logTime=1683218141638, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_2, vin=null, speed=115, logTime=1683218141779, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_2, vin=null, speed=56, logTime=1683218142768, gpsLongitude=null, gpsLatitude=null)
source2=>>>> CarLog(carCode=car_4, vin=null, speed=56, logTime=1683218142874, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_2, vin=null, speed=113, logTime=1683218143666, gpsLongitude=null, gpsLatitude=null)
source2=>>>> CarLog(carCode=car_4, vin=null, speed=30, logTime=1683218143674, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_2, vin=null, speed=19, logTime=1683218144790, gpsLongitude=null, gpsLatitude=null)
source2=>>>> CarLog(carCode=car_4, vin=null, speed=101, logTime=1683218144653, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_2, vin=null, speed=118, logTime=1683218145619, gpsLongitude=null, gpsLatitude=null)
source2=>>>> CarLog(carCode=car_4, vin=null, speed=96, logTime=1683218146054, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_2, vin=null, speed=3, logTime=1683218146832, gpsLongitude=null, gpsLatitude=null)
source2=>>>> CarLog(carCode=car_4, vin=null, speed=57, logTime=1683218147022, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_2, vin=null, speed=119, logTime=1683218147948, gpsLongitude=null, gpsLatitude=null)
source2=>>>> CarLog(carCode=car_4, vin=null, speed=71, logTime=1683218147915, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_2, vin=null, speed=67, logTime=1683218148722, gpsLongitude=null, gpsLatitude=null)
source2=>>>> CarLog(carCode=car_4, vin=null, speed=20, logTime=1683218148742, gpsLongitude=null, gpsLatitude=null)

Process finished with exit code 130

The mock source produces the expected data, and the print operator displays it.

filterFunction

Write a simple filter class to detect speeding:

package com.zhiyong.trans.filter;

import com.zhiyong.common.CarLog;
import org.apache.flink.api.common.functions.FilterFunction;

/**
 * @program: zhiyong_flink17
 * @description: simple vehicle overspeed filter
 * @author: zhiyong
 * @create: 2023-05-05 00:41
 **/
public class CarOverspeedFilter1 implements FilterFunction<CarLog> {
    @Override
    public boolean filter(CarLog carLog) throws Exception {
        // keep only records strictly above 120 km/h
        return carLog.getSpeed() > 120;
    }
}
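The rule inside the filter can be checked without starting a Flink job. The following is a plain-Java sketch of the same threshold logic, with no Flink classes involved; the class name OverspeedRule and the constant LIMIT_KMH are made up for illustration. Note the boundary: exactly 120 km/h does not count as speeding.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-alone version of the threshold rule (not from the original post).
class OverspeedRule {
    static final int LIMIT_KMH = 120;

    // Same comparison as the filter: strictly greater than the limit.
    static boolean isOverspeed(int speedKmh) {
        return speedKmh > LIMIT_KMH;
    }

    // Keeps only the speeds that the filter would let through.
    static List<Integer> overspeedOnly(List<Integer> speeds) {
        return speeds.stream().filter(OverspeedRule::isOverspeed).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(overspeedOnly(List.of(118, 120, 121, 140))); // prints [121, 140]
    }
}
```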

sinkFunction

Write a simple sink class to display the result:

package com.zhiyong.sink;

import com.zhiyong.common.CarLog;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @program: zhiyong_flink17
 * @description: simple sink for display
 * @author: zhiyong
 * @create: 2023-05-05 00:46
 **/
public class CarOverspeedSink1 implements SinkFunction<CarLog> {
    // SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format

    @Override
    public void invoke(CarLog value, Context context) throws Exception {
        // prints "<carCode>于<time> 超速。速度:<speed>km/h",
        // i.e. "<carCode> was speeding at <time>. Speed: <speed>km/h"
        System.out.println(value.getCarCode() + "于"
                + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(value.getLogTime()))
                + " 超速。速度:" + value.getSpeed() + "km/h");
    }
}
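The sink builds a new SimpleDateFormat on every call, presumably because SimpleDateFormat is not thread-safe and therefore awkward to share. A possible alternative is java.time.DateTimeFormatter, which is immutable and can be kept in a static field. The sketch below only shows the formatting part; the class name LogTimeFormat is made up, and the time zone Asia/Shanghai is an assumption chosen to match the timestamps in the sample output.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical formatting helper (not from the original post).
class LogTimeFormat {
    // DateTimeFormatter is immutable and thread-safe, so one shared instance is enough.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Formats epoch milliseconds in the given time zone.
    static String format(long epochMillis, ZoneId zone) {
        return FORMAT.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    public static void main(String[] args) {
        // Assumed zone; logTime taken from a record in the sample output.
        System.out.println(format(1683219463688L, ZoneId.of("Asia/Shanghai"))); // prints 2023-05-05 00:57:43
    }
}
```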

Viewing the result

All it takes is a call like this:

SingleOutputStreamOperator<CarLog> overspeedLog = source1.filter(new CarOverspeedFilter1());
overspeedLog.addSink(new CarOverspeedSink1());

Running the job prints:

source1=>>>> CarLog(carCode=car_1, vin=null, speed=26, logTime=1683219460556, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=62, logTime=1683219461684, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=33, logTime=1683219462664, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=140, logTime=1683219463688, gpsLongitude=null, gpsLatitude=null)
car_1于2023-05-05 00:57:43 超速。速度:140km/h
source1=>>>> CarLog(carCode=car_1, vin=null, speed=20, logTime=1683219464711, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=38, logTime=1683219465884, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=21, logTime=1683219466499, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=110, logTime=1683219467705, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=98, logTime=1683219468560, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=122, logTime=1683219469470, gpsLongitude=null, gpsLatitude=null)
car_1于2023-05-05 00:57:49 超速。速度:122km/h
source1=>>>> CarLog(carCode=car_1, vin=null, speed=123, logTime=1683219470752, gpsLongitude=null, gpsLatitude=null)
car_1于2023-05-05 00:57:50 超速。速度:123km/h
source1=>>>> CarLog(carCode=car_1, vin=null, speed=80, logTime=1683219471615, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=16, logTime=1683219472708, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=118, logTime=1683219473563, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=67, logTime=1683219474634, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=120, logTime=1683219475746, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=121, logTime=1683219476736, gpsLongitude=null, gpsLatitude=null)
car_1于2023-05-05 00:57:56 超速。速度:121km/h
source1=>>>> CarLog(carCode=car_1, vin=null, speed=70, logTime=1683219477705, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=106, logTime=1683219478665, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=105, logTime=1683219479805, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=46, logTime=1683219480845, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=60, logTime=1683219481482, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=35, logTime=1683219482773, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=5, logTime=1683219483457, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_1, vin=null, speed=128, logTime=1683219484734, gpsLongitude=null, gpsLatitude=null)
car_1于2023-05-05 00:58:04 超速。速度:128km/h
source1=>>>> CarLog(carCode=car_1, vin=null, speed=135, logTime=1683219485590, gpsLongitude=null, gpsLatitude=null)
car_1于2023-05-05 00:58:05 超速。速度:135km/h
source1=>>>> CarLog(carCode=car_1, vin=null, speed=38, logTime=1683219486750, gpsLongitude=null, gpsLatitude=null)

Process finished with exit code 130

The overspeed records show up in the output next to the raw source records.

Using the Table API

// needs: import static org.apache.flink.table.api.Expressions.$;
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

Table table1 = tableEnv.fromDataStream(source1);
Table table2 = table1.where($("speed").isGreater(120));
table2.printSchema();

// the print label means "overspeed data after the Table API computation"
tableEnv.toDataStream(table2, CarLog.class).print("Table API运算后的超速数据:");

After running:

(
  `carCode` STRING,
  `vin` STRING,
  `speed` INT NOT NULL,
  `logTime` BIGINT NOT NULL,
  `gpsLongitude` STRING,
  `gpsLatitude` STRING
)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=0, logTime=1683220824050, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=97, logTime=1683220824886, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=117, logTime=1683220825843, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=90, logTime=1683220827020, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=35, logTime=1683220828225, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=12, logTime=1683220828882, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=28, logTime=1683220830090, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=27, logTime=1683220831220, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=130, logTime=1683220832300, gpsLongitude=null, gpsLatitude=null)
Table API运算后的超速数据:> CarLog(carCode=car_3, vin=null, speed=130, logTime=1683220832300, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=49, logTime=1683220832950, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=64, logTime=1683220833906, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=105, logTime=1683220835212, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=33, logTime=1683220836274, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=110, logTime=1683220837162, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=44, logTime=1683220838144, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=49, logTime=1683220839290, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=148, logTime=1683220840343, gpsLongitude=null, gpsLatitude=null)
Table API运算后的超速数据:> CarLog(carCode=car_3, vin=null, speed=148, logTime=1683220840343, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=47, logTime=1683220841115, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=121, logTime=1683220842419, gpsLongitude=null, gpsLatitude=null)
Table API运算后的超速数据:> CarLog(carCode=car_3, vin=null, speed=121, logTime=1683220842419, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_3, vin=null, speed=147, logTime=1683220843040, gpsLongitude=null, gpsLatitude=null)
Table API运算后的超速数据:> CarLog(carCode=car_3, vin=null, speed=147, logTime=1683220843040, gpsLongitude=null, gpsLatitude=null)

Process finished with exit code 130

Clearly the Table API can also be used for data processing in this case.

Using SQL

Table table3 = tableEnv.sqlQuery(
        "select " +
                "concat_ws(''," +
                "cast(carCode as string)," +
                "cast('于' as string)," +
                // column names are case-sensitive, and logTime cannot be cast to a timestamp directly
                "cast(to_timestamp(from_unixtime(logTime/1000,'yyyy-MM-dd HH:mm:ss')) as string)," +
                "cast('超速,速度:' as string)," +
                "cast(speed as string)," +
                "cast('km/h' as string)" +
                ") as message " +
                "from " + table1 +
                " where speed>120"
);
table3.printSchema();
// the print label means "overspeed data after the Table SQL computation"
tableEnv.toDataStream(table3, String.class).print("Table Sql运算后的超速数据:");

After running:

source1=>>>> CarLog(carCode=car_4, vin=null, speed=106, logTime=1683221978994, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_4, vin=null, speed=134, logTime=1683221979902, gpsLongitude=null, gpsLatitude=null)
Table Sql运算后的超速数据:> car_4于2023-05-05 01:39:39.000超速,速度:134km/h
source1=>>>> CarLog(carCode=car_4, vin=null, speed=142, logTime=1683221980929, gpsLongitude=null, gpsLatitude=null)
Table Sql运算后的超速数据:> car_4于2023-05-05 01:39:40.000超速,速度:142km/h
source1=>>>> CarLog(carCode=car_4, vin=null, speed=148, logTime=1683221981766, gpsLongitude=null, gpsLatitude=null)
Table Sql运算后的超速数据:> car_4于2023-05-05 01:39:41.000超速,速度:148km/h
source1=>>>> CarLog(carCode=car_4, vin=null, speed=148, logTime=1683221982592, gpsLongitude=null, gpsLatitude=null)
Table Sql运算后的超速数据:> car_4于2023-05-05 01:39:42.000超速,速度:148km/h
source1=>>>> CarLog(carCode=car_4, vin=null, speed=42, logTime=1683221983907, gpsLongitude=null, gpsLatitude=null)
source1=>>>> CarLog(carCode=car_4, vin=null, speed=110, logTime=1683221984943, gpsLongitude=null, gpsLatitude=null)

Process finished with exit code 130

Clearly SQL works for stateless applications too.

Errors encountered

A few errors came up during development, for example:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 97 to line 1, column 103: Column 'logtime' not found in any table; did you mean 'logTime'?
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:187)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:281)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:738)
    at com.zhiyong.FlinkTest1.main(FlinkTest1.java:51)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 97 to line 1, column 103: Column 'logtime' not found in any table; did you mean 'logTime'?
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:932)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:917)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5163)
    at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:268)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:6350)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6510)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6495)
    at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:324)
    at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134)
    at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101)
    at org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:954)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:6370)
    at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:54)
    at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:37)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
    at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134)
    at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101)
    at org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:954)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:6370)
    at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:54)
    at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:37)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
    at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134)
    at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101)
    at org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:954)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:6370)
    at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:54)
    at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:37)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
    at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134)
    at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101)
    at org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:954)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:6370)
    at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:54)
    at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:37)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
    at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134)
    at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101)
    at org.apache.calcite.sql.SqlAsOperator.acceptCall(SqlAsOperator.java:121)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:6370)
    at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:54)
    at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:37)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.go(SqlValidatorImpl.java:6339)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5910)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:426)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4324)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3570)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:64)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:89)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1042)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1017)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:247)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:992)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:741)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:183)
    ... 5 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'logtime' not found in any table; did you mean 'logTime'?
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505)
    at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:599)
    ... 61 more

Process finished with exit code 1

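Evidently Flink parses SQL with Calcite, and column names turned out to be case-sensitive: writing logtime instead of logTime fails validation. In the usual SQL dialects (Hive QL, Spark SQL, MySQL, Oracle) unquoted column names do not behave this way by default. Enough said. Not being able to cast the epoch-millisecond value straight to a timestamp is also a nuisance, but fortunately there are functions that can be chained to do the job.

Stateful application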

Demo

package com.zhiyong;

import com.zhiyong.common.CarLog;
import com.zhiyong.source.CarSpeedSource1ps;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @program: zhiyong_flink17_study
 * @description: Flink stateful application check
 * @author: zhiyong
 * @create: 2023-07-01 09:11
 **/
public class FlinkTest2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        CarSpeedSource1ps carSpeedSource1ps = new CarSpeedSource1ps();
        DataStreamSource<CarLog> source1 = env.addSource(carSpeedSource1ps);
        DataStreamSource<CarLog> source2 = env.addSource(carSpeedSource1ps);

        // source1.print("source1=>>>");
        // source2.print("source2=>>>");

        DataStream<CarLog> source3 = source1.union(source2);
        source3.print("source3=>>>");

        SingleOutputStreamOperator<Object> source4 = source3.keyBy(new KeySelector<CarLog, Object>() {
            @Override
            public Object getKey(CarLog carLog) throws Exception {
                return carLog.getCarCode();
            }
        }).flatMap(new RichFlatMapFunction<CarLog, Object>() {
            // per-key count of overspeed records
            ValueState<Integer> valueState;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                ValueStateDescriptor<Integer> overSpeedCount = new ValueStateDescriptor<Integer>(
                        "overSpeedCount",
                        TypeInformation.of(new TypeHint<>() {
                            @Override
                            public TypeInformation<Integer> getTypeInfo() {
                                return super.getTypeInfo();
                            }
                        }));
                valueState = getRuntimeContext().getState(overSpeedCount);
            }

            @Override
            public void flatMap(CarLog carLog, Collector<Object> collector) throws Exception {
                Integer value = valueState.value();
                if (null == value) {
                    value = Integer.valueOf(0);
                }
                if (carLog.getSpeed() > 120) {
                    value += 1;
                    valueState.update(value);
                }
                collector.collect(Tuple2.of(carLog.getCarCode(), value));
            }
        });
        source4.print("source4=>>>");

        env.execute();
    }
}

This also runs fine:

source3=>>>> CarLog(carCode=car_3, vin=null, speed=24, logTime=1688178031808, gpsLongitude=null, gpsLatitude=null)
source3=>>>> CarLog(carCode=car_2, vin=null, speed=10, logTime=1688178032050, gpsLongitude=null, gpsLatitude=null)
source4=>>>> (car_2,0)
source4=>>>> (car_3,0)
source3=>>>> CarLog(carCode=car_2, vin=null, speed=5, logTime=1688178032826, gpsLongitude=null, gpsLatitude=null)
source4=>>>> (car_3,0)
source4=>>>> (car_2,0)
source3=>>>> CarLog(carCode=car_3, vin=null, speed=93, logTime=1688178033145, gpsLongitude=null, gpsLatitude=null)
source3=>>>> CarLog(carCode=car_3, vin=null, speed=130, logTime=1688178034046, gpsLongitude=null, gpsLatitude=null)
source3=>>>> CarLog(carCode=car_2, vin=null, speed=8, logTime=1688178034202, gpsLongitude=null, gpsLatitude=null)
source4=>>>> (car_2,0)
source4=>>>> (car_3,1)
source3=>>>> CarLog(carCode=car_2, vin=null, speed=142, logTime=1688178034806, gpsLongitude=null, gpsLatitude=null)
source4=>>>> (car_3,1)
source4=>>>> (car_2,1)
source3=>>>> CarLog(carCode=car_3, vin=null, speed=113, logTime=1688178034939, gpsLongitude=null, gpsLatitude=null)
source3=>>>> CarLog(carCode=car_2, vin=null, speed=34, logTime=1688178036024, gpsLongitude=null, gpsLatitude=null)
source4=>>>> (car_2,1)
source4=>>>> (car_3,2)
source3=>>>> CarLog(carCode=car_3, vin=null, speed=142, logTime=1688178035808, gpsLongitude=null, gpsLatitude=null)
source3=>>>> CarLog(carCode=car_2, vin=null, speed=115, logTime=1688178037145, gpsLongitude=null, gpsLatitude=null)
source4=>>>> (car_3,3)
source4=>>>> (car_2,1)
source3=>>>> CarLog(carCode=car_3, vin=null, speed=129, logTime=1688178036853, gpsLongitude=null, gpsLatitude=null)
source3=>>>> CarLog(carCode=car_2, vin=null, speed=99, logTime=1688178037973, gpsLongitude=null, gpsLatitude=null)
source4=>>>> (car_2,1)
source3=>>>> CarLog(carCode=car_3, vin=null, speed=12, logTime=1688178037941, gpsLongitude=null, gpsLatitude=null)
source4=>>>> (car_3,3)

Process finished with exit code 130

Stateful programs like this one, which accumulates an overspeed count per car, also work without trouble.
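To make the counting logic easier to check by hand, here is a minimal Flink-free sketch of the same per-key counter. It is not the job itself: the class name `OverSpeedCounterSketch` and its `process` method are hypothetical, and a plain `HashMap` stands in for Flink's keyed `ValueState<Integer>`. The speeds replayed in `main` are the ones printed for car_3 and car_2 in the log above.

```java
import java.util.HashMap;
import java.util.Map;

/** Flink-free sketch of the per-key overspeed counter; all names here are hypothetical. */
public class OverSpeedCounterSketch {
    // Threshold taken from the job: speeds strictly greater than 120 count as overspeed.
    static final int SPEED_LIMIT = 120;

    // Stands in for the keyed ValueState<Integer>: one counter per carCode.
    private final Map<String, Integer> overSpeedCount = new HashMap<>();

    /** Processes one record and returns the running overspeed count for that car. */
    public int process(String carCode, int speed) {
        int value = overSpeedCount.getOrDefault(carCode, 0); // missing state is treated as 0
        if (speed > SPEED_LIMIT) {
            value += 1;
            overSpeedCount.put(carCode, value); // state is only written on overspeed
        }
        return value;
    }

    public static void main(String[] args) {
        OverSpeedCounterSketch counter = new OverSpeedCounterSketch();
        // Speeds replayed per car, in the order they appear in the log.
        int[] car3 = {24, 93, 130, 113, 142, 129, 12};
        int[] car2 = {10, 5, 8, 142, 34, 115, 99};
        int last3 = 0;
        int last2 = 0;
        for (int speed : car3) {
            last3 = counter.process("car_3", speed);
        }
        for (int speed : car2) {
            last2 = counter.process("car_2", speed);
        }
        System.out.println("car_3 -> " + last3); // prints "car_3 -> 3"
        System.out.println("car_2 -> " + last2); // prints "car_2 -> 1"
    }
}
```

The final counts agree with the last `source4` lines in the log, (car_3,3) and (car_2,1). In the real job the counter lives in Flink-managed keyed state rather than a local map, so it is scoped per key automatically.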

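At this point, it looks like Flink on JDK17 can handle normal everyday use.

The remaining pitfalls will have to be found one at a time.

Please credit the source when reposting: https://lizhiyong.blog.csdn.net/article/details/131525151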

