1、Spark Streaming清洗服务,接收kafka中Topic为“task_ATC”中的数据,保存在MySQL中。
- 打开SpringBoot项目BigData-Etl-KongGuan
请认真阅读:在前面的“使用Spark清洗统计业务数据并保存到数据库中”任务阶段中应该已经完成了所有Topic的数据的清洗,如果已经完成了扇区数据的清洗工作,则此步骤可以跳过,如果前面没有完成扇区数据的清洗工作,可参照“使用Spark清洗统计业务数据并保存到数据库中”任务阶段的其他Topic数据的清洗过程,完成扇区数据的清洗。
- 代码路径:BigData-Etl-KongGuan/src/main/java/com/qrsoft/etl/spark/SparkUtil.java,扇区数据清洗的核心代码如下:
/*** 业务处理* @param jsonObject 扇区数据*/public void TaskAtc(JSONObject jsonObject) throws Exception {ATCDao atcD = new ATCDao();ATCEntity atc = new ATCEntity();//设置清洗方式String sectorName = null;try {sectorName = jsonObject.getString("PLAN_SECTOR_NAME");} catch (Exception e) {//logger.info(" ATC无扇区数据: [{}]");System.out.println("ATC无扇区数据");}try {//根据扇区,查询是否已经开始对该扇区进行统计String ACID = jsonObject.getString("ACID");if(sectorName!=null&&!sectorName.equals("")){boolean bool = atcD.isExistThisAtc(ACID);atc.setAcId(jsonObject.getString("ACID"));atc.setAtcTime(jsonObject.getString("ATC_TIME"));atc.setExecuteDate(jsonObject.getString("EXECUTE_DATE"));atc.setPlanSectorName(jsonObject.getString("PLAN_SECTOR_NAME"));if (bool) {//存在,在原来基础上+1,修改数据库中该航迹数量atcD.updateAnAtcMsg(atc);} else {//尚未进行统计 创建一个统计信息atcD.createAnAtcMsg(atc);}}} catch (Exception e) {e.printStackTrace();}}
- 上面的代码中会使用到 BigData-Etl-KongGuan/src/main/java/com/qrsoft/etl/dao/entity/ATCEntity.java 类和BigData-Etl-KongGuan/src/main/java/com/qrsoft/etl/dao/ATCDao.java类。
其中ATCEntity类是用于保存扇区数据的实体类:
package com.qrsoft.etl.dao.entity;import lombok.Data;
import java.io.Serializable;public class ATCEntity implements Serializable {private Integer id;private String acId;private String atcTime;private String executeDate;private String planSectorName;public Integer getId() { return id; }public void setId(Integer id) { this.id = id; }public String getAcId() { return acId;}public void setAcId(String acId) { this.acId = acId; }public String getAtcTime() { return atcTime; }public void setAtcTime(String atcTime) { this.atcTime = atcTime; }public String getExecuteDate() { return executeDate; }public void setExecuteDate(String executeDate) { this.executeDate = executeDate; }public String getPlanSectorName() { return planSectorName; }public void setPlanSectorName(String planSectorName) { this.planSectorName = planSectorName; }
}
ATCDao类是扇区数据的数据访问类,包括扇区统计等方法:
package com.qrsoft.etl.dao;import com.qrsoft.etl.dao.entity.ATCEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.ResultSet;
import java.sql.SQLException;public class ATCDao extends IBaseDao {private final static Logger logger = LoggerFactory.getLogger(ATCDao.class);/*** 根据航班号查询是否该扇区有该航班的统计数据存在* @param acid 航班号*/public boolean isExistThisAtc(String acid){String sql = " SELECT COUNT(*) from atc_number where ACID='"+acid+"';";Object[] params = {};ResultSet comRs = this.execute(sql, params);return getBool(comRs);}//修改指定航班的扇区统计信息public void updateAnAtcMsg(ATCEntity atc) {String sql = "update atc_number set PLAN_SECTOR_NAME='"+atc.getPlanSectorName()+"',ATC_TIME='"+atc.getAtcTime()+"',EXECUTE_DATE='"+atc.getExecuteDate()+"' where ACID='"+atc.getAcId()+"'; ";Object[] params = {};try {this.update(sql, params);} catch (SQLException e) {logger.info("修改指定航班的扇区统计信息失败! "+atc.getAcId());e.printStackTrace();}}//创建新航班扇区的统计信息public void createAnAtcMsg(ATCEntity atc) {String sql = "insert into atc_number (ACID,ATC_TIME,EXECUTE_DATE,PLAN_SECTOR_NAME) values('"+atc.getAcId()+"','"+atc.getAtcTime()+"','"+atc.getExecuteDate()+"','"+atc.getPlanSectorName()+"');";Object[] params = {};try {this.update(sql, params);} catch (SQLException e) {logger.info("创建航班的扇区的统计信息失败!"+atc.getAcId());e.printStackTrace();}}
}
2、前端Vue页面设计,并完成数据绑定,展示扇区数据
- 在Vue项目kongguan_web中src/components文件夹下面新建Section.vue,添加页面div设计代码,并对数据进行循环绑定操作:
<template><div><el-row><el-col :xs="12" :sm="12" :lg="{span: '4'}" v-for="item in resultVal"><div class="grid-content bg-purple"><div class="top_div_css"></div><div class="centen"><div class="to_titls">{{item.planSectorName}}扇区</div><div class="to_titls_two">{{item.count}}架</div></div></div></el-col></el-row></div>
</template>
... 接下页 ...
- 在Vue项目中kongguan_web/src/components/Section.vue页面中添加css布局设计代码:
解释
... 接上页 ...
<style >/*.el-col-lg-4-8 {*//* width: 20%;*//*}*/.centen{width: 100%;}.to_titls{margin-top: 15px;text-align: center;font-size: 20px;color: #676767;}.to_titls_two{margin-top: 15px;text-align: center;font-size: 25px;color: #307be3;}.top_div_css{width: 100%;height: 10px;background-color: #4eb9f8;border-top-left-radius: 2em;border-top-right-radius: 2em;}
.el-row {margin-bottom: 20px;&:last-child {margin-bottom: 0;}}.el-col {border-radius: 2em;}.bg-purple-dark {background: #99a9bf;}.bg-purple {background: #ffffff;}.bg-purple-light {background: #e5e9f2;}.grid-content {margin-top: 13px;border-radius: 1em;min-height: 100px;margin-left: 15px;margin-right: 15px;/*width: 200px;*/background-color: #ffffff;border: 1px solid #ebedf2;box-shadow: 3px 3px 3px 3px #ebedf2;}.row-bg {padding: 10px 0;background-color: #f9fafc;}
</style>
... 接下页 ...
- 在kongguan_web/src/components/Section.vue页面中引入 api,从后台获取数据:
... 接上页 ...
<script>import echarts from "echarts";import {getSectionVal } from "../api/user/api.js";export default {name: "Home",data() {return {chart: null,OneSection:"G",TwoSection:"K",TwoSection:"E",TwoSection:"P",resultVal:{}};},mounted() {this.loadData();},methods: {initChart() {},loadData(){getSectionVal().then(data => {if (data.isSuccess) {var res = data.result;this.resultVal =res;} else {this.$message.error("数据获取失败");}});}}};
</script>
在上面的页面代码中,首先引入了api/user/api.js组件,在api/user/api.js中添加如下代码,设置请求方式,请求的服务端地址和请求的参数等:
import request from '../../utils/request'
const baseUrl="/api"//…
//… 其他 function,略。 …
//…//获取各扇区航班数
export function getSectionVal(data){return request({url:baseUrl+"/atc/findSectorSortie",method:"get",data:data})
}
3、后端项目 BigData-KongGuan 的处理过程
- 首先,创建一个数据实体类com.qrsoft.entity.Atc,用于操作和保存扇区数据,代码所在位置BigData-KongGuan/src/main/java/com/qrsoft/entity/Atc.java,内容如下:
package com.qrsoft.entity;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("atc_number")
public class Atc implements Serializable {@TableId(value = "id",type = IdType.AUTO)private Integer id;@TableField(value = "ACID")private String acId;@TableField(value = "ATC_TIME")private String atcTime;@TableField(value = "EXECUTE_DATE")private String executeDate;@TableField(value = "PLAN_SECTOR_NAME")private String planSectorName;@TableField(exist = false)private String count;
}
- 然后,编写扇区数据的数据访问接口BigData-KongGuan/src/main/java/com/qrsoft/mapper/AtcMapper.java,继承BaseMapper类,代码如下:
package com.qrsoft.mapper;import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.qrsoft.entity.Atc;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;import java.util.List;@Mapper
public interface AtcMapper extends BaseMapper<Atc> {@Select("select PLAN_SECTOR_NAME,COUNT(*) as count from atc_number GROUP BY PLAN_SECTOR_NAME;")List<Atc> findSectorSortie();@Select("select EXECUTE_DATE from atc_number group by EXECUTE_DATE order by EXECUTE_DATE desc limit 19;")List<String> findATCTime();@Select("select PLAN_SECTOR_NAME,count(*) as count from atc_number where EXECUTE_DATE = #{executeTime} and PLAN_SECTOR_NAME = #{sectorName}")Atc findATCTime2(String executeTime,String sectorName);
}
- 然后,建立BigData-KongGuan/src/main/java/com/qrsoft/service/AtcService.java类,在类中使用baseMapper调用findSectorSortie方法,查询所有扇区航班架次:
package com.qrsoft.service;import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.qrsoft.common.Result;
import com.qrsoft.common.ResultConstants;
import com.qrsoft.entity.Atc;
import com.qrsoft.entity.MultiRadar;
import com.qrsoft.mapper.AtcMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;@Service
public class AtcService extends ServiceImpl<AtcMapper, Atc> {@Autowiredprivate MultiRadarService multiRadarService;/*** 查询所有扇区航班架次*/public Result findSectorSortie() {List<Atc> sectorSortie = baseMapper.findSectorSortie();return new Result(ResultConstants.SUCCESS, ResultConstants.C_SUCCESS, sectorSortie);}/*** 根据扇区号查询架次* @param planSectorName*/public Result findLocusCount(String planSectorName) {QueryWrapper<MultiRadar> queryWrapper = new QueryWrapper<>();queryWrapper.eq("section",planSectorName);int count = multiRadarService.count(queryWrapper);return new Result(ResultConstants.SUCCESS, ResultConstants.C_SUCCESS, count);}/*** 扇区架次数动态统计(饼状图)*/public Result findATCTime() {List<String> sectorName = new ArrayList<>();sectorName.add("K");sectorName.add("S");sectorName.add("E");sectorName.add("P");sectorName.add("G");List<String> executeTime = baseMapper.findATCTime();List list = new ArrayList();for (int i = 0; executeTime.size() > i; i++) {ArrayList<Object> objects = new ArrayList<>();for (int j = 0; sectorName.size() > j; j++) {Atc atcTime2 = baseMapper.findATCTime2(executeTime.get(i), sectorName.get(j));HashMap<String, Object> map = new HashMap<>();if (atcTime2.getPlanSectorName() != null) {map.put(atcTime2.getPlanSectorName(), atcTime2.getCount());}else {map.put(sectorName.get(j),0);}objects.add(map);}list.add(objects);}return new Result(ResultConstants.SUCCESS, ResultConstants.C_SUCCESS, list);}
}
- 上面代码中,会依赖com.qrsoft.service.MultiRadarService类,并使用其继承的ServiceImpl<T,U>中的count()方法,该方法用于根据扇区号查询架次:
package com.qrsoft.service;import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.qrsoft.common.Result;
import com.qrsoft.common.ResultConstants;
import com.qrsoft.entity.MultiRadar;
import com.qrsoft.mapper.MultiRadarMapper;
import org.springframework.stereotype.Service;import java.util.ArrayList;
import java.util.List;@Service
public class MultiRadarService extends ServiceImpl<MultiRadarMapper, MultiRadar> {/*** 查询综合航迹数据*/public Result findMultRadar(){List<MultiRadar> multiRadars = baseMapper.selectList(null);return new Result(ResultConstants.SUCCESS, ResultConstants.C_SUCCESS,multiRadars);}}
- 建立扇区数据访问的控制器AtcController.java,代码路径:BigData-KongGuan/src/main/java/com/qrsoft/controller/AtcController.java,内容如下:
package com.qrsoft.controller;import com.qrsoft.common.Result;
import com.qrsoft.service.AtcService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@Api(tags = "扇区操作类")
@RestController
@RequestMapping("/api/atc")
public class AtcController {@Autowiredprivate AtcService service;/*** 获取各扇区航班数*/@ApiOperation(value = "获取各扇区航班数")@GetMapping("/findSectorSortie")public Result findSectorSortie(){return service.findSectorSortie();}/*** 根据扇区名称获取该扇区航班数* @param planSectorName*/@ApiOperation(value = "根据扇区名称获取该扇区航班数")@GetMapping("/findLocusCount")public Result findLocusCount(@RequestParam String planSectorName){return service.findLocusCount(planSectorName);}/*** 扇区架次数动态统计(饼状图)*/@ApiOperation(value = "扇区架次数动态统计(饼状图)")@GetMapping("/findATCTime")public Result findATCTime(){return service.findATCTime();}
}
4、再回到前端src/components/Section.vue页面,解释一下代码
- 首先在src/components/Section.vue页面中设置resultVal:{}接收后台数据,代码如下:
data() {return {chart: null,OneSection:"G",TwoSection:"K",TwoSection:"E",TwoSection:"P",resultVal:{}};
},
- 请求后台数据并赋值,代码如下:
loadData(){getSectionVal().then(data => {if (data.isSuccess) {var res = data.result;this.resultVal =res;} else {this.$message.error("数据获取失败");}});
}
- 循环绑定数据代码如下:
<el-col :span="4" v-for="item in resultVal"><div class="grid-content bg-purple"><div class="top_div_css"></div><div class="centen"><div class="to_titls">{{item.planSectorName}}扇区</div><div class="to_titls_two">{{item.count}}架</div></div></div>
</el-col>
5、在Index.vue页面进行展示
- 在kongguan_web/src/views/Home/Index.vue页面中引入Section组件
import Section from "../../components/Section";
- 声明组件:
components: {AirLine, Section},
- 页面展示:
<el-row :gutter="30" v-show="isShow('/flight/section')"><el-col :span="24" align="center"><Section/></el-col>
</el-row>
注意:在上面代码中【 v-show="isShow('/flight/section')" 】属性的作用是判断当前登录的用户是否有权限显示当前内容,如果当前登录的用户没有权限,则不会显示当前内容,新用户的权限需要到MySQL数据库中进行设置。
这里有两种方式,可以显示当前内容:
1)去掉【 v-show="isShow('/flight/section')" 】属性,即不判断是否有权限显示。
2)需要使用有权限的用户登录才能显示,或到数据库中分配权限。
参照任务“动态航线图”进行设置。
例如我们前面使用的用户admin,该用户没有权限显示,所以使用admin用户登录系统时是不会显示当前内容的,如果要进行权限设置,可以进入MySQL安装节点(node3节点),然后进入数据库,为admin用户授权。
[root@node3 ~]# mysql -uroot -p123456
mysql> use kongguan;
先查看角色表中,“管理员”的ID:
修改sys_auth表,添加一个【/flight/section】权限:
mysql> insert into sys_auth(auth_name,auth_code,menu_url) values('show section','/flight/section','/flight/section');
修改role_auth表,将权限授权给“管理员”角色:
mysql>insert into role_auth(role_id,auth_id) values(3,195);
- Index.vue页面的完整代码如下:
<template><div class="index"><el-row :gutter="30" v-show="isShow('/flight/section')"><el-col :span="24" align="center"><Section/></el-col></el-row><el-row :gutter="30" v-show="isShow('/flight/airline')"><el-col :span=24 align="center"><AirLine/></el-col></el-row></div>
</template><script>import AirLine from "../../components/AirLine";import Section from "../../components/Section";import {hasPermission} from "../../utils/permission";export default {data() {return {};},mounted() {},components: {AirLine, Section},methods: {isShow(permission){return hasPermission(permission);}}};
</script><style scoped>.index {height: 100%;overflow: auto;padding-left: 44px;padding-right: 44px}.index::-webkit-scrollbar {display: none;}.caseClass {background: url('../../assets/images/index-bg.png') no-repeat;background-size: cover;margin-top: 20px;height: 284px;}.el-button {background: transparent;}
</style>
- 确保Hadoop、Spark、Kafka、Redis、MySQL等服务均已经正常启动,如果没有正常启动,请参照前面的安装部署任务,完成这些服务的启动。
例如:在node3节点上启动Redis。
例如:查看MySQL是否正常启动。
- 启动后端项目 BigData-KongGuan
- 启动前端项目 kongguan_web
- 最终页面展示效果如下: