[flink] flink macm1pro 快速使用从零到一

文章目录

  • 快速使用

快速使用

  1. 打开 https://flink.apache.org/downloads/ 下载 flink

因为书籍介绍的是 1.12版本的,为避免不必要的问题,下载相同版本

image.png
image.png

  1. 解压
 tar -xzvf flink-1.11.2-bin-scala_2.11.tgz

image.png

  1. 启动 flink
./bin/start-cluster.sh

image.png

  1. 打开 flink web 页面 localhost:8081

image.png

  1. 编写结合 Kafka 词频统计程序

具体参考 https://weread.qq.com/web/reader/51032ac07236f8e05107816k1f032c402131f0e3dad99f3?

package org.example;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;import java.util.Properties;public class WordCountKafkaInStdOut {public static void main(String[] args) throws Exception {// 设置Flink执行环境 StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// Kafka参数 Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9092");properties.setProperty("group.id", "flink-group");String inputTopic = "Shakespeare";String outputTopic = "WordCount";// Source FlinkKafkaConsumer<String> consumer =new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(),properties);DataStream<String> stream = env.addSource(consumer);// Transformation // 使用Flink  API对输入流的文本进行操作 // 按空格切词、计数、分区、设置时间窗口、聚合 DataStream<Tuple2<String, Integer>> wordCount = stream.flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {String[] tokens = line.split("\\s");// 输出结果  for (String token : tokens) {if (token.length() > 0) {collector.collect(new Tuple2<>(token, 1));}}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(0).timeWindow(Time.seconds(5)).sum(1);// Sink wordCount.print();// execute env.execute("kafka streaming word count");}
} 
  1. 打包应用(当然在这之前需要本地调试一下,至少得运行通吧😄)
  2. 使用Flink提供的命令行工具flink,将打包好的作业提交到集群上。命令行的参数 --class 用来指定哪个主类作为入口。
./bin/flink run --class org.example.WordCountKafkaInStdOut xxtarget/flink_study-1.0-SNAPSHOT.jar

class 建议直接拷贝引用
image.png

  1. web 页面查看作业提交成功

image.png

  1. kafka 生产者随便发点消息

image.png

  1. 查看作业日志,词频统计结果

image.png
image.png

  1. 关闭 flink
./bin/stop-cluster.sh

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/575653.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

k8s1.28.8版本配置prometheus监控告警

文章目录 官方架构图组件的具体介绍kube-prometheus包含的组件简介&#xff1a;文件存储路径&#xff1a; 结构分析官网自带的一些规则自己总结流程 1-创建规则磁盘使用率报警规则 详解上面rule流程Alertmanagerg查看 2-报警接收器2.1-邮件报警修改Alertmanager配置查看现有的s…

人工智能(pytorch)搭建模型25-基于pytorch搭建FPN特征金字塔网络的应用场景,模型结构介绍

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下人工智能(pytorch)搭建模型25-基于pytorch搭建FPN特征金字塔网络的应用场景&#xff0c;模型结构介绍。特征金字塔网络&#xff08;FPN&#xff09;是一种深度学习模型结构&#xff0c;主要应用于目标检测任务中&am…

在宝塔面板中,为自己的云服务器安装SSL证书,为所搭建的网站启用https(主要部分攻略)

前提条件 My HTTP website is running Nginx on Debian 10&#xff08;或者11&#xff09; 时间&#xff1a;2024-3-28 16:25:52 你的网站部署在Debain 10&#xff08;或者11&#xff09;的 Nginx上 安装单域名证书&#xff08;默认&#xff09;&#xff08;非泛域名&#xf…

寄主机显示器被快递搞坏了怎么办?怎么破?

大家好&#xff0c;我是平泽裕也。 最近&#xff0c;我在社区里看到很多关于开学后弟弟寄来的电脑显示器被快递损坏的帖子。 看到它真的让我感到难过。 如果有人的数码产品被快递损坏了&#xff0c;我会伤心很久。 那么今天就跟大家聊聊寄快递的一些小技巧。 作为一名曾经的…

iOS开发进阶(十一):ViewController 控制器详解

文章目录 一、前言二、UIViewController三、UINavigationController四、UITabBarController五、UIPageViewController六、拓展阅读 一、前言 iOS 界面开发最重要的首属ViewController和View&#xff0c;ViewController是View的控制器&#xff0c;也就是一般的页面&#xff0c;…

Swagger添加JWT验证(ASP.NET)

文章目录 JWT1、解析2、配置JWT JWT 1、解析 1&#xff09;客户端向授权服务系统发起请求&#xff0c;申请获取“令牌”。 2&#xff09;授权服务根据用户身份&#xff0c;生成一张专属“令牌”&#xff0c;并将该“令牌”以JWT规范返回给客户端 3&#xff09;客户端将获取到的…

记录minio、okhttp、kotlin一连环的版本冲突问题

问题背景 项目中需要引入minio&#xff0c;添加了如下依赖 <dependency><groupId>io.minio</groupId><artifactId>minio</artifactId><version>8.5.2</version></dependency> 结果运行报错&#xff1a; Caused by: java.la…

SpringMVC设置全局异常处理器

文章目录 背景分析使用ControllerAdvice&#xff08;RestControllerAdvice&#xff09;ExceptionHandler实现全局异常全局异常处理-多个处理器匹配顺序存在一个类中存在不同的类中 对于过滤器和拦截器中的异常&#xff0c;有两种思路可以考虑 背景 在项目中我们有需求做一个全…

.helper勒索病毒的最新威胁:如何恢复您的数据?

导言&#xff1a; 随着信息技术的不断进步&#xff0c;网络安全问题日益突出&#xff0c;其中勒索病毒成为了威胁网络安全的一大隐患。.helper勒索病毒作为近期频繁出现的一种恶意软件&#xff0c;其危害性和传播速度引起了广大用户的深切关注。本文将深入探讨.helper勒索病毒…

如何使用Windows电脑部署Lychee私有图床网站并实现无公网IP远程管理本地图片

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法|MySQL| ​&#x1f4ab;个人格言:“没有罗马,那就自己创造罗马~” #mermaid-svg-MSVdVLkQMnY9Y2HW {font-family:"trebuchet ms",verdana,arial,sans-serif;f…

uniapp h5 引入阿里云一键登录

参考官方文档: 如何将H5页面接入网页端SDK并一键登录_号码认证服务(PNVS)-阿里云帮助中心 本文主要分享uniapp 对SDK依赖文件的引入 采用npm包引入的方法: 1.下载 // 下载npm资源并添加依赖到package.json npm i aliyun_numberauthsdk_web -S tips: 查看package.json文件,确…

最小可行产品需要最小可行架构——可持续架构(三)

前言 最小可行产品&#xff08;MVP&#xff09;的概念可以帮助团队专注于尽快交付他们认为对客户最有价值的东西&#xff0c;以便在投入大量时间和资源之前迅速、廉价地评估产品的市场规模。MVP不仅需要考虑产品的市场可行性&#xff0c;还需要考虑其技术可行性&#xff0c;以…