flink: 将接收到的tcp文本流写入HBase

一、依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>pulsar-demo2</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><pulsar.version>2.8.0</pulsar.version><jackson.version>2.10.5</jackson.version><!--<jackson.version>2.6.7</jackson.version>--></properties><dependencies><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-all</artifactId><version>${pulsar.version}</version></dependency><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-kafka</artifactId><version>${pulsar.version}</version></dependency><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-spark</artifactId><version>${pulsar.version}</version><exclusions><exclusion><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId></exclusion></exclusions></dependency><!--<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.4.0</version></dependency>--><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-annotations</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.4.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hbase-1.4_2.12</artifactId><version>1.13.6</version></dependency></dependencies></project>

二、HBase中建表:

create 'hbasetable','family1','family2','family3','family4'

三、在一台服务器上开启nc

nc -lk 9999

四、运行,demo程序

package cn.edu.tju;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;import java.util.UUID;public class FlinkHBase3 {
//nc 服务器地址private static String HOST_NAME = "xx.xx.xx.xx";private static int PORT = 9999;private static String DELIMITER ="\n";public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> socketDataInfo =  env.socketTextStream(HOST_NAME, PORT, DELIMITER);SingleOutputStreamOperator<DataInfo> dataInfoStream = socketDataInfo.map(new MapFunction<String, DataInfo>() {@Overridepublic DataInfo map(String value) throws Exception {String[] stringList = value.split(",");DataInfo dataInfo = new DataInfo(UUID.randomUUID().toString(), Long.parseLong(stringList[0]), stringList[1], Double.parseDouble(stringList[2]));return dataInfo;}});Table dataTable = tableEnv.fromDataStream(dataInfoStream,"rowkey,ts,info,val");tableEnv.createTemporaryView("dataTable", dataTable);// 这里要配自己HBase的zookeeper地址tableEnv.executeSql("CREATE TABLE flinkTable (\n" +" rowkey STRING,\n" +" family1 ROW<ts BIGINT, info STRING, val DOUBLE>,\n" +" PRIMARY KEY (rowkey) NOT ENFORCED\n" +") WITH (\n" +" 'connector' = 'hbase-1.4',\n" +" 'table-name' = 'hbasetable',\n" +" 'zookeeper.quorum' = 'xx.xx.xx.xx:2181'\n" +")");tableEnv.executeSql("INSERT INTO flinkTable " +"SELECT rowkey, ROW(ts,info,val) FROM dataTable");env.execute("HBaseFlinkJob");}public static class DataInfo{private String rowkey;private Long ts;private String info;private double val;public String getRowkey() {return rowkey;}public void setRowkey(String rowkey) {this.rowkey = rowkey;}public Long getTs() {return ts;}public void setTs(Long ts) {this.ts = ts;}public String getInfo() {return info;}public void setInfo(String info) {this.info = info;}public double getVal() {return val;}public void setVal(double val) {this.val = val;}@Overridepublic String toString() {return "DataInfo{" +"ts=" + ts +", info='" + info + '\'' +", val='" + val + '\'' +'}';}public DataInfo( String rowkey, Long ts, String info, double val) {this.rowkey = rowkey;this.ts = ts;this.info = info;this.val = val;}public DataInfo() {}}}

五、在nc窗口输入:

1689999832,dong,32.45

六、在HBase检查数据是否已经写入:

scan 'hbasetable'

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/577684.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AcWing刷题-空调

空调 差分&#xff1a; N int(input()) p list(map(int, input().split())) t list(map(int, input().split())) d,s[0]*100010,[0]*100010 for i in range(N):d[i] p[i]-t[i]for i in range(N):s[i] d[i]s[i1] - d[i] ans 0 for i in range(N1):if s[i]>0:ans s[i]…

常见的Nginx+Redis+MQ+DB架构设计

三高&#xff0c;复杂的架构 SQRS CAP 缓存&#xff0c;限流 【Redis&#xff0c;缓存】 cache-aside 缓存cache&#xff1a;数据源的副本 store 1. Read/Write Through Pattern 读写穿透模式 redis&#xff1a;放当前在线用户&#xff0c;热点数据

三个对象组练习.java

题目&#xff1a;定义数组存储3部汽车对象&#xff1b;汽车属性&#xff1a;品牌&#xff0c;价格&#xff0c;颜色&#xff1b;创造3个汽车对象&#xff0c;数据通过键盘录入而来&#xff0c;并把数据存储到数组当中 分析&#xff1a; 在main&#xff08;&#xff09;里面定义…

java分割回文串(力扣Leetcode131)

分割回文串 力扣原题链接 问题描述 给定一个字符串 s&#xff0c;请你将 s 分割成一些子串&#xff0c;使每个子串都是回文串。返回 s 所有可能的分割方案。 示例 示例 1: 输入&#xff1a;s “aab” 输出&#xff1a;[[“a”,“a”,“b”],[“aa”,“b”]] 示例 2: 输…

简易挛生分拣系统设计

1 工效组合展示 2 方案规划设计 3 数字挛生建模 基础建模、动画设计、模型导出 4 软件体系架构 5 Web交互设计 5.1 页面架构 5.2 初始构造 5.3 模型运用 5.4 WS通信 5.5 运行展现 6 服务支撑编码 6.1 整体调度 6.2 WS服务 6.3 C/S通信 7 系统级调试完善

Chrome 插件各模块使用 Fetch 进行接口请求

Chrome 插件各模块使用 Fetch 进行接口请求 常规网页可以使用 fetch() 或 XMLHttpRequest API 从远程服务器发送和接收数据&#xff0c;但受到同源政策的限制。 内容脚本会代表已注入内容脚本的网页源发起请求&#xff0c;因此内容脚本也受同源政策的约束&#xff0c;插件的来…

鸿蒙开发第一节

一.开发准备-工具安装 1.鸿蒙开发官网&#xff1a;华为开发者联盟-智能终端能力开放,共建开发者生态 (huawei.com) 2.DevEco Studio3.1下载链接HUAWEI DevEco Studio和SDK下载和升级 | 华为开发者联盟 点击下载按钮进行下载2.1解压文件2.2双击运行此程序 2.3安装软件 点击N…

day22.二叉树part08

day22.二叉树part08 235.二叉搜索树的最近公共祖先 原题链接 代码随想录链接 思路&#xff1a;因为本题是二叉搜索树&#xff0c;利用它的特性可以从上往下进行递归遍历树&#xff0c;这里需要理解一点就是如果遍历到的一个节点发现该节点的值正好位于节点p和节点q的值中间…

记录C++中,vector的迭代器在push_back以后扩容导致迭代器失效的问题

前言 vector是我们用到最多的数据结构&#xff0c;其底层数据结构是单端动态数组&#xff0c;由于数组的特点&#xff0c;vector也具有以下特性&#xff1a; ①O(1)时间的快速访问&#xff1b; ②顺序存储&#xff0c;所以插入到非尾结点位置所需时间复杂度为O(n)&#xff0c;删…

ARMv8-A架构下的外部debug模型(external debug)简介

Armv8-A external debug Armv8-A debug模型一&#xff0c;外部调试 External debug 简介二&#xff0c;Debug state2.1 Debug state的进入与退出 三&#xff0c;DAP&#xff0c;Debug Access Port3.1 EDSCR, External Debug Status and Control Register调试状态标识&#xff0…

小型分布式文件存储系统GoFastDfs应用简介

前言 最近稍微留意了一下各个文件存储系统的协议&#xff0c;发现minio是LGPLV3, 而fastdfs 是GPL3,这些协议其实对于商业应用是一个大坑。故而寻找一些代替品。 go-fastdfs就是其中之一&#xff0c;官网在&#xff1a; go-fastdfs 具体应用 其实可以直接查看官网教程的。 下…

代码随想录学习Day 21

回溯算法理论基础 回溯法又叫回溯搜索法。回溯是递归的副产品&#xff0c;有递归就会有回溯&#xff0c;回溯操作一般出现在递归函数的下面。回溯函数 递归函数。回溯法的本质是穷举。 回溯法解决的问题&#xff1a; 组合问题&#xff1a;N个数里面按一定规则找出k个数的集…