33、Flink之hive介绍与简单示例

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)

16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs

30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)

33、Flink之hive介绍与简单示例

42、Flink 的table api与sql之Hive Catalog


文章目录

  • Flink 系列文章
  • 一、Table & SQL Connectors 示例: Apache Hive
    • 1、支持的Hive版本
    • 2、依赖项
      • 1)、使用 Flink 提供的 Hive jar
      • 2)、用户定义的依赖项
      • 3)、移动 planner jar 包
    • 3、Maven 依赖
    • 4、连接到Hive
    • 5、DDL&DML


本文介绍了Apache Hive连接器的使用,以具体的示例演示了通过java和flink sql cli创建catalog。
本文依赖环境是hadoop、zookeeper、hive、flink环境好用,本文内容以flink1.17版本进行介绍的,具体示例是在1.13版本中运行的(因为hadoop集群环境是基于jdk8的,flink1.17版本需要jdk11)。
更多的内容详见后续关于hive的介绍。

一、Table & SQL Connectors 示例: Apache Hive

Apache Hive 已经成为了数据仓库生态系统中的核心。 它不仅仅是一个用于大数据分析和ETL场景的SQL引擎,同样它也是一个数据管理平台,可用于发现,定义,和演化数据。

Flink 与 Hive 的集成包含两个层面。

一是利用了 Hive 的 MetaStore 作为持久化的 Catalog,用户可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。 例如,用户可以使用HiveCatalog将其 Kafka 表或 Elasticsearch 表存储在 Hive Metastore 中,并后续在 SQL 查询中重新使用它们。

二是利用 Flink 来读写 Hive 的表。

HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以"开箱即用"的访问其已有的 Hive 数仓。 您不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。

关于flink与hive集成的部分请参考:42、Flink 的table api与sql之Hive Catalog

1、支持的Hive版本

Flink 支持以下的 Hive 版本。

  • 2.3
    2.3.0
    2.3.1
    2.3.2
    2.3.3
    2.3.4
    2.3.5
    2.3.6
    2.3.7
    2.3.8
    2.3.9
  • 3.1
    3.1.0
    3.1.1
    3.1.2
    3.1.3

某些功能是否可用取决于您使用的 Hive 版本,这些限制不是由 Flink 所引起的:

  • Hive 内置函数在使用 Hive-2.3.0 及更高版本时支持。
  • 列约束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本时支持。
  • 更改表的统计信息,在使用 Hive-2.3.0 及更高版本时支持。
  • DATE列统计信息,在使用 Hive-2.3.0 及更高版时支持。

2、依赖项

要与 Hive 集成,您需要在 Flink 下的/lib/目录中添加一些额外的依赖包, 以便通过 Table API 或 SQL Client 与 Hive 进行交互。 或者,您可以将这些依赖项放在专用文件夹中,并分别使用 Table API 程序或 SQL Client 的-C或-l选项将它们添加到 classpath 中。

Apache Hive 是基于 Hadoop 之上构建的, 首先您需要 Hadoop 的依赖,请参考 Providing Hadoop classes:

export HADOOP_CLASSPATH=`hadoop classpath`

有两种添加 Hive 依赖项的方法。第一种是使用 Flink 提供的 Hive Jar包。您可以根据使用的 Metastore 的版本来选择对应的 Hive jar。第二个方式是分别添加每个所需的 jar 包。如果您使用的 Hive 版本尚未在此处列出,则第二种方法会更适合。

注意:建议您优先使用 Flink 提供的 Hive jar 包。仅在 Flink 提供的 Hive jar 不满足您的需求时,再考虑使用分开添加 jar 包的方式。

1)、使用 Flink 提供的 Hive jar

下表列出了所有可用的 Hive jar。您可以选择一个并放在 Flink 发行版的/lib/ 目录中。
在这里插入图片描述

2)、用户定义的依赖项

您可以在下方找到不同Hive主版本所需要的依赖项。

  • Hive 2.3.4
/flink-1.17.1/lib// Flink's Hive connector.Contains flink-hadoop-compatibility and flink-orc jarsflink-connector-hive_2.12-1.17.1.jar// Hive dependencieshive-exec-2.3.4.jar// add antlr-runtime if you need to use hive dialectantlr-runtime-3.5.2.jar
  • Hive 3.1.0
/flink-1.17.1/lib// Flink's Hive connectorflink-connector-hive_2.12-1.17.1.jar// Hive dependencieshive-exec-3.1.0.jarlibfb303-0.9.3.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately// add antlr-runtime if you need to use hive dialectantlr-runtime-3.5.2.jar

3)、移动 planner jar 包

把 FLINK_HOME/opt 下的 jar 包 flink-table-planner_2.12-1.17.1.jar 移动到 FLINK_HOME/lib 下,并且将 FLINK_HOME/lib 下的 jar 包 flink-table-planner-loader-1.17.1.jar 移出去。 具体原因请参见 FLINK-25128。你可以使用如下命令来完成移动 planner jar 包的工作:

mv $FLINK_HOME/opt/flink-table-planner_2.12-1.17.1.jar $FLINK_HOME/lib/flink-table-planner_2.12-1.17.1.jar
mv $FLINK_HOME/lib/flink-table-planner-loader-1.17.1.jar $FLINK_HOME/opt/flink-table-planner-loader-1.17.1.jar

只有当要使用 Hive 语法 或者 HiveServer2 endpoint, 你才需要做上述的 jar 包移动。 但是在集成 Hive 的时候,推荐进行上述的操作。

3、Maven 依赖

如果您在构建自己的应用程序,则需要在 mvn 文件中添加以下依赖项。 您应该在运行时添加以上的这些依赖项,而不要在已生成的 jar 文件中去包含它们。

<!-- Flink Dependency -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive_2.12</artifactId><version>1.17.1</version><scope>provided</scope>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>1.17.1</version><scope>provided</scope>
</dependency><!-- Hive Dependency -->
<dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>${hive.version}</version><scope>provided</scope>
</dependency>

4、连接到Hive

通过 TableEnvironment 或者 YAML 配置,使用 Catalog 接口 和 HiveCatalog连接到现有的 Hive 集群。

以下是如何连接到 Hive 的示例:

  • java
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);String name            = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir     = "/opt/hive-conf";HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");----------------------示例----------------------------
import java.util.List;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;/*** @author alanchan**/
public class TestHiveCatalogDemo {/*** @param args* @throws DatabaseNotExistException * @throws CatalogException */public static void main(String[] args) throws CatalogException, DatabaseNotExistException {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);String name = "alan_hive";// testhive 数据库名称String defaultDatabase = "testhive";String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);tenv.registerCatalog("alan_hive", hiveCatalog);// 使用注册的catalogtenv.useCatalog("alan_hive");List<String> tables = hiveCatalog.listTables(defaultDatabase); for (String table : tables) {System.out.println("Database:testhive  tables:" + table);}}}
  • sql
CREATE CATALOG myhive WITH ('type' = 'hive','default-database' = 'mydatabase','hive-conf-dir' = '/opt/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;------------------具体示例如下----------------------------
Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in setFlink SQL> CREATE CATALOG alan_hivecatalog WITH (
>     'type' = 'hive',
>     'default-database' = 'testhive',
>     'hive-conf-dir' = '/usr/local/bigdata/apache-hive-3.1.2-bin/conf'
> );
[INFO] Execute statement succeed.Flink SQL> show catalogs;
+------------------+
|     catalog name |
+------------------+
| alan_hivecatalog |
|  default_catalog |
+------------------+
2 rows in setFlink SQL> use alan_hivecatalog;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: A database with name [alan_hivecatalog] does not exist in the catalog: [default_catalog].Flink SQL> use catalog alan_hivecatalog;
[INFO] Execute statement succeed.Flink SQL> show tables;
+-----------------------------------+
|                        table name |
+-----------------------------------+
| alan_hivecatalog_hivedb_testtable |
|                         apachelog |
|                          col2row1 |
|                          col2row2 |
|                       cookie_info |
|                              dual |
|                         dw_zipper |
|                               emp |
|                          employee |
|                  employee_address |
|               employee_connection |
|                 ods_zipper_update |
|                          row2col1 |
|                          row2col2 |
|                            singer |
|                           singer2 |
|                           student |
|                      student_dept |
|               student_from_insert |
|                      student_hdfs |
|                    student_hdfs_p |
|                      student_info |
|                     student_local |
|                 student_partition |
|              t_all_hero_part_msck |
|                     t_usa_covid19 |
|                   t_usa_covid19_p |
|                              tab1 |
|                         tb_dept01 |
|                    tb_dept_bucket |
|                            tb_emp |
|                          tb_emp01 |
|                     tb_emp_bucket |
|                     tb_json_test1 |
|                     tb_json_test2 |
|                          tb_login |
|                      tb_login_tmp |
|                          tb_money |
|                      tb_money_mtn |
|                            tb_url |
|              the_nba_championship |
|                             tmp_1 |
|                        tmp_zipper |
|                         user_dept |
|                     user_dept_sex |
|                             users |
|                 users_bucket_sort |
|                   website_pv_info |
|                  website_url_info |
+-----------------------------------+
49 rows in set
  • ymal
execution:...current-catalog: alan_hivecatalog  # set the HiveCatalog as the current catalog of the sessioncurrent-database: testhivecatalogs:- name: alan_hivecatalog  type: hivehive-conf-dir: /usr/local/bigdata/apache-hive-3.1.2-bin/conf

下表列出了通过 YAML 文件或 DDL 定义 HiveCatalog 时所支持的参数。

在这里插入图片描述

5、DDL&DML

在 Flink 中执行 DDL 操作 Hive 的表、视图、分区、函数等元数据时,参考:33、Flink之hive
Flink 支持 DML 写入 Hive 表,请参考:33、Flink之hive
以上,介绍了Apache Hive连接器的使用,以具体的示例演示了通过java和flink sql cli创建catalog。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/90766.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python文本终端GUI框架详解

今天笔者带大家&#xff0c;梳理几个常见的基于文本终端的 UI 框架&#xff0c;一睹为快&#xff01; Curses 首先出场的是 Curses。 Curses 是一个能提供基于文本终端窗口功能的动态库&#xff0c;它可以: 使用整个屏幕 创建和管理一个窗口 使用 8 种不同的彩色 为程序提供…

【Linux系列】使用虚拟机安装Linux系统

首发博客地址 首发博客地址[1] 系列文章地址[2] 下载虚拟机 请从官网下载&#xff1a; https://customerconnect.vmware.com/en/downloads/info/slug/desktop_end_user_computing/vmware_workstation_player/17_0 如需不限速下载&#xff0c;请关注【程序员朱永胜】并回复 1018…

Android学习之路(11) ActionBar与ToolBar的使用

自android5.0开始&#xff0c;AppCompatActivity代替ActionBarActivity&#xff0c;而且ToolBar也代替了ActionBar&#xff0c;下面就是ActionBar和ToolBar的使用 ActionBar 1、截图 2、使用 2.1、AppCompatActivity和其对应的Theme AppCompatActivity使用的是v7的ActionBa…

一文讲通物联网嵌入式

最近有很多同学问我&#xff0c;物联网近几年一直是科技的热点&#xff0c;嵌入式和物联网有什么关系呢&#xff1f;我在这里统一给大家讲解一下。 嵌入式是应用于物联网产品方向的一种嵌入式操作系统。类似于Android系统是谷歌开发的移动操作系统&#xff0c;嵌入式实际上也是…

lnmp架构-mysql1

1.MySQL数据库编译 make完之后是这样的 mysql 初始化 所有这种默认不在系统环境中的路径里 就这样加 这样就可以直接调用 不用输入路径调用 2.初始化 重置密码 3.mysql主从复制 配置master 配置slave 当master 端中还没有插入数据时 在server2 上配slave 此时master 还没进…

Python股票交易---均值回归

免责声明&#xff1a;本文提供的信息仅用于教育目的&#xff0c;不应被视为专业投资建议。在做出投资决策时进行自己的研究并谨慎行事非常重要。投资涉及风险&#xff0c;您做出的任何投资决定完全由您自己负责。 在本文中&#xff0c;您将了解什么是均值回归交易算法&#xff…

基于Java+SpringBoot+Mybaties-plus+Vue+ElementUI 高校汉服租赁网站的 设计与实现

一.项目介绍 高校汉服租赁网站分为普通用户以及管理员两类 普通用户&#xff1a; 注册、登录系统、查看汉服首页发帖公告信息、 交流论坛&#xff08;发帖、查看帖子、评论&#xff09;、 公告咨询&#xff08;查看公告以及评论&#xff09;、 汉服信息&#xff08;查…

(线特征)opencv+opencv contribute 配置

写一篇博客&#xff0c;记录开始线特征slam的历程。 在配置环境的时候&#xff0c;可以发现大多数都是用到了opencv3.4.16和其contribute版本&#xff0c;这里进行一个相关操作的教学。配置环境是在Ubuntu下面进行的&#xff0c;建议使用Ubuntu18来进行线特征的配置以及代码的…

高精度地图定位在高速公路自动驾驶系统中的应用

近年来随着汽车保有量不断增加&#xff0c;随之而来的是: ( 1) 严重的交通拥堵&#xff0c;通行效率低下&#xff0c;用在通行上的时间不断增加; ( 2) 交通事故频发&#xff0c;交通事故导致的伤亡人数和费用不断增加&#xff0c;而且绝大多数事故是由人为因素导致的; ( 3) 大气…

脱离束缚:数字化工厂中ARM控制器的革命性应用!

近年来&#xff0c;中国数字经济体系已进入高速增长阶段。制造业作为中国经济高质量发展的重要支撑力量&#xff0c;在面临生产成本不断上涨、关键装备和核心零部件“受制于人”等挑战时&#xff0c;建设数字化工厂已成必然。 数字化工厂数据采集出现的问题 在数字工厂的建设…

Unity3D软件安装包分享(附安装教程)

目录 一、软件简介 二、软件下载 一、软件简介 Unity3D是一款全球知名的游戏开发引擎&#xff0c;由Unity Technologies公司开发。它提供了一个跨平台、多功能的开发环境&#xff0c;支持创建2D和3D游戏、交互式应用、虚拟现实、增强现实等多种类型的应用程序。以下是Unity3D…

24.排序,插入排序,交换排序

目录 一. 插入排序 &#xff08;1&#xff09;直接插入排序 &#xff08;2&#xff09;折半插入排序 &#xff08;3&#xff09;希尔排序 二. 交换排序 &#xff08;1&#xff09;冒泡排序 &#xff08;2&#xff09;快速排序 排序&#xff1a;将一组杂乱无章的数据按一…