14_基于Flink将pulsar数据写入到HBase

3.7.基于Flink将数据写入到HBase

3.7.1.编写Flink完成数据写入到Hbase操作, 完成数据备份, 便于后续进行即席查询和离线分析

3.7.1.1.HBase基本介绍

hbase是基于Google发布bigTable论文产生一款软件, 是一款noSQL型数据, 不支持SQL. 不支持join的操作, 没有表关系, 不支持事务(多行事务),hbase是基于 HDFS的采用java 语言编写

查询hbase数据一般有三种方案(主键(row key)查询, 主键的范围检索,查询全部数据)

都是以字节类型存储,存储结构化和半结构化数据。

hbase表的特点: 大 面向列的存储方案 稀疏性

2.7.1.2.应用场景

1)需要进行随机读写的操作。
2)数据量比较大。
3)数据比较稀疏。

2.7.1.3.HBase安装操作

本次安装的HBase为2.2.7,详细的安装手册大家可以参考资料, 还需要大家注意,HBase的启动需要依赖于zookeeper
和HDFS的, 顾需要先安装 HADOOP与zookeeper
在这里插入图片描述

  • 1-在Hbase中创建目标表
create 'itcast_h_ems, {NAME=>'f1',COMPRESSION=>'GZ'},{NUMREGIONS=>6, SPLITALGO=>'HexStringSplit'}
  • 2- 编写Flink代码完成写入Hbase操作
import com.itheima.pojo.PulsarTopicPojo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonDeser;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.util.Properties;// 基于Flink消费Pulsar数据, 然后将数据灌入到HBase中, 完成数据备份, 以及后续即席查询和离线分析
public class ItcastFlinkToHBase {public static void main(String[] args) throws Exception {//1. 创建Flinnk流式处理核心环境类对象 和 Table API 核心环境类对象StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2. 添加Source组件, 从Pulsar中读取消息数据Properties props = new Properties();props.setProperty("topic","persistent://public/default/itcast_ems_tab");props.setProperty("partition.discovery.interval-millis","5000");FlinkPulsarSource<PulsarTopicPojo> pulsarSource = new FlinkPulsarSource<PulsarTopicPojo>("pulsar://node1:6650,node2:6650,node3:6650","http://node1:8080,node2:8080,node3:8080",JsonDeser.of(PulsarTopicPojo.class),props);//2.1 设置pulsarSource组件在消费数据的时候, 默认从什么位置开始消费pulsarSource.setStartFromLatest();DataStreamSource<PulsarTopicPojo> dataStreamSource = env.addSource(pulsarSource);//2.2 转换为Flink TableSchema schema = Schema.newBuilder().column("id", DataTypes.INT()).column("sid", DataTypes.STRING()).column("ip", DataTypes.STRING()).column("session_id", DataTypes.STRING()).column("create_time", DataTypes.STRING()).column("yearInfo", DataTypes.STRING()).column("monthInfo", DataTypes.STRING()).column("dayInfo", DataTypes.STRING()).column("hourInfo", DataTypes.STRING()).column("seo_source", DataTypes.STRING()).column("area", DataTypes.STRING()).column("origin_channel", DataTypes.STRING()).column("msg_count", DataTypes.INT()).column("from_url", DataTypes.STRING()).build();tableEnv.createTemporaryView("itcast_ems",dataStreamSource,schema);//2.3: 定义HBase的目标表String hTable = "create table itcast_h_ems("+"rowkey int,"+"f1 ROW<sid STRING,ip STRING,session_id STRING,create_time STRING,yearInfo STRING,monthInfo STRING,dayInfo STRING,hourInfo STRING,seo_source STRING,area STRING,origin_channel STRING,msg_count INT,from_url STRING>,"+"primary key(rowkey) NOT ENFORCED" +") WITH ("+"'connector'='hbase-2.2',"+"'table-name'='itcast_h_ems',"+"'zookeeper.quorum'='node1:2181,node2:2181,node3:2181'"+")";//4. 执行操作tableEnv.executeSql(hTable);tableEnv.executeSql("insert into itcast_h_ems select id,ROW(sid,ip,session_id,create_time,yearInfo,monthInfo,dayInfo,hourInfo,seo_source,area,origin_channel,msg_count,from_url) from itcast_ems");}}

PulsarTopicPojo

public class PulsarTopicPojo {private Integer id;private String sid;private String ip;private String session_id;private String create_time;private String yearInfo;private String monthInfo;private String dayInfo;private String hourInfo;private String seo_source;private String area;private String origin_channel;private Integer msg_count;private  String from_url;public PulsarTopicPojo() {}public PulsarTopicPojo(Integer id, String sid, String ip, String session_id, String create_time, String yearInfo, String monthInfo, String dayInfo, String hourInfo, String seo_source, String area, String origin_channel, Integer msg_count, String from_url) {this.id = id;this.sid = sid;this.ip = ip;this.session_id = session_id;this.create_time = create_time;this.yearInfo = yearInfo;this.monthInfo = monthInfo;this.dayInfo = dayInfo;this.hourInfo = hourInfo;this.seo_source = seo_source;this.area = area;this.origin_channel = origin_channel;this.msg_count = msg_count;this.from_url = from_url;}public void setData(Integer id, String sid, String ip, String session_id, String create_time, String yearInfo, String monthInfo, String dayInfo, String hourInfo, String seo_source, String area, String origin_channel, Integer msg_count, String from_url) {this.id = id;this.sid = sid;this.ip = ip;this.session_id = session_id;this.create_time = create_time;this.yearInfo = yearInfo;this.monthInfo = monthInfo;this.dayInfo = dayInfo;this.hourInfo = hourInfo;this.seo_source = seo_source;this.area = area;this.origin_channel = origin_channel;this.msg_count = msg_count;this.from_url = from_url;}public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getSid() {return sid;}public void setSid(String sid) {this.sid = sid;}public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}public String getSession_id() {return session_id;}public void setSession_id(String session_id) {this.session_id = session_id;}public String getCreate_time() {return create_time;}public void setCreate_time(String create_time) {this.create_time = create_time;}public String getYearInfo() {return yearInfo;}public void setYearInfo(String yearInfo) {this.yearInfo = yearInfo;}public String getMonthInfo() {return monthInfo;}public void setMonthInfo(String monthInfo) {this.monthInfo = monthInfo;}public String getDayInfo() {return dayInfo;}public void setDayInfo(String dayInfo) {this.dayInfo = dayInfo;}public String getHourInfo() {return hourInfo;}public void setHourInfo(String hourInfo) {this.hourInfo = hourInfo;}public String getSeo_source() {return seo_source;}public void setSeo_source(String seo_source) {this.seo_source = seo_source;}public String getArea() {return area;}public void setArea(String area) {this.area = area;}public String getOrigin_channel() {return origin_channel;}public void setOrigin_channel(String origin_channel) {this.origin_channel = origin_channel;}public Integer getMsg_count() {return msg_count;}public void setMsg_count(Integer msg_count) {this.msg_count = msg_count;}public String getFrom_url() {return from_url;}public void setFrom_url(String from_url) {this.from_url = from_url;}@Overridepublic String toString() {return "PulsarTopicPojo{" +"id=" + id +", sid='" + sid + '\'' +", ip='" + ip + '\'' +", session_id='" + session_id + '\'' +", create_time='" + create_time + '\'' +", yearInfo='" + yearInfo + '\'' +", monthInfo='" + monthInfo + '\'' +", dayInfo='" + dayInfo + '\'' +", hourInfo='" + hourInfo + '\'' +", seo_source='" + seo_source + '\'' +", area='" + area + '\'' +", origin_channel='" + origin_channel + '\'' +", msg_count=" + msg_count +", from_url='" + from_url + '\'' +'}';}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/58866.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Nacos基本应用

Nacos 基本应用 Nacos 提供了 SDK 和 OpenAPI 方式来完成服务注册与发现等操作&#xff0c;SDK 实际上是对于 http 请求的封装。 微服务架构的电子商务平台&#xff0c;其中包含订单服务、商品服务和用户服务。可以使用 Nacos 作为服务注册和发现的中心&#xff0c;以便各个微…

链式二叉树统计结点个数的方法和bug

方法一&#xff1a; 分治&#xff1a;分而治之 int BTreeSize1(BTNode* root) {if (root NULL) return 0;else return BTreeSize(root->left)BTreeSize(root->right)1; } 方法二&#xff1a; 遍历计数&#xff1a;设置一个计数器&#xff0c;对二叉树正常访问&#…

protobuf 2定义string常量

背景 protobuf 2中定义的enum枚举值必须为数字类型&#xff0c;故不支持string类型&#xff0c;但有些业务场景又确实需要定义string常量。 目标 在protobuf 2中定义string常量。 方案 思路&#xff1a;通optional default实现string常量。 细节&#xff1a; 1、protobu…

论文浅尝 | 面向多步推理任务专业化较小语言模型

笔记整理&#xff1a;张沈昱&#xff0c;东南大学硕士&#xff0c;研究方向为自然语言处理 链接&#xff1a;https://github.com/FranxYao/FlanT5-CoT-Specialization 动机 本文的动机是探索如何在多步推理任务中通过大型语言模型提升较小的语言模型的性能。作者认为&#xff0…

AVS3:跨多通道预测PMC

前面的文章中介绍了TSCPM&#xff0c;它是AVS3中用于intra模式的跨通道预测技术&#xff0c;它利用线性模型根据亮度重建像素预测色度像素&#xff0c; 跨通道预测技术用于去除不同通道间的冗余信息&#xff0c;TSCPM可以去除Y-Cb、Y-Cr通道间的冗余&#xff0c;然而却忽略了…

什么是训练数据?

算法从数据中学习。算法从得到的训练数据中找到关系&#xff0c;形成理解&#xff0c;做出决策&#xff0c;并评估信心。训练数据越好&#xff0c;模型的表现就越好。 实际上&#xff0c;与算法本身一样&#xff0c;训练数据的质量和数量与数据项目的成功有很大关系。 现在&…

LeetCode150道面试经典题-删除有序数组中的重复项(简单)

1.题目 给你一个 升序排列 的数组 nums &#xff0c;请你 原地 删除重复出现的元素&#xff0c;使每个元素 只出现一次 &#xff0c;返回删除后数组的新长度。元素的 相对顺序 应该保持 一致 。然后返回 nums 中唯一元素的个数。 考虑 nums 的唯一元素的数量为 k &#xff0c…

RocketMQ 主备自动切换模式部署

目录 主备自动切换模式部署 Controller 部署​ Controller 嵌入 NameServer 部署​ Controller 独立部署​ Broker 部署​ 兼容性​ 升级注意事项​ 主备自动切换模式部署 该文档主要介绍如何部署支持自动主从切换的 RocketMQ 集群&#xff0c;其架构如上图所示&#xff…

libmpv使用滤镜处理视频进行播放

一、前言 作为一个功能强大的多媒体框架,libmpv为开发者提供了广泛的功能和灵活的控制权。滤镜是libmpv的一个重要特性,允许开发者对视频进行各种实时处理和增强,从而满足用户对于个性化、创意化和高质量视频体验的需求。 滤镜是一种在视频渲染过程中应用特定效果的技术。…

【计算机网络】TCP协议超详细讲解

文章目录 1. TCP简介2. TCP和UDP的区别3. TCP的报文格式4. 确认应答机制5. 超时重传6. 三次握手7. 为什么两次握手不行?8. 四次挥手9. 滑动窗口10. 流量控制11. 拥塞控制12. 延时应答13. 捎带应答14. 面向字节流15. TCP的连接异常处理 1. TCP简介 TCP协议广泛应用于可靠性要求…

记录一次electron打包提示文件找不到的解决方法

没有配置files选项 files的作用是配置打包到应用程序的构建资源 就是说如果你想使用项目那个目录下的文件 就得通过files配置一下不然就会报错 json文件或者yml文件会报的错 格式是这样的 "files": ["dist-electron", "dist"],electron打包配…

解析隧道代理被封的几个主要原因

Hey&#xff0c;各位爬虫高手&#xff0c;你是不是经常遇到爬虫代理HTTP被封的问题&#xff1f;不要慌&#xff0c;今天我来分享一些信息&#xff0c;帮你解析这个问题&#xff01;告别封禁&#xff0c;让你的爬虫工作更顺利&#xff0c;赶快跟随我一起了解吧&#xff01; 在爬…