需要通过elk日志分析平台接收jumpserver日志,对日志进行过滤和拆分。并通过Grafana进行企业微信告警推送和大屏展示
1.系统介绍
名称 | 软件版本 |
---|---|
jumpserver | jumpserver-3.10.13-tls |
elasticsearch | elasticsearch-8.12.2 |
kibana | kibana-8.12.2 |
logstash | logstash-8.12.2 |
granfa | Grafana v11.0.0 |
2.jumpserver配置syslog
此处参考飞致云syslog配置文档:https://kb.fit2cloud.com/?p=123#heading-11
在/opt/jumpserver/config/config.txt配置文件钟添加syslog配置
# 加入 syslog 相关设置
SYSLOG_ENABLE=True
SYSLOG_ADDR=10.22.3.12:5149
- 重启jumpserver
/opt/jumpserver-offline-release-v3.10.13-amd64/jmsctl.sh restart
3.jumpserver的日志类型
jumpserver日志一共有:登录日志 、上传文件日志、下载文件日志、操作日志、改密日志、会话日志、命令日志这几种类型
4.logstash拆分jumpserver日志
# 定义接收jumpserver syslog的端口
input {udp {port => 5149}
}# 定义拆分jumpserver syslog的规则
filter {
# 判断日志类型是否为session_command_log,特定grok规则来解决linux机器记录w和top等命令日志无法解析情况
# 对于非session_command_log日志,将采用通用grok规则if [message] =~ /session_command_log/ {grok {match => { "message" => "<14>jumpserver: session_command_log - %{GREEDYDATA:reallogs}\u0000" }add_field => { "logtype" => "session_command_log" }}} else {grok {match => { "message" => "<%{NUMBER:priority}>%{GREEDYDATA:logsouce}: %{GREEDYDATA:logtype} - %{GREEDYDATA:reallogs}\u0000" }}}# 利用json拆分实际记录的日志json {source => "reallogs"target => "manage"}# 判断日志类型是否为login_log,用来获取用户账户名和用户账户显示名称if [logtype] != "login_log" {mutate {gsub => ["[manage][user]", "\)$", ""]split => { "[manage][user]" => "(" }}mutate {add_field => {"[manage][user_name]" => "%{[manage][user][0]}""[manage][user_account]" => "%{[manage][user][1]}"}}}# 移除不需要的日志mutate {remove_field => ["reallogs", "@version", "event", "logsouce", "priority", "[manage][user]", "message"]}# 解析login_log日志,用日志内的真实时间来替换@timestamp,保证日志时间的真确性if [logtype] == "login_log" {date {match => ["[manage][datetime]", "yyyy/MM/dd HH:mm:ss Z"]target => "@timestamp"}}# 解析ftp_log,拆分资产名字和资产IP,拆分资产连接账号和资产账户显示名称,用日志内的真实时间来替换@timestamp,保证日志时间的真确性if [logtype] == "ftp_log" {mutate {gsub => ["[manage][asset]", "\)$", "","[manage][account]", "\)$", ""]split => {"[manage][asset]" => "(""[manage][account]" => "("}}mutate {add_field => {"[manage][asset_name]" => "%{[manage][asset][0]}""[manage][asset_ip]" => "%{[manage][asset][1]}""[manage][asset_account_name]" => "%{[manage][account][0]}""[manage][asset_account_user]" => "%{[manage][account][1]}"}remove_field => [ "[manage][asset]","[manage][account]" ]}date {match => ["[manage][date_start]", "yyyy/MM/dd HH:mm:ss Z"]target => "@timestamp"}}# 解析operation_log日志,用日志内的真实时间来替换@timestamp,保证日志时间的真确性if [logtype] == "operation_log" {date {match => ["[manage][datetime]", "yyyy/MM/dd HH:mm:ss Z"]target => "@timestamp"}}# 解析password_change_log,操作人员名称和显示名,用日志内的真实时间来替换@timestamp,保证日志时间的真确性if [logtype] == "password_change_log" {mutate {gsub => [ "[manage][change_by]", "\)$", "" ]split => { "[manage][change_by]" => "(" }}mutate {add_field => {"[manage][changeby_user]" => "%{[manage][change_by][0]}""[manage][changeby_account]" => "%{[manage][change_by][1]}"}remove_field => [ "[manage][change_by]" ]}date {match => ["[manage][datetime]", "yyyy/MM/dd HH:mm:ss Z"]target => "@timestamp"}}# 解析host_session_log,拆分资产名字和资产IP,拆分资产连接账号和资产账户显示名称,用日志内的真实时间来替换@timestamp,保证日志时间的真确性if [logtype] == "host_session_log" {mutate {gsub => ["[manage][asset]", "\)$", "","[manage][account]", "\)$", ""]split => {"[manage][asset]" => "(""[manage][account]" => "("}}mutate {add_field => {"[manage][asset_name]" => "%{[manage][asset][0]}""[manage][asset_ip]" => "%{[manage][asset][1]}""[manage][asset_account_name]" => "%{[manage][account][0]}""[manage][asset_account_user]" => "%{[manage][account][1]}"}remove_field => [ "[manage][asset]","[manage][account]" ]}if [manage][date_end] {mutate {add_field => { "connect-time" => "%{[manage][duration]}" }}date {match => ["[manage][date_end]", "yyyy/MM/dd HH:mm:ss Z"]target => "@timestamp"}} else if [manage][date_start] {date {match => ["[manage][date_start]", "yyyy/MM/dd HH:mm:ss Z"]target => "@timestamp"}}}# 解析session_command_log,拆分资产名字和资产IP,拆分资产连接账号和资产账户显示名称,用日志内的真实时间来替换@timestamp,保证日志时间的真确性if [logtype] == "session_command_log" {mutate {gsub => ["[manage][asset]", "\)$", "","[manage][account]", "\)$", ""]split => {"[manage][asset]" => "(""[manage][account]" => "("}}mutate {add_field => {"[manage][asset_name]" => "%{[manage][asset][0]}""[manage][asset_ip]" => "%{[manage][asset][1]}""[manage][asset_account_name]" => "%{[manage][account][0]}""[manage][asset_account_user]" => "%{[manage][account][1]}"}remove_field => [ "[manage][asset]","[manage][account]" ]}date {match => ["[manage][timestamp_display]", "yyyy/MM/dd HH:mm:ss Z"]target => "@timestamp"}}# 日志拆分完毕后,删除不需要的日志mutate {remove_field => ["[manage][id]","[manage][asset_id]","[manage][account_id]","[manage][org_id]","[manage][terminal_display]","[manage][terminal][id]","[manage][terminal][name]","[manage][user_id]","[manage][timestamp]", "[manage][session]"]}}# 创建索引
output {elasticsearch {hosts => ["http://localhost:9200"]index => "sh-blj-%{+YYYY.MM.dd}"action => "create"user => "elastic"password => "password"}stdout {codec => "rubydebug"}
}