01Thingsboard中actor和RuleEngine的启动流程

news/2024/12/27 3:12:08/文章来源:https://www.cnblogs.com/yanghuanxi/p/18631112

一、基本关系

1、单体服务

2、微服务

3、创建流程

Reference

二、DefaultActorService

DefaultActorService是整个规则引擎的初始化入口。

DefaultActorService会初始化一个使用actor模型的规则引擎,共分为2步:

  • ①创建actorSystem;
  • ②处理应用初始化完成事件

1、创建actorSystem

1、创建appActor
@PostConstruct
public void initActorSystem() {log.info("Initializing actor system.");actorContext.setActorService(this);TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);system = new DefaultTbActorSystem(settings);system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));actorContext.setActorSystem(system);// 创建appActor,全局唯一appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));actorContext.setAppActor(appActor);TbActorRef statsActor = system.createRootActor(TENANT_DISPATCHER_NAME, new StatsActor.ActorCreator(actorContext, "StatsActor"));actorContext.setStatsActor(statsActor);log.info("Actor system initialized.");
}

DefaultTbActorSystem.createActor

private TbActorRef createActor(String dispatcherId, TbActorCreator creator, TbActorId parent) {Dispatcher dispatcher = dispatchers.get(dispatcherId);if (dispatcher == null) {log.warn("Dispatcher with id [{}] is not registered!", dispatcherId);throw new RuntimeException("Dispatcher with id [" + dispatcherId + "] is not registered!");}TbActorId actorId = creator.createActorId();TbActorMailbox actorMailbox = actors.get(actorId);if (actorMailbox != null) {log.debug("Actor with id [{}] is already registered!", actorId);} else {Lock actorCreationLock = actorCreationLocks.computeIfAbsent(actorId, id -> new ReentrantLock());actorCreationLock.lock();try {actorMailbox = actors.get(actorId);if (actorMailbox == null) {log.debug("Creating actor with id [{}]!", actorId);// appActor类型TbActor actor = creator.createActor();TbActorRef parentRef = null;if (parent != null) {parentRef = getActor(parent);if (parentRef == null) {throw new TbActorNotRegisteredException(parent, "Parent Actor with id [" + parent + "] is not registered!");}}TbActorMailbox mailbox = new TbActorMailbox(this, settings, actorId, parentRef, actor, dispatcher);actors.put(actorId, mailbox);// 最后会调用appActor.init方法mailbox.initActor();actorMailbox = mailbox;if (parent != null) {parentChildMap.computeIfAbsent(parent, id -> ConcurrentHashMap.newKeySet()).add(actorId);}} else {log.debug("Actor with id [{}] is already registered!", actorId);}} finally {actorCreationLock.unlock();actorCreationLocks.remove(actorId);}}return actorMailbox;
}

appActor.init()

在初始化appActor的时候会启动一个定时任务,去定时清除掉过期的sessionInfo。后面在做扩展的时候需要注意,sessionInfo长时间不用可能会被清除。

public void init(TbActorCtx ctx) throws TbActorException {super.init(ctx);if (systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE)) {systemContext.schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(),systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout());}
}
2、创建stasActor

用于统计状态,创建过程与appActor相同。

== 注:appActor和stasActor全局唯一 ==

2、处理应用初始化完成事件

在应用初始化结束之后会接收到spring发送的ApplicationReadyEvent事件,会向appActor中发送一个AppInitMsg消息。然后appActor会为每一个租户初始化一个tanentActor和ruleChain、ruleNode。

@AfterStartUp(order = AfterStartUp.ACTOR_SYSTEM)
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {log.info("Received application ready event. Sending application init message to actor system");appActor.tellWithHighPriority(new AppInitMsg());
}

appActor.doProcess()

@Override
protected boolean doProcess(TbActorMsg msg) {log.info("app actor s msg s msg type {}", msg.getMsgType());if (!ruleChainsInitialized) {if (MsgType.APP_INIT_MSG.equals(msg.getMsgType())) {// 初始化租户的actorinitTenantActors();ruleChainsInitialized = true;} else {if (!msg.getMsgType().isIgnoreOnStart()) {log.warn("Attempt to initialize Rule Chains by unexpected message: {}", msg);}return true;}}// ...
}

appActor.initTenantActors()

private void initTenantActors() {log.info("Starting main system actor.");try {if (systemContext.isTenantComponentsInitEnabled()) {// 查询所有租户PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);for (Tenant tenant : tenantIterator) {log.debug("[{}] Creating tenant actor", tenant.getId());// 为每一个租户初始化一个actorgetOrCreateTenantActor(tenant.getId()).ifPresentOrElse(tenantActor -> {log.debug("[{}] Tenant actor created.", tenant.getId());}, () -> {log.debug("[{}] Skipped actor creation", tenant.getId());});}}log.info("Main system actor started.");} catch (Exception e) {log.warn("Unknown failure", e);}
}

appActor.getOrCreateTenantActor() -> TbActorMailbox.getOrCreateChildActor() -> DefaultTbActorSystem.createChildActor() -> DefaultTbActorSystem.createActor()。

传入的actorId是tanentId,ActorCreator是TanentActor.ActorCreator(会传入tenantId,后面初始化ruleChain时会用到)。

在DefaultTbActorSystem.createActor()方法中TbActorMailbox中的actor就是TenantActor.class类型,所以在mailbox.initActor();方法中会调用tenantActor.init()方法进行初始化。

tenantActor.init()

public void init(TbActorCtx ctx) throws TbActorException {super.init(ctx);log.debug("[{}] Starting tenant actor.", tenantId);try {Tenant tenant = systemContext.getTenantService().findTenantById(tenantId);if (tenant == null) {cantFindTenant = true;log.info("[{}] Started tenant actor for missing tenant.", tenantId);} else {isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);if (isRuleEngine) {if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) {try {if (getApiUsageState().isReExecEnabled()) {log.debug("[{}] Going to init rule chains", tenantId);// 初始化ruleChaininitRuleChains();} else {log.info("[{}] Skip init of the rule chains due to API limits", tenantId);}} catch (Exception e) {log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e);cantFindTenant = true;}} else {log.info("Tenant {} is not managed by current service, skipping rule chains init", tenantId);}}log.debug("[{}] Tenant actor started.", tenantId);}} catch (Exception e) {log.warn("[{}] Unknown failure", tenantId, e);}
}

在tenantActor初始化时会去初始化该租户下的ruleChain。
RuleChainManagerActor.initRuleChains()

protected void initRuleChains() {log.debug("[{}] Initializing rule chains", tenantId);// 查询该租户下所有的ruleChainfor (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {RuleChainId ruleChainId = ruleChain.getId();log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId());// 初始化每一个ruleChainTbActorRef actorRef = getOrCreateActor(ruleChainId, id -> ruleChain);visit(ruleChain, actorRef);log.debug("[{}|{}] Rule Chain actor created.", ruleChainId.getEntityType(), ruleChainId.getId());}ruleChainsInitialized = true;
}

RuleChainManagerActor.getOrCreateActor() -> TbActorMailbox.getOrCreateChildActor() -> DefaultTbActorSystem.createChildActor() -> DefaultTbActorSystem.createActor()。

DefaultTbActorSystem.createActor()

private TbActorRef createActor(String dispatcherId, TbActorCreator creator, TbActorId parent) {Dispatcher dispatcher = dispatchers.get(dispatcherId);if (dispatcher == null) {log.warn("Dispatcher with id [{}] is not registered!", dispatcherId);throw new RuntimeException("Dispatcher with id [" + dispatcherId + "] is not registered!");}TbActorId actorId = creator.createActorId();TbActorMailbox actorMailbox = actors.get(actorId);if (actorMailbox != null) {log.debug("Actor with id [{}] is already registered!", actorId);} else {Lock actorCreationLock = actorCreationLocks.computeIfAbsent(actorId, id -> new ReentrantLock());actorCreationLock.lock();try {actorMailbox = actors.get(actorId);if (actorMailbox == null) {log.debug("Creating actor with id [{}]!", actorId);// RuleChainActor.class类型TbActor actor = creator.createActor();TbActorRef parentRef = null;if (parent != null) {parentRef = getActor(parent);if (parentRef == null) {throw new TbActorNotRegisteredException(parent, "Parent Actor with id [" + parent + "] is not registered!");}}TbActorMailbox mailbox = new TbActorMailbox(this, settings, actorId, parentRef, actor, dispatcher);actors.put(actorId, mailbox);// 会调用RuleChainActor.init()方法,但是RuleChainActor没有重写init方法,而RuleChainActor继承ComponentActor,所以会调用ComponentActor.init()方法mailbox.initActor();actorMailbox = mailbox;if (parent != null) {parentChildMap.computeIfAbsent(parent, id -> ConcurrentHashMap.newKeySet()).add(actorId);}} else {log.debug("Actor with id [{}] is already registered!", actorId);}} finally {actorCreationLock.unlock();actorCreationLocks.remove(actorId);}}return actorMailbox;
}

ComponentActor.init()

@Override
public void init(TbActorCtx ctx) throws TbActorException {super.init(ctx);// createProcessor会返回一个RuleChainActorMessageProcessor类型this.processor = createProcessor(ctx);// 这个方法里面会去初始化ruleNodeinitProcessor(ctx);
}

RuleChainActor.createProcessor()

@Override
protected RuleChainActorMessageProcessor createProcessor(TbActorCtx ctx) {return new RuleChainActorMessageProcessor(tenantId, ruleChain, systemContext,ctx.getParentRef(), ctx);
}

ComponentActor.initProcessor()

protected void initProcessor(TbActorCtx ctx) throws TbActorException {try {log.debug("[{}][{}][{}] Starting processor.", tenantId, id, id.getEntityType());// 这个方法里面回去初始化ruleNodeprocessor.start(ctx);logLifecycleEvent(ComponentLifecycleEvent.STARTED);if (systemContext.isStatisticsEnabled()) {scheduleStatsPersistTick();}} catch (Exception e) {log.debug("[{}][{}] Failed to start {} processor.", tenantId, id, id.getEntityType(), e);logAndPersist("OnStart", e, true);logLifecycleEvent(ComponentLifecycleEvent.STARTED, e);throw new TbActorException("Failed to init actor", e);}
}

RuleChainActorMessageProcessor.start()

@Override
public void start(TbActorCtx context) {if (!started) {RuleChain ruleChain = service.findRuleChainById(tenantId, entityId);if (ruleChain != null && RuleChainType.CORE.equals(ruleChain.getType())) {// 找到租户下的ruleChainList<RuleNode> ruleNodeList = service.getRuleChainNodes(tenantId, entityId);log.debug("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());// Creating and starting the actors;for (RuleNode ruleNode : ruleNodeList) {log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);// 创建ruleNodeTbActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);// 缓存nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));}initRoutes(ruleChain, ruleNodeList);started = true;}} else {onUpdate(context);}
}
  • createRuleNodeActor方法最终也会调用到DefaultTbActorSystem.createActor,而actor的类型是RuleNodeActor.class
  • 所以会调用ruleNodeActor.init方法,而ruleNodeActor继承自ComponentActor,所以也会调用到ComponentActor中的init方法,
  • 跟ruleChainActor一样,最后会调用到ruleNodeActor中的createProcessor方法,返回值是一个RuleNodeActorMessageProcessor类型,
  • ComponentActor.initProcessor中的start方法就会调用到RuleNodeActorMessageProcessor.start方法

RuleNodeActorMessageProcessor.start

@Override
public void start(TbActorCtx context) throws Exception {if (isMyNodePartition()) {log.debug("[{}][{}] Starting", tenantId, entityId);// 初始化node节点tbNode = initComponent(ruleNode);if (tbNode != null) {state = ComponentLifecycleState.ACTIVE;}}
}
private TbNode initComponent(RuleNode ruleNode) throws Exception {TbNode tbNode = null;if (ruleNode != null) {Class<?> componentClazz = Class.forName(ruleNode.getType());tbNode = (TbNode) (componentClazz.getDeclaredConstructor().newInstance());// 具体每个node的init方法tbNode.init(defaultCtx, new TbNodeConfiguration(ruleNode.getConfiguration()));}return tbNode;
}

三、DeviceActor

deviceActor不是在server启动时创建的,而是当设备上来数据的时候才会去初始化。
具体是在TenantActor.onToDeviceActorMsg中。

    private void onToDeviceActorMsg(DeviceAwareMsg msg, boolean priority) {if (!isCore) {log.warn("RECEIVED INVALID MESSAGE: {}", msg);}if (deletedDevices.contains(msg.getDeviceId())) {log.debug("RECEIVED MESSAGE FOR DELETED DEVICE: {}", msg);return;}// 创建deviceActorTbActorRef deviceActor = getOrCreateDeviceActor(msg.getDeviceId());if (priority) {deviceActor.tellWithHighPriority(msg);} else {deviceActor.tell(msg);}}

四、代码调试思路

因为整个server中Actor是核心,所以就从actor入手。

public interface TbActor {boolean process(TbActorMsg msg);TbActorRef getActorRef();default void init(TbActorCtx ctx) throws TbActorException {}default void destroy(TbActorStopReason stopReason, Throwable cause) throws TbActorException {}default InitFailureStrategy onInitFailure(int attempt, Throwable t) {return InitFailureStrategy.retryWithDelay(5000L * attempt);}default ProcessFailureStrategy onProcessFailure(TbActorMsg msg, Throwable t) {if (t instanceof Error) {return ProcessFailureStrategy.stop();} else {return ProcessFailureStrategy.resume();}}
}

在TbActor中,很明显init是初始化的方法,肯定会在初始化的时候调用,找到其中的一个实现如appActor.init(),find useages找到调用appActor.init的地方,发现只有一个地方在调用:TbActorMailbox.tryInit,在沿着往上找,就可以找到DefaultActorService.initActorSystem()方法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/858739.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

这个安装文件包,运行SETUP,报错,怎么解决?

大家好,我是Python进阶者。 一、前言 前几天在Python最强王者交流群【小歌】问了一个Python代码调试的问题。问题如下:SETUP.Py在notebook条件下运行,报错信息,谁能指导下不?:) 二、实现过程 这里【瑜亮老师】给了一个指导,如下所示:@小歌 报错说没找到那个txt文件,…

jExcel-类似Excel的jquery电子表格插件

jexcel.js是一款轻量级的类似Excel的jquery电子表格插件。你可以同js数组、json数据或CSV文件来为jexcel表格提供数据,你甚至可以直接从一个Excel表格中直接复制粘贴数据到jexcel表格中。在线预览 下载使用方法 在页面中引入jquery、jquery.jexcel.js和jquery.jexcel.css文件…

优化大宽表查询性能,揭秘GaussDB(DWS) 谓词列analyze

谓词列通指于 WHERE 条件,join条件,group by中涉及到的列,更广义的是指所有需要用于计划生成需要统计信息列的列。本文分享自华为云社区《GaussDB(DWS) 谓词列analyze揭秘》,作者:SmithCoder。 1. 前言 适用版本:【9.1.0.100(及以上)】 ​当前GaussDB(DWS)中存在手动an…

指标管理+AI大模型深度融合,开启智能数据分析管理新时代

随着企业数字化转型的加速,数据管理和分析变得越来越重要。传统的指标管理平台虽然已经能够帮助企业有效地收集、计算、管理和展示关键指标,但在业务分析层面,面对日益复杂的数据环境和业务需求,单纯依靠人工分析已经难以满足高效、精准的管理要求。为此,将指标管理平台与…

nmon监控在linux环境下的安装

nmon下载官网: https://nmon.sourceforge.io/pmwiki.php?n=Site.Download一 、前言Nmon (Nigel’s Monitor)是由IBM 提供、免费监控 AIX 系统与 Linux 系统资源的工具。该工具可将服务器系统资源耗用情况收集起来并输出一个特定的文件,并可利用 excel 分析工具(nmon analyse…

项目管理系统 - 项目管理软件 | 禅道项目管理工具

引言在当今数字化时代,项目管理对于企业的成功至关重要。项目管理系统和软件层出不穷,其中禅道项目管理工具以其独特的优势脱颖而出。禅道作为一款开源的项目管理软件,涵盖了项目管理的各个方面,为企业提供了全面、高效的管理解决方案。无论是软件开发项目、工程建设项目还…

18款顶级在线项目管理网站分享,助你高效管理项目

在当今数字化时代,项目管理的效率和效果对于企业的成功至关重要。在线项目管理工具为企业提供了便捷、高效的解决方案,帮助团队更好地规划、执行和监控项目。本文将介绍18款顶级在线项目管理网站,涵盖不同类型的项目管理工具,希望能为读者带来启发,助力他们在项目管理中取…

大模型提示工程

大模型提示工程转:9 大模型提示词工程应用_哔哩哔哩_bilibili 1. 原则2. 清晰的指令2.1. 分隔符大模型基于概率生成,每次生成的话不一样 2.2. 结构化输出使用网页,直接复制文本 使用接口,就用代码 2.3. 参考示例2.4. 角色扮演3. 让模型思考3.1. 指定步骤大模型只是提供的便…

配置manage路由,实现嵌套路由

1、npm install vue-router 引入vue-router main.ts增加配置 import router from ./routes createApp(App).use(router) 2、src下新建目录routes,新建index.ts // index.ts import { createRouter, createWebHistory } from vue-router; // 引入Vue组件 import Home from ../p…

leetcode 1045

leetcode 1045select customer_id from (select customer_id,count(*) mfrom (select distinct * from Customer) a group by customer_id having count(*) in (select count(*) from Product))p ;日记 23号是周一,到今天圣诞节都没有去上班,请假了,主要是软工的课设要忙,事…