大数据-玩转数据-Flink SQL编程实战 (热门商品TOP N)

一、需求描述

每隔30min 统计最近 1hour的热门商品 top3, 并把统计的结果写入到mysql中。

二、需求分析

  • 1.统计每个商品的点击量, 开窗
  • 2.分组窗口分组
  • 3.over窗口

三、需求实现

3.1、创建数据源示例

input/UserBehavior.csv

543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000
578814,176722,982926,pv,1511658000
873335,1256540,1451783,pv,1511658000
429984,4625350,2355072,pv,1511658000
866796,534083,4203730,pv,1511658000
937166,321683,2355072,pv,1511658000
156905,2901727,3001296,pv,1511658000
758810,5109495,1575622,pv,1511658000
107304,111477,4173315,pv,1511658000
452437,3255022,5099474,pv,1511658000
813974,1332724,2520771,buy,1511658000
524395,3887779,2366905,pv,1511658000

3.2、创建目标表

CREATE DATABASE flink_sql; //创建flink_sql库
USE flink_sql;
DROP TABLE IF EXISTS `hot_item`;
CREATE TABLE `hot_item` (`w_end` timestamp NOT NULL,`item_id` bigint(20) NOT NULL,`item_count` bigint(20) NOT NULL,`rk` bigint(20) NOT NULL,PRIMARY KEY (`w_end`,`rk`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3.3、导入JDBC Connector依赖

<!-- 导入JDBC Connector依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>

3.4、代码实现

package com.atguigu.flink.java.chapter_12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @Author lizhenchao@atguigu.cn* @Date 2021/1/31 9:11*/
public class Flink01_HotItem_TopN {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 使用sql从文件读取数据tenv.executeSql("create table user_behavior(" +"   user_id bigint, " +"   item_id bigint, " +"   category_id int, " +"   behavior string, " +"   ts bigint, " +"   event_time as to_timestamp(from_unixtime(ts, 'yyyy-MM-dd HH:mm:ss')), " +"   watermark for event_time as  event_time - interval '5' second " +")with(" +"   'connector'='filesystem', " +"   'path'='input/UserBehavior.csv', " +"   'format'='csv')");// 每隔 10m 统计一次最近 1h 的热门商品 top// 1. 计算每每个窗口内每个商品的点击量Table t1 = tenv.sqlQuery("select " +"   item_id, " +"   hop_end(event_time, interval '10' minute, interval '1' hour) w_end," +"   count(*) item_count " +"from user_behavior " +"where behavior='pv' " +"group by hop(event_time, interval '10' minute, interval '1' hour), item_id");tenv.createTemporaryView("t1", t1);// 2. 按照窗口开窗, 对商品点击量进行排名Table t2 = tenv.sqlQuery("select " +"   *," +"   row_number() over(partition by w_end order by item_count desc) rk " +"from t1");tenv.createTemporaryView("t2", t2);// 3. 取 top3Table t3 = tenv.sqlQuery("select " +"   item_id, w_end, item_count, rk " +"from t2 " +"where rk<=3");// 4. 数据写入到mysql// 4.1 创建输出表tenv.executeSql("create table hot_item(" +"   item_id bigint, " +"   w_end timestamp(3), " +"   item_count bigint, " +"   rk bigint, " +"   PRIMARY KEY (w_end, rk) NOT ENFORCED)" +"with(" +"   'connector' = 'jdbc', " +"   'url' = 'jdbc:mysql://hadoop162:3306/flink_sql?useSSL=false', " +"   'table-name' = 'hot_item', " +"   'username' = 'root', " +"   'password' = 'aaaaaa' " +")");// 4.2 写入到输出表t3.executeInsert("hot_item");}
}

执行结果:
在这里插入图片描述

四、总结

Flink 使用 OVER 窗口条件和过滤条件相结合以进行 Top-N 查询。利用 OVER 窗口的 PARTITION BY 子句的功能,Flink 还支持逐组 Top-N 。 例如,每个类别中实时销量最高的前五种产品。批处理表和流处理表都支持基于SQL的 Top-N 查询。
流处理模式需注意: TopN 查询的结果会带有更新。 Flink SQL 会根据排序键对输入的流进行排序;若 top N 的记录发生了变化,变化的部分会以撤销、更新记录的形式发送到下游。 推荐使用一个支持更新的存储作为 Top-N 查询的 sink 。另外,若 top N 记录需要存储到外部存储,则结果表需要拥有与 Top-N 查询相同的唯一键。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/125347.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

多线程 - 单例模式

单例模式 ~~ 单例模式是常见的设计模式之一 什么是设计模式 你知道象棋,五子棋,围棋吗?如果,你想下好围棋,你就不得不了解一个东西,”棋谱”,设计模式好比围棋中的 “棋谱”. 在棋谱里面,大佬们,把一些常见的对局场景,都给推演出来了,照着棋谱来下棋,基本上棋力就不会差到哪…

Linux系统编程系列之线程池

Linux系统编程系列&#xff08;16篇管饱&#xff0c;吃货都投降了&#xff01;&#xff09; 1、Linux系统编程系列之进程基础 2、Linux系统编程系列之进程间通信(IPC)-信号 3、Linux系统编程系列之进程间通信(IPC)-管道 4、Linux系统编程系列之进程间通信-IPC对象 5、Linux系统…

redis-设置从节点

节点结构 节点配置文件 主节点 不变 6380节点 port 6380 slaveof 127.0.0.1 63796381节点 port 6381 slaveof 127.0.0.1 6380启动 指定配置文件的方式启动 D:\jiqun\redis\Redis-6380>redis-server.exe redis.windows.conf启动时&#xff0c;会触发同步数据命令 主节点…

论文阅读——Pyramid Grafting Network for One-Stage High Resolution Saliency Detection

目录 基本信息标题目前存在的问题改进网络结构CMGM模块解答为什么要用这两个编码器进行编码 另一个写的好的参考 基本信息 期刊CVPR年份2022论文地址https://arxiv.org/pdf/2204.05041.pdf代码地址https://github.com/iCVTEAM/PGNet 标题 金字塔嫁接网络的一级高分辨率显著性…

【LeetCode热题100】--74.搜索二维矩阵

74.搜索二维矩阵 按行搜索&#xff0c;使用二分查找 class Solution {public boolean searchMatrix(int[][] matrix, int target) {for(int[] row : matrix){int index search(row,target);if(index > 0){return true;}}return false;}public int search(int[] nums,int t…

Linux CentOS7 vim临时文件

在vim中&#xff0c;由于断网、停电、故意退出、不小心关闭终端等多种原因&#xff0c;正在编辑的文件没有保存&#xff0c;系统将会为文件保存一个交换文件&#xff0c;或称临时文件&#xff0c;或备份文件。 如果因某种原因产生了交换文件&#xff0c;每次打开文件时&#x…

【Java】接口 interface

目录 概述 示例代码&#xff1a; 接口成员访问特点 示例代码&#xff1a; 概述 什么是接口 接口就是一种公共的规范标准&#xff0c;只要符合规范标准&#xff0c;大家都可以调用。 Java 中的接口更多的体现在对行为的抽象&#xff01; 1. 接口 用关键字 interface 修饰 pub…

基于SpringBoot的ElasticSearch操作(超详细教程)

一、ElasticSearch 简介 1、简介 ElasticSearch 是一个基于 Lucene 的搜索服务器。它提供了一个分布式多员工能力的全文搜索引擎&#xff0c;基于 RESTful web 接口。Elasticsearch 是用 Java 语言开发的&#xff0c;并作为 Apache 许可条款下的开放源码发布&#xff0c;是一种…

ElasticSearch第四讲:ES详解:ElasticSearch和Kibana安装

ElasticSearch第四讲&#xff1a;ES详解&#xff1a;ElasticSearch和Kibana安装 本文是ElasticSearch第四讲&#xff1a;ElasticSearch和Kibana安装&#xff0c;主要介绍ElasticSearch和Kibana的安装。了解完ElasticSearch基础和Elastic Stack生态后&#xff0c;我们便可以开始…

Pikachu靶场——远程命令执行漏洞(RCE)

文章目录 1. RCE1.1 exec "ping"1.1.1 源代码分析1.1.2 漏洞防御 1.2 exec "eval"1.2.1 源代码分析1.2.2 漏洞防御 1.3 RCE 漏洞防御 1. RCE RCE(remote command/code execute)概述&#xff1a; RCE漏洞&#xff0c;可以让攻击者直接向后台服务器远程注入…

flex布局与几个实例(含源码)

本文简单的说明下flex布局 有源码实例&#xff0c;后续会持续添加 flex默认主轴是横轴 容器主要有6个属性 flex-direction 决定主轴的方向 flex-direction: row | row-reverse | column | column-reverse; flex-wrap 决定是否换行 flex-wrap: nowrap | wrap | wrap-revers…

华为云云耀云服务器L实例评测|Elasticsearch的springboot整合 Kibana进行全查询和模糊查询

前言 最近华为云云耀云服务器L实例上新&#xff0c;也搞了一台来玩&#xff0c;期间遇到各种问题&#xff0c;在解决问题的过程中学到不少和运维相关的知识。 在前几期的博客中&#xff0c;介绍了Elasticsearch的Docker版本的安装&#xff0c;Elasticsearch的可视化Kibana工具…