20221124 kafka实时数据写入Redis

一、上线结论

  • 实现了将用户线上实时浏览的沉浸式视频信息,保存在Redis中这样一个功能。
  • 为实现沉浸式视频离线推荐到实时推荐提供了强有力的支持。目前只是应用在沉浸式场景,后续也能扩展到其他所有场景。
  • 用于两个场景:(1)根据用户近期观看物料匹配相似物料(2)过滤用户近期观看物料

二、实现效果展示

用户在线上刷一个视频,redis就会将用户的视频信息保存在用户历史浏览的队列中。

队列大小为100。具体保存的信息如下所示:

一、Redis存储KEY:kafka:user_short_video_streaming:_5c91e0cf0cf2f3d119f92774
二、Redis存储value:[{"duration":4,"resourceId":"28808","appType":"DOCTOR","actionCode":"1006","resourceType":"VIDEO"}, {"duration":9,"resourceId":"24262","appType":"DOCTOR","actionCode":"1006","resourceType":"VIDEO"}, {"duration":5,"resourceId":"25330","appType":"DOCTOR","actionCode":"1006","resourceType":"VIDEO"}]

三、实现策略

  • 采用Java语言实现,
  • 先监听kafka,然后解析kafka消息,进行解码,再解析,从解析后的结果中提取user_id, resource_id和resource_type字段。
  • 连接Redis,构造用户队列,队列长度设置为100(用户刷的视频个数),将数据写入Redis
  • 队列大小为100,超过100顺序pop

代码Git:http://gitlab.dzj.com/applied_algorithm/data_analysis/kafka_streaming_immersive.git

四、项目后续规划

  • 扩展到Feed流,搜索召回等全部的场景
  • jar包后台运行方式改为CICD部署

五、附录

5.1 BUG分享

在实现的过程中,遇到一个序列化问题,就是写入的key和value乱码,导致用Python查询的定义好的KEY的时候查询不到,解决方案如下:

自定义RedisTemplete进行重写,  用jackson进行序列化,将这个类注册到Spring Boot中

 折叠源码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

package com.dzj.kafka_streaming.Config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;

import com.fasterxml.jackson.annotation.PropertyAccessor;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.data.redis.connection.RedisConnectionFactory;

import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;

import org.springframework.data.redis.serializer.StringRedisSerializer;

/**

 * @Author : wangyongpeng

 * @Date : 2022/12/16 14:34

 * @Description : 重写RedisTemplate, 进行序列化

 */

@Configuration

public class RedisConfig {

    @Bean

    @SuppressWarnings("all")

    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {

        RedisTemplate<String, Object> template = new RedisTemplate();

        template.setConnectionFactory(redisConnectionFactory);

        // JSON序列化配置

        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper om = new ObjectMapper();

        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);

        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

        jackson2JsonRedisSerializer.setObjectMapper(om);

        // String 的序列化

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        // key 采用String的序列化方式

        template.setKeySerializer(stringRedisSerializer);

        // hash的key也采用String的序列化方式

        template.setHashKeySerializer(stringRedisSerializer);

        // valuex序列化方式采用jackson

        template.setValueSerializer(jackson2JsonRedisSerializer);

        // hash的序列化也用jackson

        template.setHashValueSerializer(jackson2JsonRedisSerializer);

        template.afterPropertiesSet();

        return template;

    }

}

5.2 项目核心代码

 折叠源码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

package com.dzj.kafka_streaming.listener;

import com.dzj.kafka_streaming.dto.TagNameTypeInfo;

import com.dzj.kafka_streaming.service.ContentTagRelationService;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

import javax.annotation.Resource;

import java.util.ArrayList;

import java.util.Base64;

import java.util.List;

/**

 * "immersive_streaming_" + userId; 这是旧的key,需要清除

 */

@Component

public class MessageListener {

    @Autowired

    private ContentTagRelationService relationService;

    @Resource

    private RedisTemplate<String, Object> redisTemplate;

    private final String TOPIC_NAME = "event-trace-log";

    // @KafkaListener(topics = {TOPIC_NAME},groupId = "itmentuGroup")

    @KafkaListener(topics = {TOPIC_NAME})

    public void listener(ConsumerRecord<String,String> record)  {

        //获取消息

        String message = record.value();

        //消息偏移量

        long offset = record.offset();

        String redisKeyPrefix = "kafka:user_short_video_streaming:_";

        JSONObject dataJson = parseJson(message);

        String eventCode = dataJson.getString("eventCode");

        if ("145001".equals(eventCode)){

            // 测试环境------------------------------------------------------------------------------------------

            // 目前只关注沉浸式中得数据

            String resourceId = dataJson.getJSONObject("eventBody").getString("resourceId");

            String resourceType = dataJson.getJSONObject("eventBody").getString("resourceType");

            Integer duration = dataJson.getJSONObject("eventBody").getInteger("duration");

            String actionCode = dataJson.getJSONObject("eventBody").getString("actionCode");

            String userId = dataJson.getJSONObject("eventBody").getString("userId");

            String appType = dataJson.getJSONObject("eventBody").getString("appType");

            // System.out.println("________kafka msg: eventCode = " + eventCode + "eventBody = " + dataJson.getJSONObject("eventBody"));

            /**

             * 写入Redis

             * redis存储结构: key = List(5),是一个定长为5,右进左出的队列

             * 首先查询该key的list长度,如果长度超过5,就先左边出队列一个,再右边进一个,否则右边进一个

             */

            String key = redisKeyPrefix + userId;

    //        String key = "immersive_streaming_wyp0001";

            // 定义Redis队列写入的结构

            JSONObject redisListItem = new JSONObject();

            redisListItem.put("resourceId",resourceId);

            redisListItem.put("resourceType",resourceType);

            redisListItem.put("duration",duration);

            redisListItem.put("actionCode",actionCode);

            redisListItem.put("appType",appType);

            String redisListItemString = redisListItem.toJSONString();

            if (redisTemplate.opsForList().size(key) >= 100){

                Object leftPop = redisTemplate.opsForList().leftPop(key);

                redisTemplate.opsForList().rightPush(key, redisListItemString);

                System.out.println("[pop]redis key : "+ redisKeyPrefix + userId + " now contains:  "+ redisTemplate.opsForList().range(key,0, -1));

            }else {

                if (!resourceId.isEmpty() && !resourceType.isEmpty()){

                    redisTemplate.opsForList().rightPush(key, redisListItemString);

                    Long size = redisTemplate.opsForList().size(key);

                    System.out.println("redis key : "+ redisKeyPrefix + userId + " pushed one:  "+ size + redisListItemString);

                    System.out.println("redis key : "+ redisKeyPrefix + userId + " now contains:  "+ redisTemplate.opsForList().range(key,0, -1));

                }

            }

        }

    }

     

    /**

     * 解析json,解码功能

     */

    public JSONObject parseJson(String message) {

        JSONObject messageJson = JSONObject.parseObject(message);

        String dataString = messageJson.getString("data");

        // --------------------base64解码字符串--------------------

        String data_string = "";

        final Base64.Decoder decoder = Base64.getDecoder();

        try{

            data_string = new String(decoder.decode(dataString), "UTF-8");

        }catch (Exception e){

            System.out.println("【kafka parseJson ERROR】com.dzj.kafka_streaming.listener.MessageListener.parseJson" + e);

        }

        // string转换为json,只取eventCode = '145001'沉浸式的

        JSONObject dataJson = JSONObject.parseObject(data_string);

        return dataJson;

    }

    /**

     * 从数据库查询

     * @param resourceId

     * @param resourceType

     * @return

     */

    public List<TagNameTypeInfo>  queryByIdAndType(String resourceId, String resourceType ){

        List<TagNameTypeInfo> tagNameTypeInfos = new ArrayList<>();

        try {

            tagNameTypeInfos = relationService.queryTagNameTypeInfo(Long.valueOf(resourceId), resourceType);

        catch (Exception e){

            System.out.println("【ERROR】" + resourceId + "&" + resourceType + "在数据库中查询不到.......");

        }

        return tagNameTypeInfos;

    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/575512.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WPF自定义Panel:让拖拽变得更简单

在 WPF 应用程序中&#xff0c;拖放操作是实现用户交互的重要组成部分。通过拖放操作&#xff0c;用户可以轻松地将数据从一个位置移动到另一个位置&#xff0c;或者将控件从一个容器移动到另一个容器。然而&#xff0c;WPF 中默认的拖放操作可能并不是那么好用。为了解决这个问…

Vue中v-for多个Echarts图表组件只渲染一个要素问题排查

这个系列主要是用于记录我日常工作中遇到的一些Bug,既属于知识分享&#xff0c;也是对学习习惯的维持… 问题描述 今天&#xff0c;在开发一个WebGIS大屏项目时&#xff0c;我遇到了多个三维Echarts饼图图表渲染的问题&#xff0c;因为相似图表很多&#xff0c;我决定将Echart图…

Unity3d使用Jenkins自动化打包(Windows)(二)

文章目录 前言一、Unity工程准备二、Unity调取命令行实战一实战二实战三实战四实战五 总结 前言 自动化打包的价值在于让程序员更轻松地创建和管理构建工具链&#xff0c;提高编程效率&#xff0c;将繁杂的工作碎片化&#xff0c;变成人人&#xff08;游戏行业特指策划&#x…

CDH集群hive初始化元数据库失败

oracle数据库操作&#xff1a; 报错如下&#xff1a;命令 (Validate Hive Metastore schema (237)) 已失败 截图如下&#xff1a; 后台日志部分摘录&#xff1a; WARNING: Use “yarn jar” to launch YARN applications. SLF4J: Class path contains multiple SLF4J binding…

tdesign坑之EnhancedTable树形结构默认展开所有行

⚠️在官方实例中&#xff0c;树形结构的表格提供了2种方法控制展开全部节点&#xff1a; 一是通过配置属性tree.defaultExpandAll为true代表默认展开全部节点&#xff08;仅默认情况有效&#xff09;&#xff1b; 二是使用组件实例方法expandAll()可以自由控制树形结构的展开…

VESTA模拟计算XRD标准卡片

先上Crystallography Open Database网站下载标准CIF卡片&#xff08;以PbI2为例&#xff09; 1.直接进网站搜元素就行 2.点CIF直接下载 3.打开VESTA&#xff0c;导入刚刚下载的CIF 4.导入成功就是这样的 5.按照我这个操作来计算 6.点Calculation 7.已经计算出来了&#xff…

Jmeter参数化 —— 循环断言多方法

1、参数化接口测试数据 注意&#xff1a;csv文档参数化&#xff0c;里面有多少条数据&#xff0c;就要在线程组里循环多少次&#xff0c;不然就只执行一次 2、添加配置元件-计数器 关于计数器&#xff1a; ①Starting Value&#xff1a;给定计数器的初始值; ②递增&#xff1a…

计算机专业学习单片机有什么意义吗?

玩单片机跟玩计算机区别还是很大的, 单片机有众多的种类,每一种又可能有很多个系列.可以说单片机就是为了专款专用而生的.这样来达到产品成本的降低,这就是现在身边的很多的电子产品价格一降再降的原因之一.在开始前我有一些资料&#xff0c;是我根据网友给的问题精心整理了一…

Eclipse+Java+Swing实现斗地主游戏

一. 视频演示效果 java斗地主源码演示 ​ 二.项目结构 代码十分简洁&#xff0c;只有简单的7个类&#xff0c;实现了人机对战 素材为若干的gif图片 三.项目实现 启动类为Main类&#xff0c;继承之JFrame&#xff0c;JFrame 是 Java Swing 库中的一个类&#xff0c;用于创建窗…

腾讯云4核8G服务器多少钱?12M带宽646元15个月,买1年送3月

2024年腾讯云4核8G服务器租用优惠价格&#xff1a;轻量应用服务器4核8G12M带宽646元15个月&#xff0c;CVM云服务器S5实例优惠价格1437.24元买一年送3个月&#xff0c;腾讯云4核8G服务器活动页面 txybk.com/go/txy 活动链接打开如下图&#xff1a; 腾讯云4核8G服务器优惠价格 轻…

uniApp使用XR-Frame创建3D场景(7)加入点击交互

上篇文章讲述了如何将XR-Frame作为子组件集成到uniApp中使用 这篇我们讲解如何与场景中的模型交互&#xff08;点击识别&#xff09; 先看源码 <xr-scene render-system"alpha:true" bind:ready"handleReady"><xr-node><xr-mesh id"…

eclipse自动跳到console 解决办法

eclipse启动服务后&#xff0c;想看一些properties信息或者别的&#xff0c;但老是自动跳转到console页面&#xff0c;下面是解决办法&#xff1a; Eclipse中按照如下顺序找到设置菜单的位置&#xff1a; Window — Preferences — Run/Debug — Console 找到以下两项&#xf…