JavaWeb_LeadNews_Day11-KafkaStream实现实时计算文章分数

JavaWeb_LeadNews_Day11-KafkaStream实现实时计算文章分数

  • KafkaStream
    • 概述
    • 案例-统计单词个数
    • SpringBoot集成
  • 实时计算文章分值
  • 来源
  • Gitee

KafkaStream

概述

  • Kafka Stream: 提供了对存储与Kafka内的数据进行流式处理和分析的功能
  • 特点:
    • Kafka Stream提供了一个非常简单而轻量的Library, 它可以非常方便地嵌入任意Java应用中, 也可以任意方式打包和部署
    • 除了Kafka外, 无任何外部依赖
    • 通过可容错地state, store实现高效地状态操作(如windowed join和aggregation)
    • 支持基于事件时间地窗口操作, 并且可处理晚到的数据(late arrival of records)
  • 关键概念:
    • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器.
    • Sink处理器: sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题
  • KStream:
    • 数据结构类似于map, key-value键值对.
    • 一段顺序的, 无限长, 不断更新的数据集.

案例-统计单词个数

  • 依赖
    依赖中有排除部分依赖, 还是一整个放上来好了
    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency>
    </dependencies>
    
  • 流式处理
    public class KafkaStreamQuickStart {public static void main(String[] args) {// Kafka的配置信息Properties prop = new Properties();prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");// Stream 构建器StreamsBuilder streamsBuilder = new StreamsBuilder();// 流式计算streamProcessor(streamsBuilder);// 创建KafkaStream对象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), prop);// 开启流式计算kafkaStreams.start();}/*** 流式计算* 消息的内容: hello kafka* @param streamsBuilder*/private static void streamProcessor(StreamsBuilder streamsBuilder) {// 创建Kstream对象, 同时指定从哪个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");/*** 处理消息的value*/stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {String[] valAry = value.split(" ");return Arrays.asList(valAry);}})// 按照value聚合处理.groupBy((key, value)->value)// 时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))// 统计单词的个数.count()// 转换为KStream.toStream().map((key, value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(), value.toString());})// 发送消息.to("itcast-topic-out");}
    }
    
  • 发送消息
    for (int i = 0; i < 5; i++) {ProducerRecord<String, String> kvProducerRecord = new ProducerRecord<>("icast-topic-input", "hello kafka");producer.send(kvProducerRecord);
    }
    
  • 接收消息
    // 订阅主题
    consumer.subscribe(Collections.singleton("itcast-topic-out"));
    

SpringBoot集成

  • 配置
    config.java
    /*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/
    @Setter
    @Getter
    @Configuration
    @EnableKafkaStreams
    @ConfigurationProperties(prefix="kafka")
    public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
    }
    
    application.yml
    kafka:hosts: 192.168.174.133:9092group: ${spring.application.name}
    
    BeanConfig.java
    @Slf4j
    @Configuration
    public class KafkaStreamHelloListener {@Beanpublic KStream<String, String> KStream(StreamsBuilder streamsBuilder){// 创建Kstream对象, 同时指定从哪个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");/*** 处理消息的value*/stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {String[] valAry = value.split(" ");return Arrays.asList(valAry);}})// 按照value聚合处理.groupBy((key, value)->value)// 时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))// 统计单词的个数.count()// 转换为KStream.toStream().map((key, value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(), value.toString());})// 发送消息.to("itcast-topic-out");return stream;}
    }
    

实时计算文章分值

  1. nacos: leadnews-behavior配置kafka生产者
    kafka:bootstrap-servers: 192.168.174.133:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
    
  2. 修改leadnews-behavior
    like
    public ResponseResult likesBehavior(LikesBehaviorDto dto) {...UpdateArticleMess mess = new UpdateArticleMess();mess.setArticleId(dto.getArticleId());mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);if(dto.getOperation() == 0){...mess.setAdd(1);}else{...mess.setAdd(-1);}// kafka: 发送消息, 数据聚合kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));...
    }
    
    view
    public ResponseResult readBehavior(ReadBehaviorDto dto) {...// kafka: 发送消息, 数据聚合UpdateArticleMess mess = new UpdateArticleMess();mess.setArticleId(dto.getArticleId());mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);mess.setAdd(1);kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));...
    }
    
  3. leadnews-article中添加流式聚合处理
    @Slf4j
    @Configuration
    public class HotArticleStreamHandler {@Beanpublic KStream<String, String> KStream(StreamsBuilder streamsBuilder){// 接收消息KStream<String, String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);// 聚合流式处理stream.map((key, value)->{UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);// 重置消息的key和valuereturn new KeyValue<>(mess.getArticleId().toString(), mess.getType().name()+":"+mess.getAdd());})// 根据文章id进行聚合.groupBy((key, value)->key)// 时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))// 自行实现聚合计算.aggregate(// 初始方法, 返回值是消息的valuenew Initializer<String>() {@Overridepublic String apply() {return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";}},// 真正的聚合操作, 返回值是消息的valuenew Aggregator<String, String, String>() {@Overridepublic String apply(String key, String value, String aggValue) {if(StringUtils.isBlank(value)){return aggValue;}String[] aggAry = aggValue.split(",");int col=0, com=0, lik=0, vie=0;for (String agg : aggAry) {String[] split = agg.split(":");/*** 获得初始值, 也是时间窗口内计算之后的值*/switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:col = Integer.parseInt(split[1]);break;case COMMENT:com = Integer.parseInt(split[1]);break;case LIKES:lik = Integer.parseInt(split[1]);break;case VIEWS:vie = Integer.parseInt(split[1]);break;}}// 累加操作String[] valAry = value.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){case COLLECTION:col += Integer.parseInt(valAry[1]);break;case COMMENT:com += Integer.parseInt(valAry[1]);break;case LIKES:lik += Integer.parseInt(valAry[1]);break;case VIEWS:vie += Integer.parseInt(valAry[1]);break;}String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);System.out.println("文章的id "+key);System.out.println("当前时间窗口内的消息处理结果: "+formatStr);return formatStr;}}, Materialized.as("hot-article-stream-count-001")).toStream().map((key, value)->{return new KeyValue<>(key.key().toString(), formatObj(key.key().toString(), value));})// 发送消息.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);return stream;}/*** 格式化消息的value数据* @param articleId* @param value* @return*/private String formatObj(String articleId, String value) {ArticleVisitStreamMess mess = new ArticleVisitStreamMess();mess.setArticleId(Long.valueOf(articleId));// COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0String[] valAry = value.split(",");for (String val : valAry) {String[] split = val.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:mess.setCollect(Integer.parseInt(split[1]));break;case COMMENT:mess.setComment(Integer.parseInt(split[1]));break;case LIKES:mess.setLike(Integer.parseInt(split[1]));break;case VIEWS:mess.setView(Integer.parseInt(split[1]));break;}}log.info("聚合消息处理之后的结果为: {}", JSON.toJSONString(mess));return JSON.toJSONString(mess);}}
  4. leadnews-article添加聚合数据监听器
    @Slf4j
    @Component
    public class ArticleIncrHandlerListener {@Autowiredprivate ApArticleService apArticleService;@KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)public void onMessage(String mess){if(StringUtils.isNotBlank(mess)){ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);apArticleService.updateScore(articleVisitStreamMess);System.out.println(mess);}}}
    
  5. 替换redis中的热点数据
    /*** 更新 文章分值, 缓存中的热点文章数据* @param mess*/
    @Override
    public void updateScore(ArticleVisitStreamMess mess) {// 1. 更新文章的阅读, 点赞, 收藏, 评论的数量ApArticle apArticle = updateArticleBehavior(mess);// 2. 计算文章的分值Integer score = hotArticleService.computeArticleScore(apArticle);score *= 3;// 3. 替换当前文章对应频道的热点数据replaceDataToRedis(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId(), apArticle, score);// 4. 替换推荐对应的热点数据replaceDataToRedis(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG, apArticle, score);
    }/*** 替换数据并且存入到redis中* @param HOT_ARTICLE_FIRST_PAGE* @param apArticle* @param score*/
    private void replaceDataToRedis(String HOT_ARTICLE_FIRST_PAGE, ApArticle apArticle, Integer score) {String articleList = cacheService.get(HOT_ARTICLE_FIRST_PAGE);if(StringUtils.isNotBlank(articleList)){List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleList, HotArticleVo.class);boolean flag = true;// 3.1 文章已是热点文章, 则更新文章分数for (HotArticleVo hotArticleVo : hotArticleVoList) {if(hotArticleVo.getId().equals(apArticle.getId())){hotArticleVo.setScore(score);flag = false;break;}}// 3.2 文章还不是热点文章, 则替换分数最小的文章if(flag){if(hotArticleVoList.size() >= 30){// 热点文章超过30条hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);if(lastHot.getScore() < score){hotArticleVoList.remove(lastHot);HotArticleVo hotArticleVo = new HotArticleVo();BeanUtils.copyProperties(apArticle, hotArticleVo);hotArticleVo.setScore(score);hotArticleVoList.add(hotArticleVo);}}else{// 热点文章没超过30条HotArticleVo hotArticleVo = new HotArticleVo();BeanUtils.copyProperties(apArticle, hotArticleVo);hotArticleVo.setScore(score);hotArticleVoList.add(hotArticleVo);}}// 3.3 缓存到redishotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());cacheService.set(HOT_ARTICLE_FIRST_PAGE, JSON.toJSONString(hotArticleVoList));}
    }/*** 更新文章行为数据* @param mess*/
    private ApArticle updateArticleBehavior(ArticleVisitStreamMess mess) {ApArticle apArticle = getById(mess.getArticleId());apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());updateById(apArticle);return apArticle;
    }
    

来源

黑马程序员. 黑马头条

Gitee

https://gitee.com/yu-ba-ba-ba/leadnews

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/106031.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AggregateFunction结合自定义触发器实现点击率计算

背景&#xff1a; 接上一篇文章&#xff0c;ProcessWindowFunction 结合自定义触发器会有状态过大的问题&#xff0c;本文就使用AggregateFunction结合自定义触发器来实现&#xff0c;这样就不会导致状态过大的问题了 AggregateFunction结合自定义触发器实现 flink对于每个窗…

如何实现MongoDB数据的快速迁移?

作为一种Schema Free文档数据库&#xff0c;MongoDB因其灵活的数据模型&#xff0c;支撑业务快速迭代研发&#xff0c;广受开发者欢迎并被广泛使用。在企业使用MongoDB承载应用的过程中&#xff0c;会因为业务上云/跨云/下云/跨机房迁移/跨地域迁移、或数据库版本升级、数据库整…

【DockerCE】Docker-CE 24.0.6正式版发布

官网下载地址&#xff08;For RHEL/CentOS 7.9&#xff09;&#xff1a; https://download.docker.com/linux/centos/7/x86_64/stable/Packages/ 相对于24.0.5版本&#xff0c;本次24.0.6版本更新的rpm包有 5 个&#xff0c;使用目录对比软件对比的结果如下&#xff1a; 在Lin…

(Note)中文EI检索期刊目录

ei和sci、ssci一样是国际知名的期刊数据库&#xff0c;ei不仅收录国际知名的刊物&#xff0c;也收录了一些国内期刊&#xff0c;为方便投稿选刊&#xff0c;Elsevier官网更新了的EI Compendex期刊目录&#xff0c;那么 国内ei期刊有哪些? 经查询共有250余种期刊&#xff0c;新…

什么是Docker和Docker-Compose?

Docker的构成 Docker仓库&#xff1a;https://hub.docker.com Docker自身组件 Docker Client&#xff1a;Docker的客户端 Docker Server&#xff1a;Docker daemon的主要组成部分&#xff0c;接受用户通过Docker Client发出的请求&#xff0c;并按照相应的路由规则实现路由分发…

Qt包含文件不存在问题解决 QNetworkAccessManager

这里用到了Qt的网络模块&#xff0c;在.pro中添加了 QT network 但是添加 #include <QNetworkAccessManager> 会报错说找不到&#xff0c;可以通过在项目上右键执行qmake后&#xff0c;直接#include <QNetworkAccessManager>就不会报错了&#xff1a;

20230911 Shell指令数组以及函数值传递,值返回

实现一个对数组求和的函数&#xff0c;数组通过实参传递给函数 #!/bin/bashfunction fun() {sum0for ((i0;i<$var;i))do(( sumarr[i] ))doneecho $sum } read -p "输入该数组个数: " var for((j0;j<$var;j)) doread -p "输入数组第$j个值: " arr[j] …

【ARM CoreLink 系列 2 -- CCI-400 控制器简介】

文章目录 CCI-400 介绍DVM 机制介绍DVM 消息传输过程TOKEN 机制介绍 下篇文章&#xff1a;ARM CoreLink 系列 3 – CCI-550 控制器介绍 CCI-400 介绍 CCI&#xff08;Cache Coherent Interconnect&#xff09;是ARM 中 的Cache一致性控制器。 CCI-400 将 Interconnect 和coh…

Ubuntu下Python3与Python2相互切换

参考文章&#xff1a;https://blog.csdn.net/Nicolas_shen/article/details/124144931 设置优先级 sudo update-alternatives --install /usr/bin/python python /usr/bin/python2 100 sudo update-alternatives --install /usr/bin/python python /usr/bin/python3 200

npm版本升级报错

解决方法&#xff1a; 执行npm install --legacy-peer-deps依赖对等 npm install xxx --legacy-peer-deps命令用于绕过peerDependency里依赖的自动安装&#xff1b;它告诉npm忽略项目中引入的各个依赖模块之间依赖相同但版本不同的问题&#xff0c;以npm v4-v6的方式去继续执行…

技术解码 | GB28181/SIP/SDP 协议--EasyGBS国标GB28181平台国标视频技术SIP解析

EasyGBS国标视频云服务是基于国标GB/T28181协议的视频能力平台&#xff0c;可实现的视频功能包括&#xff1a;实时监控直播、录像、检索与回看、语音对讲、云存储、告警、平台级联等功能。平台部署简单、可拓展性强&#xff0c;支持将接入的视频流进行全终端、全平台分发&#…

分类预测 | MATLAB实现PCA-GRU(主成分门控循环单元)分类预测

分类预测 | MATLAB实现PCA-GRU(主成分门控循环单元)分类预测 目录 分类预测 | MATLAB实现PCA-GRU(主成分门控循环单元)分类预测预测效果基本介绍程序设计参考资料致谢 预测效果 基本介绍 Matlab实现基于PCA-GRU主成分分析-门控循环单元多输入分类预测&#xff08;完整程序和数据…