健康检查案例实现
案例要求:能够动态地调整健康检查的间隔时间和重试次数;动态拉取配置信息,校验发现配置变更后,对健康检查任务进行动态变更。
1)配置远程调用
Spring 允许我们通过定义接口的方式,向任意地址发送 HTTP 请求,实现远程调用,从而简化 HTTP 远程访问。该功能需要引入 WebFlux 场景(spring-boot-starter-webflux)才可使用。
1.1)导入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
1.2)编写配置属性
//算法客户端主要用到 private String tankOcrUrl = "http://127.0.0.1:6013";的属性
@ConfigurationProperties(prefix = "edge.client")
@Data
@RefreshScope
public class EdgeClientProperties {

    /**
     * Image path setting ({@code edge.client.image-path}).
     */
    private String imagePath = "/srv/www/upload";

    /**
     * Image server prefix setting ({@code edge.client.image-server-prefix}); empty by default.
     */
    private String imageServerPrefix = "";

    /**
     * Address of the hazardous-chemical vehicle OCR recognition service
     * ({@code edge.client.tank-ocr-url}).
     */
    private String tankOcrUrl = "http://127.0.0.1:6013";
}
// ---- Platform client configuration properties (the field mainly used is: private String endpoint;) ----
@ConfigurationProperties(prefix = "app.edge")
@Data
@RefreshScope
public class EdgeProperties {

    /**
     * Edge device code.
     */
    private String code;

    /**
     * Platform information.
     */
    @NestedConfigurationProperty
    private Platform platform = new Platform();

    /**
     * Edge-side configuration.
     */
    @NestedConfigurationProperty
    private EdgeConfig edgeConfig = new EdgeConfig();

    /**
     * Platform information.
     */
    @Data
    public static class Platform {

        /**
         * Platform address. No default is declared, so it is {@code null} unless configured.
         */
        private String endpoint;
    }

    /**
     * Edge-side configuration: registration/heartbeat endpoints, credentials and timings.
     */
    @Data
    public static class EdgeConfig {

        /**
         * Registration URL.
         */
        private String registerUrl;

        /**
         * Heartbeat URL.
         */
        private String heartbeatUrl;

        /**
         * Account name.
         */
        private String username;

        /**
         * Password.
         */
        private String password;

        /**
         * Heartbeat time. NOTE(review): consumers in this file treat the value as seconds — confirm.
         */
        private Integer heartTime;

        /**
         * Maximum heartbeat wait time. NOTE(review): unit is not shown here — presumably seconds.
         */
        private Integer maxHeartTime;
    }
}
/******************************************** Yaml 配置文件********************************************************************/
--- # Edge device settings (bound to the "app.edge" configuration properties)
app:
  edge:
    code: ${APP_EDGE_CODE:BD123456789}
    platform:
      endpoint: ${APP_PLATFORM_URL:http://192.168.3.11:32240}
    edge-config:
      register-url: ${APP_REGISTER_URL:/api/v1/itd/basic/edge/device/register}
      heartbeat-url: ${APP_HEART_URL:/api/v1/itd/basic/edge/device/heartbeat}
      heart-time: ${APP_HEART_TIME:10}
      # Uses its own variable: reusing APP_HEART_TIME would force the maximum
      # wait to always equal the heartbeat interval whenever that variable is set.
      max-heart-time: ${APP_MAX_HEART_TIME:100}
      username: ${APP_USERNAME:admin}
      password: ${APP_PASSWORD:123456}
--- # Algorithm server settings (bound to the "edge.client" configuration properties)
edge:
  client:
    tank-ocr-url: http://192.168.3.68:31869
1.3)编写远程调用接口
/*************************************************平台端远程调用接口***********************************************/
@HttpExchange
public interface PlatformClient {

    /**
     * Registers this edge device with the platform.
     *
     * @param registerReq registration request, sent as the request body
     * @return the platform's response wrapping an {@code EdgeRegisterHeartbeatDTO}
     */
    @PostExchange(url = "/api/v1/itd/basic/edge/device/register")
    JsonResult<EdgeRegisterHeartbeatDTO> register(@RequestBody RegisterReq registerReq);

    /**
     * Sends a heartbeat (health check) to the platform.
     *
     * @param heartbeatReq heartbeat request, sent as the request body
     * @return the platform's response wrapping an {@code EdgeRegisterHeartbeatDTO}
     */
    @PostExchange(url = "/api/v1/itd/basic/edge/device/heartbeat")
    JsonResult<EdgeRegisterHeartbeatDTO> heartbeatCheck(@RequestBody HeartbeatReq heartbeatReq);
}
/*************************************************算法识别远程调用接口***********************************************/
@HttpExchange
public interface RecognizeClient {/*** @param request* @return RecognizeResponse*/@PostExchangeRecognizeResponse recognize(@RequestBody RecognizeRequest request);/*** @param request* @return OcrResponse*/@PostExchangeDgCarResponse recognizeDgCar(@RequestBody RecognizeRequest request);/*** @param request* @return CarTypeResponse*/@PostExchangeCarTypeResponse recognizeCarType(@RequestBody RecognizeRequest request);
}
1.4)编写配置类
/**
*在启动类我们已经将这两个Properties进行导入 @EnableConfigurationProperties({EdgeClientProperties.class, EdgeProperties.class})
*/
@Configuration(proxyBeanMethods = false)
@RequiredArgsConstructor
public class EdgeConfiguration {/*** @description 边缘客户端属性*/private final EdgeClientProperties properties;/*** @description 边缘属性*/private final EdgeProperties edgeProperties;/*** @description token管理容器*/private final TokenManager tokenManager;/*** 构建HttpServiceProxyFactory* @param baseUrl 基础url* @param objectMapper 对象映射器* @return HttpServiceProxyFactory*/@NonNullprivate HttpServiceProxyFactory buildFactory(String baseUrl, ObjectMapper objectMapper) {AtomicReference<Integer> heartTime = new AtomicReference<>(edgeProperties.getEdgeConfig().getHeartTime());//进行JSON序列化操作,后续会出《内容协商相关博客》ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder().codecs(configurer -> {configurer.defaultCodecs().jackson2JsonDecoder(new Jackson2JsonDecoder(objectMapper));configurer.defaultCodecs().jackson2JsonEncoder(new Jackson2JsonEncoder(objectMapper));}).build();/*通过构建WebClient 进行远程调用,*/val webClientBuild = WebClient.builder().baseUrl(baseUrl).exchangeStrategies(exchangeStrategies);if (edgeProperties.getPlatform().getEndpoint().equals(baseUrl)) {// 配置边缘端对平台的请求调用,由于心跳的时候需要设置token校验,每次心跳的时候进行token的设置webClientBuild.filter((request, next) -> next.exchange(ClientRequest.from(request).header("register", tokenManager.getToken()).build()));// 超时等待时间,根据token管理容器动态的设置超时相应时间Optional.ofNullable(tokenManager.getHeartbeat()).ifPresent(heartTime::set);}WebClient webClient = webClientBuild.build();WebClientAdapter exchangeAdapter = WebClientAdapter.create(webClient);exchangeAdapter.setBlockTimeout(Duration.of(heartTime.get(), ChronoUnit.SECONDS));return HttpServiceProxyFactory.builderFor(exchangeAdapter).build();}/*************************************************算法识别远程调用********************************************************/@Beanpublic RecognizeClient recognizeClient(ObjectProvider<ObjectMapper> objectMappers) {val objectMapper = objectMappers.getObject();//调用构建方法,构建算法调用客户端val factory = buildFactory(properties.getTankOcrUrl(), objectMapper);return 
factory.createClient(RecognizeClient.class);}@Beanpublic PlatformClient platformClient(ObjectProvider<ObjectMapper> objectMappers) {val objectMapper = objectMappers.getObject();//调用构建方法,构建平台调用客户端val factory = buildFactory(edgeProperties.getPlatform().getEndpoint(), objectMapper);return factory.createClient(PlatformClient.class);}}
2)健康检查定时任务
@Service
@Slf4j
@RequiredArgsConstructor
public class HeartbeatTask {/*** @description 边缘端属性*/private final EdgeProperties edgeProperties;/*** @description 边缘服务,进行业务相关的查询,这里就不为其展示业务的调用功能*/private final EdgeService edgeService;/*** @description 推送事件,当发生配置变更等情况进行变更*/private final ApplicationEventPublisher publisher;/*** @description 平台端远程调用接口*/private final PlatformClient platformClient;/*** 开启心跳检测*/public void startHeartbeat() {//进行动态变更的时候,每次需要重新创建一个任务线程池ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();Runnable heartbeatTask = new Runnable() {@Overridepublic void run() {log.info("Heartbeat sent at:{}", LocalDateTime.now());HeartbeatReq heartbeatReq = new HeartbeatReq();//构建请求体BeanUtil.copyProperties(edgeDeviceHeartbeatDTO, heartbeatReq);JsonResult<EdgeRegisterHeartbeatDTO> result = new JsonResult<>();try {//向平台端发送健康检测请求result = platformClient.heartbeatCheck(heartbeatReq);log.info("platform-service with return {}.", result);if (result.getData().getIsSuccess()) {log.info("Heartbeat successfully sent.");//业务代码,校验数据变更,进行动态更新edgeService.verifyHeartbeatDevice(result.getData().getEdgeDeviceListDTO().getBindDeviceList());Boolean verified = edgeService.verifyHeartbeatConfig(result.getData().getEdgeDeviceListDTO(),edgeProperties.getCode());if (verified) {log.error("Configuration changed!!!");//配送发生变更,发送变更事件进行后续的变更处理pushHeartEvent("The configuration has changed. Restart the scheduled task",HeartbeatStatus.CONFIG_CHANGE.status, scheduler);}}} catch (WebClientResponseException e) {log.error("---heartbeat request error !!! 
while making the HTTP request: ", e);if (e.getStatusCode().value() == 403) {log.error("token expires error, Heartbeat failed: {}", e.getMessage());//token 过期,发送token失效事件tokenExpired(scheduler);}}}};scheduler.scheduleAtFixedRate(heartbeatTask, 5, edgeConfig.getHeartbeatTime(), TimeUnit.SECONDS);}/*** @param message* @param status* @param scheduler*/private void pushHeartEvent(String message, Integer status, ScheduledExecutorService scheduler) {HeartbeatEvent event = new HeartbeatEvent(this, message, status, scheduler);publisher.publishEvent(event);}/*** @param scheduler* @return*/private void tokenExpired(ScheduledExecutorService scheduler) {pushHeartEvent("token过期,重新开启定时任务", HeartbeatStatus.TOKEN_EXPIRATION.status, scheduler);}/*** @param scheduler* @description 停止心跳检测*/public void stopHeartbeat(ScheduledExecutorService scheduler) {// 重新开启任务scheduler.shutdown();try {if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {scheduler.shutdownNow();if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {log.error("Failed to gracefully shutdown scheduler");}}} catch (InterruptedException ex) {scheduler.shutdownNow();Thread.currentThread().interrupt();}}
}
3)事件触发
3.1)心跳事件
/*** ******************************** 一、定义心跳事件,继承ApplicationEvent,自定义事件监听*******************************************/
/**
 * Application event raised by the heartbeat task; carries a message, a status code and the
 * scheduler that was running the heartbeat.
 */
@Getter
@Setter
public class HeartbeatEvent extends ApplicationEvent {

    /**
     * Description of what happened.
     */
    private String message;

    /**
     * Status code of the event.
     */
    private Integer status;

    /**
     * Scheduler that was running the heartbeat which raised this event. Marked
     * {@code transient} because the event type is serializable while schedulers are not.
     */
    private transient ScheduledExecutorService scheduler;

    /**
     * Creates a heartbeat event.
     *
     * @param source    object on which the event occurred, passed to the parent constructor
     * @param message   description of what happened
     * @param status    status code of the event
     * @param scheduler scheduler that was running the heartbeat
     */
    public HeartbeatEvent(Object source, String message, Integer status, ScheduledExecutorService scheduler) {
        super(source);
        this.message = message;
        this.status = status;
        this.scheduler = scheduler;
    }
}
3.2)心跳监听
/*** *********************************** 二、定义心跳监听器,监听心跳事件*************************************************************/
@Slf4j
@Component
@RequiredArgsConstructor
public class HeartbeatEventListener {/*** 监听心跳返回的信息** @param event*/@EventListenerpublic void onApplicationEvent(HeartbeatEvent event) {log.info("onApplicationEvent received: {}", event.getMessage());// 1.token过期if (event.getStatus() == HeartbeatStatus.TOKEN_EXPIRATION.status) {log.error("Token expired, stopping the heartbeat task and reloading register........");//进行相关业务处理restartHeart();}// 2.配置变更,重新加载if (event.getStatus() == HeartbeatStatus.CONFIG_CHANGE.status) {log.info("The event was accepted successfully, the configuration changed"); //进行相关业务处理restartHeart();}}/***进行业务代码处理,发生变更,重新关闭心跳,开启新的心跳检测*/private void restartHeart(){ heartbeatTask.stopHeartbeat(event.getScheduler());heartbeatTask.startHeartbeat();}
}