状态的一致性和FlinkSQL

状态一致性

一致性其实就是结果的正确性。精确一次是指数据有可能被处理多次,但是结果只有一个。
三个级别:

  1. 最多一次:1次或0次,有可能丢数据
  2. 至少一次:1次或n次,出错可能会重试
    • 输入端只要可以做到数据重放,即在出错后,可以重新发送一样的数据
  3. 精确一次:数据只会发送1次
    • 幂等写入:多次重复操作不影响结果,有可能出现某个值由于数据重放,导致结果回到原先的值,然后逐渐恢复。
    • 预写日志:
      1. 先把结果数据作为日志状态保存起来
      2. 进行检查点保存时,也会将这些结果数据一并做持久化存储
      3. 在收到检查点完成的通知时,将所有结果数据一次性写入外部系统
    • 预写日志缺点:这种再次确认的方式,如果写入成功返回的ack出现故障,还是会出现数据重复。
    • 两阶段提交(2PC):数据写入过程和数据提交分为两个过程,如果写入过程没有发生异常,就将事务进行提交。
      • 算子节点在收到第一个数据时,就开启一个事务,然后提交数据,在下一个检查点到达前都是预写入,如果下一个检查点正常,再进行最终提交。
      • 对外部系统有一定的要求,要能够识别事务ID,事务的重复提交应该是无效的。
      • 即barrier到来时,如果结果一致,就提交事务,否则进行事务回滚

Flink和Kafka连接时的精确一次保证

  • 开启检查点
  • 开启事务隔离级别,读已提交
  • 注意设置kafka超时时间为10分钟
public class Flink02_KafkaToFlink {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);//开启检查点env.enableCheckpointing(1000L);//kafka sourceKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092").setGroupId("flinkb").setTopics("topicA")//优先使用消费者组 记录的Offset进行消费,如果offset不存在,根据策略进行重置.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)).setValueOnlyDeserializer(new SimpleStringSchema())//如果还有别的配置需要指定,统一使用通用方法.setProperty("isolation.level", "read_committed").build();DataStreamSource<String> ds = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource");//处理过程//kafka SinkKafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092").setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("first").setValueSerializationSchema(new SimpleStringSchema()).build())//语义//AT_LEAST_ONCE:至少一次,表示数据可能重复,需要考虑去重操作//EXACTLY_ONCE:精确一次//kafka transaction timeout is larger than broker//kafka超时时间:1H//broker超时时间:15分钟//                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)//数据传输的保障.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)//数据传输的保障.setTransactionalIdPrefix("flink"+ RandomUtils.nextInt(0,100000))
//                .setProperty(ProducerConfig.RETRIES_CONFIG,"10").setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"60*1000*10")//10分钟.build();ds.map(JSON::toJSONString).sinkTo(kafkaSink);//写入到kafka 生产者ds.sinkTo(kafkaSink);try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

FlinkSQL1.17

FlinkSQL不同版本的接口仍在变化,有变动查看官网。
在官网这个位置可以查看Flink对于以来的一些官方介绍。
在这里插入图片描述
Table依赖剖析
三个依赖:
1. flink-table-api-java-uber-1.17.2.jar (所有的Java API)
2. flink-table-runtime-1.17.2.jar (包含Table运行时)
3. flink-table-planner-loader-1.17.2.jar (查询计划器,即SQL解析器)

静态导包:在import后添加static,并在类后面加上*导入全部。主要是为了方便使用下面的 $ 方法,否则 $ 方法前面都要添加Expressions的类名前缀

table.where($("vc").isGreaterOrEqual(100)).select($("id"),$("vc"),$("ts")).execute().print();

程序架构

  1. 准备环境
    • 流表环境:基于流创建表环境
    • 表环境:从操作层面与流独立,底层处理还是流
  2. 创建表
    • 基于流:将流转换为表
    • 连接器表
  3. 转换处理
    • 基于Table对象,使用API进行处理
    • 基于SQL的方式,直接写SQL处理
  4. 输出
    • 基于Table对象或连接器表,输出结果
    • 表转换为流,基于流的方式输出

流处理中的表

  • 处理的数据对象
    • 关系:字段元组的有界集合
    • 流处理:字段元组的无限序列
  • 对数据的访问
    • 关系:可以得到完整的
    • 流处理:数据是动态的

因此处理过程中的表是动态表,必须要持续查询。

流表转换

持续查询

  • 追加查询:窗口查询的结果通过追加的方式添加到表的末尾,使用toDataStream
  • 更新查询:窗口查询的结果会对原有的结果进行修改, 使用toChangeLogStream
  • 如果不清楚是什么类型,直接使用toChangeLogSteam()将表转换为流
public class Flink04_TableToStreamQQ {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);SingleOutputStreamOperator<Event> ds = env.socketTextStream("hadoop102", 8888).map(line -> {String[] fields = line.split(",");return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));});Table table = tableEnv.fromDataStream(ds);tableEnv.createTemporaryView("t1", table);//SQLString appendSQL = "select user, url, ts from t1 where user <> 'zhangsan'";//需要在查询过程中更新上一次的值String updateSQL = "select user, count(*) cnt from t1 group by user";Table resultTable = tableEnv.sqlQuery(updateSQL);//表转换为流//doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[user], select=[user, COUNT(*) AS cnt])
//        DataStream<Row> rowDs = tableEnv.toDataStream(resultTable);//有更新操作时,使用toChangelogStream(),它即支持追加,也支持更新查询DataStream<Row> rowDs = tableEnv.toChangelogStream(resultTable);rowDs.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

将动态表转换为流

  • 仅追加流:如果表的结果都是追加查询
  • Retract撤回流:
    • 包含两类消息,添加消息和撤回消息
    • 下游需要根据这两类消息进行处理
  • 更新插入流:
    • 两种消息:更新插入消息(带key)和删除消息

连接器

  • DataGen和Print连接器
public class Flink01_DataGenPrint {public static void main(String[] args) {//TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());//1. 准备表环境, 基于流环境,创建表环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//DataGenString createTable =" create table t1 ( " +"  id STRING , " +"  vc INT ," +"  ts BIGINT " +" ) WITH (" +"  'connector' = 'datagen' ,"  +"  'rows-per-second' = '1' ," +"  'fields.id.kind' = 'random' , " +"  'fields.id.length' = '6' ," +"  'fields.vc.kind' = 'random' , " +"  'fields.vc.min' = '100' , " +"  'fields.vc.max' = '1000' ," +"  'fields.ts.kind' = 'sequence' , " +"  'fields.ts.start' = '1000000' , " +"  'fields.ts.end' = '100000000' " +" )" ;tableEnv.executeSql(createTable);//Table resultTable = tableEnv.sqlQuery("select * from t1 where vc >= 200");//.execute().print();//printString sinkTable ="create table t2(" +"id string," +"vc int," +"ts bigint" +") with (" +"   'connector' = 'print', " +"   'print-identifier' = 'print>' " +")";tableEnv.executeSql(sinkTable);tableEnv.executeSql("insert into t2 select id, vc, ts from t1 where vc >= 200");}
}
  • 文件连接器
public class Flink02_FileConnector {public static void main(String[] args) {TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().build());//FileSourceString sourceTable =" create table t1 ( " +"  id STRING , " +"  vc INT ," +"  ts BIGINT," +//"  `file.name` string not null METADATA," + 文件名字由于系统原因无法识别盘符后面的冒号"  `file.size` bigint not null METADATA" +" ) WITH (" +"  'connector' = 'filesystem' ,"  +"  'path' = 'input/ws.txt' ,"  +"  'format' = 'csv' "  +" )" ;tableEnvironment.executeSql(sourceTable);//tableEnvironment.sqlQuery(" select * from t1 ").execute().print();//转换处理...//File sinkString sinkTable =" create table t2 ( " +"  id STRING , " +"  vc INT ," +"  ts BIGINT," +//"  `file.name` string not null METADATA," + 文件名字由于系统原因无法识别盘符后面的冒号"  file_size bigint" +" ) WITH (" +"  'connector' = 'filesystem' ,"  +"  'path' = 'output' ,"  +"  'format' = 'json' "  +" )" ;tableEnvironment.executeSql(sinkTable);tableEnvironment.executeSql("insert into t2 " +"select id, vc, ts, `file.size` from t1");}
}
  • kafka连接器
public class Flink03_KafkaConnector {public static void main(String[] args) {TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().build());//kafka sourceString sourceTable =" create table t1 ( " +"  id STRING , " +"  vc INT ," +"  ts BIGINT," +"  `topic` string not null METADATA," +"  `partition` int not null METADATA," +"  `offset` bigint not null METADATA" +" ) WITH (" +"  'connector' = 'kafka' ,"  +"  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092' ,"  +"  'topic' = 'topicA', "  +"  'properties.group.id' = 'flinksql', "  +"  'value.format' = 'csv', "  +"  'scan.startup.mode' = 'group-offsets',"  +"  'properties.auto.offset.reset' = 'latest' "  +" )" ;//创建表tableEnvironment.executeSql(sourceTable);//打印查询结果//tableEnvironment.sqlQuery(" select * from t1 ").execute().print();//转换处理...//kafka SinkString sinkTable =" create table t2 ( " +"  id STRING , " +"  vc INT ," +"  ts BIGINT," +"  `topic` string " +" ) WITH (" +"  'connector' = 'kafka' ,"  +"  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092' ,"  +"  'topic' = 'topicB', "  +"  'sink.delivery-guarantee' = 'at-least-once', "  +//"  'properties.transaction.timeout.ms' = '', "  +//"  'sink.transactional-id-prefix' = 'xf', "  +//"  'properties.group.id' = 'flinksql', "  +"  'value.format' = 'json' "  +//"  'scan.startup.mode' = 'group-offsets',"  +//"  'properties.auto.offset.reset' = 'latest' "  +" )" ;tableEnvironment.executeSql(sinkTable);tableEnvironment.executeSql("insert into t2 " +"select id, vc, ts, `topic` from t1");}
}
  • Jdbc连接器

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/277433.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

黑马点评04集群下的并发安全

实战篇-08.优惠券秒杀-集群下的线程并发安全问题_哔哩哔哩_bilibili 为了应对高并发&#xff0c;需要把项目部署到多个机器构成集群&#xff0c;所以需要配置nginx。 1.如何模拟集群 通过idea的ctrl d修改配置&#xff0c;实现多个tomcat运行模拟集群 然后在nginx上配置节点&…

【产品经理】产品专业化提升路径

产品专业化就是上山寻路&#xff0c;梳理一套作为产品经理的工作方法。本文作者从设计方法、三基座、专业强化、优秀产品拆解、零代码这五个方面&#xff0c;对产品经理的产品专业化进行了总结归纳&#xff0c;一起来看一下吧。 产品专业化就是上山寻路&#xff0c;梳理一套作为…

MySQL——表的约束

目录 一.表的约束 二.空属性 ​编辑三.默认值 四.列描述 五.主键 1.主键 2.符合主键 六.自增长 七.唯一键 八.外键 一.表的约束 真正约束字段的是数据类型&#xff0c;但是数据类型约束很单一&#xff0c;需要有一些额外的约束&#xff0c;更好的保证数据的合法性&…

linux sed命令删除一行/多行_sed删除第一行/linux删除文件某一行

sed系列文章 linux常用命令(9)&#xff1a;sed命令(编辑/替换/删除文本)linux sed命令删除一行/多行_sed删除第一行/linux删除文件某一行linux sed批量修改替换文件中的内容/sed特殊字符 文章目录 sed系列文章一、sed删除1.1、sed删除某一行内容/删除最后一行1.2、sed删除多行…

Maven进阶篇超详细笔记

Maven进阶篇详细笔记&#xff0c;源码可见下载链接 大家阅读时可善用目录功能&#xff0c;可以提高大家的阅读效率 下载地址&#xff1a;Maven笔记项目源码 分模块开发 分模块开发的意义 将原始模块查分成若干个子模块&#xff0c;方便模块间的相互调用&#xff0c;接口共享 …

EasyExcel读取Excel数据(含多种方式)

目录 EasyExcel简介 使用EasyExcel进行读数据 引入依赖&#xff1a; EasyExcel提供了两种读取模式 使用 监听器 读取模式 1.创建一个实体类 2.创建监听器 代码 使用 同步读 读取模式 1.创建一个实体类 2.代码 添加导入数据库的逻辑 其实官方文档讲得很清楚&#xff…

为什么需要Bootloader

目录 一、Bootloader简介二、所需知识点一、Bootloader简介 Bootloader,又称为引导程序,对操作系统非常重要,是计算机和汽车控制器的一个关键组成部分。然而,因为它往往在后台发挥作用,它经常被忽视。其主要用于软件更新。但ECU的软件更新方式有很多,比如通过JTAG调试更…

C++_构造函数与析构函数

目录 1、构造函数的写法 1.2 构造函数优化写法 2、默认构造函数与默认成员函数 2.1 默认成员函数对不同类型的处理 3、对内置类型的补丁 4、析构函数 4.1 析构函数的写法 5、默认析构函数 6、初始化列表 6.1 初始化列表的写法 6.2 初始化列表的作用 6.3 回顾与总结 …

SpringBoot基础使用及对其他项目进行整合

目录 一、简介 1-讲述 2-特点 二、创建配置 1.创建 2.配置 3.代码生成 三、项目整合 每篇一获 一、简介 1-讲述 众所周知 Spring 应用需要进行大量的配置&#xff0c;各种 XML 配置和注解配置让人眼花缭乱&#xff0c;且极容易出错&#xff0c;因此 Spring 一度被称…

深入了解空号检测API:提升通信效率的关键

引言 随着通信技术的不断发展&#xff0c;人们对于通信效率的要求也越来越高。在通信过程中&#xff0c;空号检测是一个非常重要的环节&#xff0c;它可以帮助我们避免无效的通信&#xff0c;提高通信效率。而空号检测API则是实现空号检测功能的重要工具。 空号检测API 空号…

人工智能多模态:看、听、说,智能感知的全方位融合

导言 人工智能多模态技术是指通过整合视觉、听觉、语言等多个感知模态的信息&#xff0c;实现对丰富、多样化数据的理解与处理。本文将深入研究人工智能多模态的技术原理、应用场景以及对未来感知智能的影响。 1. 简介 人工智能多模态技术通过整合多个感知模态的信息&#xff…

安恒明御安全网关 aaa_local_web_preview文件上传漏洞复现

0x01 产品简介 明御安全网关秉持安全可视、简单有效的理念,以资产为视角,构建全流程防御的下一代安全防护体系,并融合传统防火墙、入侵检测、入侵防御系统、防病毒网关、上网行为管控、VPN网关、威胁情报等安全模块于一体的智慧化安全网关。 0x02 漏洞概述 明御安全网关在…