【大数据】NiFi 中的 Controller Service

NiFi 中的 Controller Service

  • 1.Service 简介
    • 1.1 Controller Service 的配置
      • 1.1.1 SETTING 基础属性
      • 1.1.2 PROPERTIES 使用属性
      • 1.1.3 COMMENT 页签
    • 1.2 Service 的使用范围
  • 2.全局参数配置
  • 3.DBCPConnectionPool 的使用样例
  • 4.在 ExcuseGroovyScript 组件中使用 Service

1.Service 简介

首先 NiFi 中的 Controller Service 和我们 MVC 概念中的 Controller Service 不是一个概念,NiFi 中的 Controller Service 更像是和 Processor 同级的一个概念,它和 Processor 在我个人的使用经验来理解的话就是 它是预制好的各种服务,可以被 Processor 引用或者支撑 Processor,例如一个 SQL 读取的 Processor,它得需要 JDBC 的连接,才能访问数据库。这里 Controller Service 就可以是一个 JDBC 的连接池服务。

同理,Controller Service 也是支持扩展的,可以像自定义开发 Processor 一样,根据自己的业务需求,进行自定义的 Controller Service 开发。

当我们使用某些依赖 Service 的组件(Processor)时,在配置中会出现选择 Service 或者创建新的 Service 的情况,这里的 Service 即是 NiFi 的 Controller Service,一旦创建新的,则会生成一个以 Group 为范围的 “全局” Service 对象,这时,再有依赖同类型 Service 的 Processor 时,可以直接选中。

在这里插入图片描述
在这里插入图片描述

1.1 Controller Service 的配置

单独查看 Controller Service 可以从面板空白处,右键 Configure 来看,如下图:

在这里插入图片描述
这是一个 JDBC 的连接池 Service,它包含的属性有 名称类型简介启用状态操作;从操作中可以看到配置该 Service 需要填写基本的各类属性;其中,Service 是有启停状态的,如果想修改 Service 的属性内容,必须先保证该 Service 是停用状态,然后点击配置标识,则进入配置页面,它的配置和 Processor 的差不多,通过页签区别,共有三个页签:SETTING基础属性)、PROPERTIES使用属性)、COMMENT页签)。

1.1.1 SETTING 基础属性

基础属性,包含左侧的名称,名称可以进行更改,右侧包含引用此 Service 的 Processor 列表。

1.1.2 PROPERTIES 使用属性

在这里插入图片描述
核心的业务配置,此标签页的配置项根据不同的 Service,配置内容不一致,具体的配置项以及使用,可以参考官方的文档;这里的是 JDBC 的连接池,所以基本需要连接数据库所需的 URL、数据库的账号密码、数据库的驱动类名称、驱动类的依赖 jar 包路径 ,这里不少 Service 可能都需要第三方的 jar 包依赖才可以使用,长期使用或生产环境下,建议将所有 jar 资源集中放在统一路径下。

1.1.3 COMMENT 页签

在这里插入图片描述
一个提供 Service 使用说明的页签,可根据自己实际需求,补充使用 Service 的用法以及描述。

1.2 Service 的使用范围

在 NiFi 中,Group 同时也对 Service 起作用,如果我们在一个 NiFi 的最外层的平面上新增 Controller Service,那么这些 Service 的作用域是整个 NiFi 的任何位置,如果我们在某个 Group 内创建 Controller Service, 那么这个 Controller Service 仅在 Group 范围内可以被引用,NiFi 的这种机制也是方便 Service 的使用和维护。

在这里插入图片描述

2.全局参数配置

类似于数据库连接池、Kafka、Redis 等各种组件的连接池、客户端 Client 的 Service 在实际的使用中会非常多,由此配置的 Service 也会非常多,于是就会产生很多次的反复配置 URL、账号这一系列重复的内容,由于 NiFi 的特性,这些 Service 又和组件(Processor)一样,四散在各处,这就使得维护和运维管理变得很繁琐,调试、调整、查看的时候,要不停的各个 Group 来回跳转、调整不同的 Service 的 Configure;为应对此类问题,NiFi 提供了全局配置的机制来弥补。

使用变量前:

在这里插入图片描述
这里的 URLDriver Class NameDatabase User 在实际生产环境中,可能都是固定的数据库和固定的服务,几乎不需要变的,可能只需要配置一遍就好,不需要每次创建 Service 都写一遍;所以可以这里可以使用上下文变量(Parameter Context)。

首先,打开 Parameter Context,创新一组新的变量:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
再进入 CONTROLLER SERVICES 对 Service 的配置进行修改,将具体的 URLDriver Class NameDatabase User 等参数,全部使用变量替换(变量使用 # 符 )。

在这里插入图片描述

3.DBCPConnectionPool 的使用样例

下面将使用 NiFi 实现一个简单的 Demo,从 MySQL 数据库中读取部分数据,将数据进行筛选,然后将数据输出;

首先,使用 ExecuteSQL 组件,读取 MySQL 中的数据,根据上文描述,创建一个 DBCPConnectionPool 的 Service,然后启动:

在这里插入图片描述
添加 ExecuteSQL 组件,配置相关内容,根据自定义编写的 SQL 读取数据库内容:

在这里插入图片描述
随后添加 ConvertAvroToJSON 组件,这里从数据库读出的数据是不可读的,为了方便查看调试、同时也是为了后续使用 Groovy 处理数据,所以选择转换为 JSON 进行处理,实际使用可以根据自身情况选择转换器:

在这里插入图片描述
添加 ExecuteGroovyScript 组件,使用 Groovy 脚本对数据进行处理。

在这里插入图片描述
Groovy 的脚本内容如下:

import org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;
import groovy.json.JsonBuilder;
import groovy.json.JsonOutput;
import groovy.json.JsonSlurper;
import groovy.json.StringEscapeUtils;
import java.util.*;def dataJson = getInputJSONData()
if(null == dataJson){return;
}
def rss = []
for(int i = 0; i < dataJson.size();i++){def tem = dataJson.get(i);//在这里可以对数据进行处理rss.add(tem.name);
}// 输出
if(rss.size()>0){sendData(rss,REL_SUCCESS);
}/*** 读取输入流* @author GCC***/
def getInputJSONData(){def flow = session.get()if(null == flow){log.error("the flow is null ...");return;}def dataJson = null;def jsonStr = "";session.read(flow,{inputStream ->jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)} as InputStreamCallback);try{dataJson = new JsonSlurper().parseText(jsonStr);}catch(Exception e){log.error("输入流格式错误")}session.remove(flow);return dataJson;
}/***输出数据至后续管道*@param result 输出的数据*@param outStream 输出的管道*@author GCC***/
void sendData(def result,def outStream){String successFlowFileStr = StringEscapeUtils.unescapeJavaScript(new JsonOutput().toJson(result).toString());def newflow = session.create();newflow = session.write(newflow, {outputStream ->outputStream.write(successFlowFileStr.getBytes(StandardCharsets.UTF_8))} as OutputStreamCallback)session.transfer(newflow, outStream);
}

最后使用 LogMessage 组件作为接收数据,实际情况可以将数据转为下一处理节点或存储等等。

在这里插入图片描述
在这里插入图片描述

4.在 ExcuseGroovyScript 组件中使用 Service

ExcuseGroovyScript 组件内部使用 Groovy 脚本处理数据时,可能需要再次读取数据库或者使用其他第三方数据来辅助处理,这时候,ExcuteGroovyScript 组件支持可以引入 Service,提供用户编写的 Groovy 脚本内部使用 Service;

首先需要在 ExcuteGroovyScript 组件的 PROPERTIES 配置中新增属性:

在这里插入图片描述
这里,添加属性时,会让用户输入用户给该属性的命名,如果是普通命名,这里的属性仅仅作为静态数据而已,但是如果使用关键字 SQL. 或者 CTL. 作为名称前缀时,则能够使用 Service,后续的属性值则会变成 Service 的选择。

在 Groovy 的代码中,则可以通过 SQL.mysql.{method} 的方式,调用 Service 的方法:

import org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;
import groovy.json.JsonBuilder;
import groovy.json.JsonOutput;
import groovy.json.JsonSlurper;
import groovy.json.StringEscapeUtils;
import java.util.*;def dataJson = getInputJSONData()
if(null == dataJson){return;
}
def rss = []
for(int i = 0; i < dataJson.size();i++){def tem = dataJson.get(i);def mapdic = [:]// 使用 Service 查询数据库SQL.mysql.eachRow("SELECT id, value FROM tb_dic_detail WHERE u_status = 1 "){row->mapdic.put(row.id.toString(),row.value.toString());    }rss.add(tem.name);
}// 输出
if(rss.size()>0){sendData(rss,REL_SUCCESS);
}/***************************************************公共函数***************************************************//*** 读取输入流* @author GCC***/
def getInputJSONData(){def flow = session.get()if(null == flow){log.error("the flow is null ...");return;}def dataJson = null;def jsonStr = "";session.read(flow,{inputStream ->jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)} as InputStreamCallback);try{dataJson = new JsonSlurper().parseText(jsonStr);}catch(Exception e){log.error("输入流格式错误")}session.remove(flow);return dataJson;
}/***输出数据至后续管道*@param result 输出的数据*@param outStream 输出的管道*@author GCC***/
void sendData(def result,def outStream){String successFlowFileStr = StringEscapeUtils.unescapeJavaScript(new JsonOutput().toJson(result).toString());def newflow = session.create();newflow = session.write(newflow, {outputStream ->outputStream.write(successFlowFileStr.getBytes(StandardCharsets.UTF_8))} as OutputStreamCallback)session.transfer(newflow, outStream);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/294124.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为---登录USG6000V防火墙---console、web、telnet、ssh方式登录

目录 一、环境搭建 二、第一次登录USG6000V防火墙&#xff0c;即通过console方式登录 三、用户配置 四、web登录USG6000V防火墙 1. 用web创建的用户通过web方式登录USG6000V防火墙 2. 命令行创建的用户通过web方式登录USG6000V防火墙 五、ssh方式登录USG6000V防火墙 1. 用…

在线时间戳是什么?

在线时间戳是基于国际标准结合PKI密码技术及数字签名技术&#xff0c;对电子数据产生的精确时间进行固证&#xff0c;为电子数据提供时间证明的一种在线服务。时间戳技术使用数字签名技术对包含原始文件信息、签名参数、签名时间等信息构成的对象进行数字签名。访问沃通CA官网了…

通过 Higress Wasm 插件 3 倍性能实现 Spring-cloud-gateway 功能

作者&#xff1a;韦鑫&#xff0c;Higress Committer&#xff0c;来自南京航空航天大学分布式系统实验室 导读&#xff1a;本文将和大家一同回顾 Spring Cloud Gateway 是如何满足 HTTP 请求/响应转换需求场景的&#xff0c;并为大家介绍在这种场景下使用 Higress 云原生网关的…

日志服务 SLS 深度解析:拥抱云原生和 AI,基于 SLS 的可观测分析创新

云布道师 10 月 31 日&#xff0c;杭州云栖大会上&#xff0c;日志服务 SLS 研发负责人简志和产品经理孟威等人发表了《日志服务 SLS 深度解析&#xff1a;拥抱云原生和 AI&#xff0c;基于 SLS 的可观测分析创新》的主题演讲&#xff0c;对阿里云日志服务 SLS 产品服务创新以…

Qt的简单游戏实现提供完整代码

文章目录 1 项目简介2 项目基本配置2.1 创建项目2.2 添加资源 3 主场景3.1 设置游戏主场景配置3.2 设置背景图片3.3 创建开始按钮3.4 开始按钮跳跃特效实现3.5 创建选择关卡场景3.6 点击开始按钮进入选择关卡场景 4 选择关卡场景4.1场景基本设置4.2 背景设置4.3 创建返回按钮4.…

redis主从复制(在虚拟机centos的docker下)

1.安装docker Docker安装(CentOS)简单使用-CSDN博客 2.编辑3个redis配置 cd /etc mkdir redis-ms cd redis-ms/ vim redis6379.conf vim redis6380.conf vim redis6381.conf# master #端口号 port 6379#设置客户端连接后进行任何其他指定前需要使用的密码 requirepass 12345…

从0到1部署gitlab自动打包部署项目

本文重点在于配置ci/cd打包 使用的是docker desktop 第一步安装docker desktop Docker简介 Docker 就像一个盒子&#xff0c;里面可以装很多物件&#xff0c;如果需要某些物件&#xff0c;可以直接将该盒子拿走&#xff0c;而不需要从该盒子中一件一件的取。Docker中文社区、…

智能优化算法应用:基于人工大猩猩部队算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于人工大猩猩部队算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于人工大猩猩部队算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.人工大猩猩部队算法4.实验参…

2023.12.21 关于 Redis 常用数据结构 和 单线程模型

目录 各数据结构具体编码方式 查看 key 对应 value 的编码方式 Reids 单线程模型 经典面试题 IO 多路复用 Redis 常用数据结构 Redis 中所有的 key 均为 String 类型&#xff0c;而不同的是 value 的数据类型却有很多种以下介绍 5 种 value 常见的数据类型 注意&#xff1…

深入了解 Android 中的应用程序签名

深入了解 Android 中的应用程序签名 一、应用程序签名介绍1.1 应用程序签名1.2 应用程序签名的意义1.3 应用程序签名的流程1.4 应用程序签名的方案1.5 签名的重要性和应用场景 二、AOSP 的应用签名2.1 AOSP的应用签名文件路径2.2 应用程序指定签名文件 三、Android Studio 的应…

GIT具体配置步骤详解

GIT配置具体步骤如下 SDK 使用 Repo 工具管理&#xff0c;拉取 SDK 需要配置安装 Repo 工具。 Repo is a tool built on top of Git. Repo helps manage many Git repositories, does the uploads to revision control systems, and automates parts of the development workf…

php学习01-Hello World

开发环境搭建 参考 如果没有搭建的请参考上面的文章进行搭建 新建index.php <?php echo Hello World; ?>访问 修改index.php代码 <?php phpinfo() //主要是用来打印php的一些配置信息方便后期排查配置是否正确以及插件是否开启 ?>