大数据-Storm流式框架(五)---DRPC

DRPC

概念

分布式RPC(DRPC)背后的想法是使用Storm在运行中并行计算真正强大的函数。 Storm拓扑接收函数参数流作为输入,并为每个函数调用发送结果的输出流。

DRPC并不是Storm的一个特征,因为它基于Storm的spouts,bolts和拓扑的高级抽象。DRPC本可以打包成Storm独立的库,但是跟storm绑定在一起很有用。

顶层视角

分布式RPC由“DRPC服务器”协调(Storm随附实现)。 DRPC服务器协调接收RPC请求,将请求发送到Storm拓扑,从Storm拓扑接收结果,并将结果发送回等待的客户端。 从客户端的角度来看,分布式RPC调用看起来就像常规的RPC调用。 例如,以下是客户端如何使用参数“http://twitter.com”计算“到达”函数的结果:

DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("reach", "http://twitter.com");

分布式RPC工作流程:

客户端向DRPC服务器发送要执行的函数名称以及该函数的参数。实现该功能的拓扑使用DRPCSpout从DRPC服务器接收函数调用流。 每个函数调用都由DRPC服务器标记唯一ID。 然后拓扑计算结果,在拓扑结束时,一个名为ReturnResults的bolt连接到DRPC服务器,并为其提供函数调用id的结果。 然后,DRPC服务器使用id来匹配客户端正在等待的结果,取消阻塞等待的客户端,并将结果发送给它。

LinearDRPCTopologyBuilder

Storm附带了一个名为LinearDRPCTopologyBuilder的拓扑构建器,它可以自动执行几乎所有涉及DRPC的步骤。 这些包括:

     1、设置spout

     2、将结果返回给DRPC服务器

     3、为bolt提供功能,以便在tuple(元组)组上进行有限聚合

我们来看一个简单的例子。 这是DRPC拓扑的实现,它返回带有“!”的输入参数。附:

public static class ExclaimBolt extends BaseBasicBolt {public void execute(Tuple tuple, BasicOutputCollector collector) {String input = tuple.getString(1);collector.emit(new Values(tuple.getValue(0), input + "!"));}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "result"));}
}public static void main(String[] args) throws Exception {LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");builder.addBolt(new ExclaimBolt(), 3);// ...
}

正如你所看到的,没有几行代码。 创建LinearDRPCTopologyBuilder时,可以告诉它拓扑的DRPC函数的名称。 单个DRPC服务器可以协调许多功能,函数名称可以区分各个函数。 声明的第一个bolt将2元组作为输入,其中第一个字段是请求ID,第二个字段是该请求的参数。 LinearDRPCTopologyBuilder期望最后一个bolt发出一个输出流,其中包含[id,result]形式的2元组。 最后,所有中间元组都必须包含请求ID作为第一个字段。

在这个例子中,ExclaimBolt只是附加一个“!” 到元组的第二个字段。 LinearDRPCTopologyBuilder处理连接到DRPC服务器并返回结果的其余协调。

本地模式的DRPC

DRPC可以在本地模式运行。下面的例子说明了如何运行本地模式的DRPC:

LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));cluster.shutdown();
drpc.shutdown();

首先,创建一个LocalDRPC对象。 此对象模拟正在进行的DRPC服务器,就像LocalCluster在进程中模拟Storm集群一样。 然后创建LocalCluster以在本地模式下运行拓扑。 LinearDRPCTopologyBuilder具有用于创建本地拓扑和远程拓扑的单独方法。 在本地模式下,LocalDRPC对象不会绑定到任何端口,因此拓扑需要知道要与之通信的对象。 这就是createLocalTopology将LocalDRPC对象作为输入接收的原因。

启动拓扑后,您可以使用LocalDRPC上的execute方法执行DRPC调用。

远程模式的DRPC

在实际集群上使用DRPC也很简单。 有三个步骤:

     1、启动DRPC服务器

     2、配置DRPC服务器的位置

     3、将DRPC拓扑提交给Storm集群

启动DRPC服务器可以使用storm脚本完成,就像启动Nimbus或UI一样:

bin/storm drpc

接下来,您需要配置Storm群集以了解DRPC服务器的位置。 这就是DRPCSpout如何知道从何处读取函数调用。 这可以通过storm.yaml文件或拓扑配置来完成。 通过storm.yaml配置这个看起来像这样:

drpc.servers:- "drpc1.foo.com"- "drpc2.foo.com"

最后,像启动任何一个其他的拓扑一样,使用StormSubmitter启动DRPC拓扑。要在远程模式运行上述的示例,操作如下:

StormSubmitter
.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());

createRemoteTopology用于为storm集群创建合适的拓扑。

稍微复杂的示例

感叹号DRPC示例是用于说明DRPC概念的玩具示例。让我们看一个更复杂的例子,它真正需要Storm集群为计算DRPC函数提供的并行性。我们将看到的示例是在Twitter上计算URL的范围。

URL的范围是在Twitter上暴露给URL的唯一人数。要计算覆盖面,您需要:

    1、获取推文网址的所有人

    2、获得所有这些人的所有粉丝

    3、独特的追随者

    4、统计一组独特的粉丝

在计算过程中,单个到达计算可能涉及数千个数据库调用和数千万个跟随者记录。这是一个非常非常密集的计算。正如您将要看到的那样,在Storm之上实现此功能非常简单。在一台计算机上,达到计算可能需要几分钟;在Storm集群中,您可以在几秒钟内计算最难的URL的覆盖率。

此处的storm-starter中定义了样本范围拓扑。以下是定义范围拓扑的方法:

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
builder.addBolt(new GetTweeters(), 3);
builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));
builder.addBolt(new CountAggregator(), 2).fieldsGrouping(new Fields("id"));

拓扑执行为四个步骤:

    1、GetTweeters获取推文URL的用户。它将[id,url]的输入流转换为[id,tweeter]的输出流。每个url元组将映射到许多tweeter元组。

    2、GetFollowers获得推特的追随者。它将[id,tweeter]的输入流转换为[id,follower]的输出流。在所有任务中,当有人跟随多个发布相同URL的人时,可能会有重复的跟随元组。

    3、PartialUniquer通过关注者ID对关注者流进行分组。这具有相同的跟随者执行相同任务的效果。因此,PartialUniquer的每项任务都将获得相互独立的追随者。一旦PartialUniquer收到针对请求ID的所有针对它的关注元组,它就会发出其关注者子集的唯一计数。

4、最后,CountAggregator接收来自每个PartialUniquer任务的部分计数,并将它们相加以完成到达计算。

PartialUniquer代码:

public class PartialUniquer extends BaseBatchBolt {BatchOutputCollector _collector;Object _id;Set<String> _followers = new HashSet<String>();@Overridepublic void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {_collector = collector;_id = id;}@Overridepublic void execute(Tuple tuple) {_followers.add(tuple.getString(1));}@Overridepublic void finishBatch() {_collector.emit(new Values(_id, _followers.size()));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "partial-count"));}
}

PartialUniquer通过扩展BaseBatchBolt实现IBatchBolt。批处理bolt提供了一个第一类API来处理一批元组作为具体单元。为每个请求ID创建一个新的批处理bolt实例,Storm会在适当的时候负责清理实例。

当PartialUniquer在execute方法中收到一个跟随元组时,它会将它添加到内部HashSet中的请求ID的集合中。

批处理bolt提供finishBatch方法,该方法在处理了针对此任务的此批处理的所有元组之后调用。在回调中,PartialUniquer会发出一个元组,其中包含其跟随者id子集的唯一计数。

在底层,CoordinatedBolt用于检测给定的bolt何时收到任何给定请求ID的所有元组。 CoordinatedBolt利用直接流来管理这种协调。

拓扑的其余部分应该是不言自明的。如您所见,到达计算的每一步都是并行完成的,定义DRPC拓扑非常简单。

非线性DRPC拓扑

LinearDRPCTopologyBuilder仅处理“线性”DRPC拓扑,其中计算表示为一系列步骤(如覆盖范围)。 不难想象函数需要更复杂的拓扑结构,包括bolt的分支和合并。 现在,要做到这一点,你需要直接使用CoordinatedBolt。 请务必在邮件列表中讨论非线性DRPC拓扑的用例,以便为DRPC拓扑构建更一般的抽象。

LinearDRPCTopologyBuilder工作流程:

DRPCSpout发射[args, return-info]。return-info是DRPC服务器的主机名和端口号,以及DRPC服务器生成的id。

创建一个拓扑包括:

  1. DRPCSpout
  2. PrepareRequest(生成请求ID,为返回信息创建一个流,为参数创建一个流)
  3. CoordinatedBolt
  4. JoinResult(使用return info合并结果)
  5. ReturnResult(连接DRPC服务器以及返回结果)

LinearDRPCTopologyBuilder是在storm原语之上构建高级别抽象的一个很好的例子。

进阶

KeyedFairBolt用于编织多个同时请求的处理

如何直接使用CoordinatedBolt

DRPC (Distributed RPC)  remote procedure call

分布式远程过程调用

DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。

DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。

(其实,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。)

DRPC设计目的:

为了充分利用Storm的计算能力实现高密度的并行实时计算。

(Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)

客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。

定义DRPC拓扑

方法1:

通过LinearDRPCTopologyBuilder (该方法也过期,不建议使用)

该方法会自动为我们设定Spout、将结果返回给DRPC Server等,我们只需要将Topology实现

方法2:

直接通过普通的拓扑构造方法TopologyBuilder来创建DRPC拓扑

需要手动设定好开始的DRPCSpout以及结束的ReturnResults

运行模式:

1、本地模式

2、远程模式(集群模式)

修改配置文件conf/storm.yaml

drpc.servers:

    - "node1“

启动DRPC Server

bin/storm drpc &

通过StormSubmitter.submitTopology提交拓扑

案例:

Twitter 中某个URL的受众人数统计(这篇twitter到底有多少人看到过)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/154502.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

亿图导出word和PDF中清晰度保留方法

步骤一 在亿图软件中画一个元件大小搭配合理的图。注意字体大小的安排&#xff0c;尤其是角标的大小要合适&#xff0c;示范如下 选中所有元器件&#xff0c;右键使用组合功能将电路图组合为一个整体 步骤二&#xff1a; 将亿图软件中的图保存为SVG格式。示范如下 在导出到…

混合编程 ATPCS规范及案例(汇编调用C、C调用汇编、内联汇编)

1.混合编程的规范 2.汇编调用C 2.C调用汇编 3.内联汇编 例子&#xff1a;

0-1背包问题【穷举法+二维dp数组】

问题描述&#xff1a; 使用穷举法解决0/1背包问题。问题描述&#xff1a;给定n个重量为{w1, w2, … ,wn}、价值为{v1, v2, … ,vn} 的物品和一个容量为C的背包&#xff0c;求这些物品中的一个最有价值的子集&#xff0c;且要能够装到背包中。 穷举法&#xff1a;每件物品装还是…

Qt之基于QCustomPlot绘制直方图(Histogram),叠加正态分布曲线

一.效果 二.原理 1.正态分布 高斯分布(Gaussian distribution),又名正态分布(Normal distribution),也称"常态分布",也就是说,在正常的状态下,一般的事物,都会符合这样的分布规律。 比如人的身高为一个随机变量,特别高的人比较少,特别矮的也很少,大部分都…

Ubuntu系统HUSTOJ 用 vim 修改php.ini 重启PHP服务

cd / sudo find -name php.ini 输出&#xff1a; ./etc/php/7.4/cli/php.ini ./etc/php/7.4/fpm/php.ini sudo vim /etc/php/7.4/cli/php.ini sudo vim /etc/php/7.4/fpm/php.ini 知识准备&#xff1a; vim的搜索与替换 在正常模式下键入 / &#xff0c;即可进入搜索模式…

博客系统的前后端实现

前面的学习中, 我们基于 HTML, CSS, JavaScript 实现了一个简单的博客系统的页面. 接下来我们基于博客系统页面来实现一个带服务器版本的博客程序. 1.准备工作 1.创建项目 2.引入依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns…

嵌入式基础知识-RSA非对称加密基本原理

之前的文章嵌入式基础知识-信息安全与加密&#xff0c;介绍过数据加密的一些基本概念&#xff0c;对称加密的原理比较简单&#xff0c;加密和解密的密钥相同&#xff0c;而非对称加密&#xff0c;两个密钥不同&#xff0c;本篇就来具体介绍RSA这种非对称加密的密钥计算原理。 …

Goland连接服务器/虚拟机远程编译开发

创建SSH连接 SSH用于与远程服务器建立连接 Settings -> Tools -> SSH Configurations 添加新的ssh连接&#xff0c;Host为ip地址&#xff0c;Username为用户名&#xff0c;认证方式这里选择密码验证 全部填完后可以点击Test Connection测试连接是否成功 创建Deployment…

Android14 WMS启动流程

一 概述 本文Android14源代码可参考&#xff1a;Search 在 Android 系统中&#xff0c;从设计的角度来看&#xff0c;窗口管理系统是基于 C/S 模式的。整个窗口系统分为服务端和客户端两大部分&#xff0c;客户端负责请求创建窗口和使用窗口&#xff0c;服务端完成窗口的维护…

CentOS 编译安装 nginx

CentOS 编译安装 nginx 修改 yum 源地址为 阿里云 curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repoyum makecache升级内核和软件 yum -y update安装常用软件和依赖 yum -y install gcc gcc-c make cmake zlib zlib-devel openss…

python使用ffmpeg来制作音频格式转换工具(优化版)

简介:一个使用python加上ffmpeg模块来进行音频格式转换的工具。 日志: 20231030:第一版,设置了简单的UI布局和配色,实现音频转为Mp3、AAC、wav、flac四种格式。可解析音频并显示信息,可设置转换后的保存路径 UI界面: 编程平台:visual studio code 编程语言:python 3…

CSS+Javascript+Html日历控件

最近&#xff0c;因需要用HTMLJAVASCRIPTCSS实现了一个日历控件&#xff0c;效果如下&#xff1a; 单击上月、下月进行日历切换。当前日期在日历中变颜色标注显示。还是老老套路、老方法&#xff0c;分HMLCSSJAVASCRIPT三部分代码。 一、html代码 <h1>学习计划</h1…