一、基本关系
1、单体服务
2、微服务
3、创建流程
Reference
二、DefaultActorService
DefaultActorService
是整个规则引擎的初始化入口。
DefaultActorService
会初始化一个使用actor模型的规则引擎,共分为2步:
- ①创建actorSystem;
- ②处理应用初始化完成事件
1、创建actorSystem
1、创建appActor
@PostConstruct
public void initActorSystem() {log.info("Initializing actor system.");actorContext.setActorService(this);TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);system = new DefaultTbActorSystem(settings);system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));actorContext.setActorSystem(system);// 创建appActor,全局唯一appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));actorContext.setAppActor(appActor);TbActorRef statsActor = system.createRootActor(TENANT_DISPATCHER_NAME, new StatsActor.ActorCreator(actorContext, "StatsActor"));actorContext.setStatsActor(statsActor);log.info("Actor system initialized.");
}
DefaultTbActorSystem.createActor
private TbActorRef createActor(String dispatcherId, TbActorCreator creator, TbActorId parent) {Dispatcher dispatcher = dispatchers.get(dispatcherId);if (dispatcher == null) {log.warn("Dispatcher with id [{}] is not registered!", dispatcherId);throw new RuntimeException("Dispatcher with id [" + dispatcherId + "] is not registered!");}TbActorId actorId = creator.createActorId();TbActorMailbox actorMailbox = actors.get(actorId);if (actorMailbox != null) {log.debug("Actor with id [{}] is already registered!", actorId);} else {Lock actorCreationLock = actorCreationLocks.computeIfAbsent(actorId, id -> new ReentrantLock());actorCreationLock.lock();try {actorMailbox = actors.get(actorId);if (actorMailbox == null) {log.debug("Creating actor with id [{}]!", actorId);// appActor类型TbActor actor = creator.createActor();TbActorRef parentRef = null;if (parent != null) {parentRef = getActor(parent);if (parentRef == null) {throw new TbActorNotRegisteredException(parent, "Parent Actor with id [" + parent + "] is not registered!");}}TbActorMailbox mailbox = new TbActorMailbox(this, settings, actorId, parentRef, actor, dispatcher);actors.put(actorId, mailbox);// 最后会调用appActor.init方法mailbox.initActor();actorMailbox = mailbox;if (parent != null) {parentChildMap.computeIfAbsent(parent, id -> ConcurrentHashMap.newKeySet()).add(actorId);}} else {log.debug("Actor with id [{}] is already registered!", actorId);}} finally {actorCreationLock.unlock();actorCreationLocks.remove(actorId);}}return actorMailbox;
}
appActor.init()
在初始化appActor的时候会启动一个定时任务,去定时清除掉过期的sessionInfo。后面在做扩展的时候需要注意,sessionInfo长时间不用可能会被清除。
public void init(TbActorCtx ctx) throws TbActorException {super.init(ctx);if (systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE)) {systemContext.schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(),systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout());}
}
2、创建stasActor
用于统计状态,创建过程与appActor相同。
== 注:appActor和stasActor全局唯一 ==
2、处理应用初始化完成事件
在应用初始化结束之后会接收到spring发送的ApplicationReadyEvent
事件,会向appActor中发送一个AppInitMsg消息。然后appActor会为每一个租户初始化一个tanentActor和ruleChain、ruleNode。
@AfterStartUp(order = AfterStartUp.ACTOR_SYSTEM)
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {log.info("Received application ready event. Sending application init message to actor system");appActor.tellWithHighPriority(new AppInitMsg());
}
appActor.doProcess()
@Override
protected boolean doProcess(TbActorMsg msg) {log.info("app actor s msg s msg type {}", msg.getMsgType());if (!ruleChainsInitialized) {if (MsgType.APP_INIT_MSG.equals(msg.getMsgType())) {// 初始化租户的actorinitTenantActors();ruleChainsInitialized = true;} else {if (!msg.getMsgType().isIgnoreOnStart()) {log.warn("Attempt to initialize Rule Chains by unexpected message: {}", msg);}return true;}}// ...
}
appActor.initTenantActors()
private void initTenantActors() {log.info("Starting main system actor.");try {if (systemContext.isTenantComponentsInitEnabled()) {// 查询所有租户PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);for (Tenant tenant : tenantIterator) {log.debug("[{}] Creating tenant actor", tenant.getId());// 为每一个租户初始化一个actorgetOrCreateTenantActor(tenant.getId()).ifPresentOrElse(tenantActor -> {log.debug("[{}] Tenant actor created.", tenant.getId());}, () -> {log.debug("[{}] Skipped actor creation", tenant.getId());});}}log.info("Main system actor started.");} catch (Exception e) {log.warn("Unknown failure", e);}
}
appActor.getOrCreateTenantActor() -> TbActorMailbox.getOrCreateChildActor() -> DefaultTbActorSystem.createChildActor() -> DefaultTbActorSystem.createActor()。
传入的actorId是tanentId,ActorCreator是TanentActor.ActorCreator(会传入tenantId,后面初始化ruleChain时会用到)。
在DefaultTbActorSystem.createActor()方法中TbActorMailbox中的actor就是TenantActor.class类型,所以在mailbox.initActor();方法中会调用tenantActor.init()方法进行初始化。
tenantActor.init()
public void init(TbActorCtx ctx) throws TbActorException {super.init(ctx);log.debug("[{}] Starting tenant actor.", tenantId);try {Tenant tenant = systemContext.getTenantService().findTenantById(tenantId);if (tenant == null) {cantFindTenant = true;log.info("[{}] Started tenant actor for missing tenant.", tenantId);} else {isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);if (isRuleEngine) {if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) {try {if (getApiUsageState().isReExecEnabled()) {log.debug("[{}] Going to init rule chains", tenantId);// 初始化ruleChaininitRuleChains();} else {log.info("[{}] Skip init of the rule chains due to API limits", tenantId);}} catch (Exception e) {log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e);cantFindTenant = true;}} else {log.info("Tenant {} is not managed by current service, skipping rule chains init", tenantId);}}log.debug("[{}] Tenant actor started.", tenantId);}} catch (Exception e) {log.warn("[{}] Unknown failure", tenantId, e);}
}
在tenantActor初始化时会去初始化该租户下的ruleChain。
RuleChainManagerActor.initRuleChains()
protected void initRuleChains() {log.debug("[{}] Initializing rule chains", tenantId);// 查询该租户下所有的ruleChainfor (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {RuleChainId ruleChainId = ruleChain.getId();log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId());// 初始化每一个ruleChainTbActorRef actorRef = getOrCreateActor(ruleChainId, id -> ruleChain);visit(ruleChain, actorRef);log.debug("[{}|{}] Rule Chain actor created.", ruleChainId.getEntityType(), ruleChainId.getId());}ruleChainsInitialized = true;
}
RuleChainManagerActor.getOrCreateActor() -> TbActorMailbox.getOrCreateChildActor() -> DefaultTbActorSystem.createChildActor() -> DefaultTbActorSystem.createActor()。
DefaultTbActorSystem.createActor()
private TbActorRef createActor(String dispatcherId, TbActorCreator creator, TbActorId parent) {Dispatcher dispatcher = dispatchers.get(dispatcherId);if (dispatcher == null) {log.warn("Dispatcher with id [{}] is not registered!", dispatcherId);throw new RuntimeException("Dispatcher with id [" + dispatcherId + "] is not registered!");}TbActorId actorId = creator.createActorId();TbActorMailbox actorMailbox = actors.get(actorId);if (actorMailbox != null) {log.debug("Actor with id [{}] is already registered!", actorId);} else {Lock actorCreationLock = actorCreationLocks.computeIfAbsent(actorId, id -> new ReentrantLock());actorCreationLock.lock();try {actorMailbox = actors.get(actorId);if (actorMailbox == null) {log.debug("Creating actor with id [{}]!", actorId);// RuleChainActor.class类型TbActor actor = creator.createActor();TbActorRef parentRef = null;if (parent != null) {parentRef = getActor(parent);if (parentRef == null) {throw new TbActorNotRegisteredException(parent, "Parent Actor with id [" + parent + "] is not registered!");}}TbActorMailbox mailbox = new TbActorMailbox(this, settings, actorId, parentRef, actor, dispatcher);actors.put(actorId, mailbox);// 会调用RuleChainActor.init()方法,但是RuleChainActor没有重写init方法,而RuleChainActor继承ComponentActor,所以会调用ComponentActor.init()方法mailbox.initActor();actorMailbox = mailbox;if (parent != null) {parentChildMap.computeIfAbsent(parent, id -> ConcurrentHashMap.newKeySet()).add(actorId);}} else {log.debug("Actor with id [{}] is already registered!", actorId);}} finally {actorCreationLock.unlock();actorCreationLocks.remove(actorId);}}return actorMailbox;
}
ComponentActor.init()
@Override
public void init(TbActorCtx ctx) throws TbActorException {super.init(ctx);// createProcessor会返回一个RuleChainActorMessageProcessor类型this.processor = createProcessor(ctx);// 这个方法里面会去初始化ruleNodeinitProcessor(ctx);
}
RuleChainActor.createProcessor()
@Override
protected RuleChainActorMessageProcessor createProcessor(TbActorCtx ctx) {return new RuleChainActorMessageProcessor(tenantId, ruleChain, systemContext,ctx.getParentRef(), ctx);
}
ComponentActor.initProcessor()
protected void initProcessor(TbActorCtx ctx) throws TbActorException {try {log.debug("[{}][{}][{}] Starting processor.", tenantId, id, id.getEntityType());// 这个方法里面回去初始化ruleNodeprocessor.start(ctx);logLifecycleEvent(ComponentLifecycleEvent.STARTED);if (systemContext.isStatisticsEnabled()) {scheduleStatsPersistTick();}} catch (Exception e) {log.debug("[{}][{}] Failed to start {} processor.", tenantId, id, id.getEntityType(), e);logAndPersist("OnStart", e, true);logLifecycleEvent(ComponentLifecycleEvent.STARTED, e);throw new TbActorException("Failed to init actor", e);}
}
RuleChainActorMessageProcessor.start()
@Override
public void start(TbActorCtx context) {if (!started) {RuleChain ruleChain = service.findRuleChainById(tenantId, entityId);if (ruleChain != null && RuleChainType.CORE.equals(ruleChain.getType())) {// 找到租户下的ruleChainList<RuleNode> ruleNodeList = service.getRuleChainNodes(tenantId, entityId);log.debug("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());// Creating and starting the actors;for (RuleNode ruleNode : ruleNodeList) {log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);// 创建ruleNodeTbActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);// 缓存nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));}initRoutes(ruleChain, ruleNodeList);started = true;}} else {onUpdate(context);}
}
- createRuleNodeActor方法最终也会调用到
DefaultTbActorSystem.createActor
,而actor的类型是RuleNodeActor.class
。 - 所以会调用ruleNodeActor.init方法,而ruleNodeActor继承自ComponentActor,所以也会调用到ComponentActor中的init方法,
- 跟ruleChainActor一样,最后会调用到ruleNodeActor中的createProcessor方法,返回值是一个RuleNodeActorMessageProcessor类型,
ComponentActor.initProcessor
中的start方法就会调用到RuleNodeActorMessageProcessor.start
方法
RuleNodeActorMessageProcessor.start
@Override
public void start(TbActorCtx context) throws Exception {if (isMyNodePartition()) {log.debug("[{}][{}] Starting", tenantId, entityId);// 初始化node节点tbNode = initComponent(ruleNode);if (tbNode != null) {state = ComponentLifecycleState.ACTIVE;}}
}
private TbNode initComponent(RuleNode ruleNode) throws Exception {TbNode tbNode = null;if (ruleNode != null) {Class<?> componentClazz = Class.forName(ruleNode.getType());tbNode = (TbNode) (componentClazz.getDeclaredConstructor().newInstance());// 具体每个node的init方法tbNode.init(defaultCtx, new TbNodeConfiguration(ruleNode.getConfiguration()));}return tbNode;
}
三、DeviceActor
deviceActor不是在server启动时创建的,而是当设备上来数据的时候才会去初始化。
具体是在TenantActor.onToDeviceActorMsg
中。
private void onToDeviceActorMsg(DeviceAwareMsg msg, boolean priority) {if (!isCore) {log.warn("RECEIVED INVALID MESSAGE: {}", msg);}if (deletedDevices.contains(msg.getDeviceId())) {log.debug("RECEIVED MESSAGE FOR DELETED DEVICE: {}", msg);return;}// 创建deviceActorTbActorRef deviceActor = getOrCreateDeviceActor(msg.getDeviceId());if (priority) {deviceActor.tellWithHighPriority(msg);} else {deviceActor.tell(msg);}}
四、代码调试思路
因为整个server中Actor是核心,所以就从actor入手。
public interface TbActor {boolean process(TbActorMsg msg);TbActorRef getActorRef();default void init(TbActorCtx ctx) throws TbActorException {}default void destroy(TbActorStopReason stopReason, Throwable cause) throws TbActorException {}default InitFailureStrategy onInitFailure(int attempt, Throwable t) {return InitFailureStrategy.retryWithDelay(5000L * attempt);}default ProcessFailureStrategy onProcessFailure(TbActorMsg msg, Throwable t) {if (t instanceof Error) {return ProcessFailureStrategy.stop();} else {return ProcessFailureStrategy.resume();}}
}
在TbActor中,很明显init是初始化的方法,肯定会在初始化的时候调用,找到其中的一个实现如appActor.init(),find useages找到调用appActor.init的地方,发现只有一个地方在调用:TbActorMailbox.tryInit,在沿着往上找,就可以找到DefaultActorService.initActorSystem()
方法。