6 ELK Integrated Hands-on Cases
6.1 Filebeat Collects Nginx Logs and Sends Them to Elasticsearch via a Redis Buffer
The IP addresses in the diagram are for reference only.
6.1.2.2 Modify the Filebeat Configuration
#Install redis (listening on 0.0.0.0, password 123456) and nginx (access log in JSON format)
[root@ubuntu ~]#vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access_json.log
  json.keys_under_root: true   #default false keeps the JSON data inside message; true stores the JSON fields outside message
  json.overwrite_keys: true    #true overwrites default fields such as message with the keys from the custom JSON format
  tags: ["nginx-access"]
- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: ["nginx-error"]
- type: log
  enabled: true
  paths:
    - /var/log/syslog
  tags: ["syslog"]
output.redis:
  hosts: ["10.0.0.154:6379"]
  password: "123456"
  db: "0"
  key: "filebeat"   #all logs are stored in a list whose key is named filebeat; llen filebeat shows its length, i.e. the number of log records

[root@ubuntu ~]#systemctl restart filebeat.service

#Check redis
[root@ubuntu ~]#redis-cli -a 123456
127.0.0.1:6379> keys *
1) "filebeat"
127.0.0.1:6379> llen filebeat   #number of records
(integer) 91
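The effect of the json.keys_under_root and json.overwrite_keys options can be illustrated with a small sketch. This is a simplified model written for this tutorial, not Filebeat's actual implementation; the function name is hypothetical:

```python
import json

def decode_json_log(raw_line, keys_under_root=True, overwrite_keys=True):
    """Simplified model of Filebeat's json.* options for a log input.

    keys_under_root=False: the JSON stays inside the message field.
    keys_under_root=True:  parsed keys are lifted to the event root;
    overwrite_keys=True additionally lets them replace default fields
    such as 'message'.
    """
    event = {"message": raw_line}          # default: raw line goes into message
    if not keys_under_root:
        return event                       # JSON left inside message
    parsed = json.loads(raw_line)
    for k, v in parsed.items():
        if k in event and not overwrite_keys:
            continue                       # keep the default field
        event[k] = v                       # lift the key to the event root
    return event

evt = decode_json_log('{"clientip": "1.2.3.4", "status": 200}')
print(evt["clientip"])   # JSON keys are now top-level fields: 1.2.3.4
```

With both options true, a field like clientip becomes a top-level event field, which is exactly what the geoip filter later relies on.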
#Move the previous config out of the way to avoid interference
[root@ubuntu conf.d]#mv app_filebeat_filter_es.conf bak

#Note: the comments must be removed from the file below before use
[root@logstash ~]#vim /etc/logstash/conf.d/redis-to-es.conf
input {
  redis {
    host => "10.0.0.154"
    port => "6379"
    password => "123456"
    db => "0"
    key => "filebeat"
    data_type => "list"
  }
}
filter {
  if "nginx-access" in [tags] {
    geoip {
      source => "clientip"   #the log must be in JSON format and contain a clientip key
      target => "geoip"
      #database => "/etc/logstash/conf.d/GeoLite2-City.mmdb"   #optional: specify a database file (a newer one can be downloaded)
      add_field => ["[geoip][coordinates]","%{[geoip][geo][location][lon]}"]   #8.X: add the longitude to the coordinates field; remove this comment when using it (required format for Kibana maps; omit if not drawing maps)
      add_field => ["[geoip][coordinates]","%{[geoip][geo][location][lat]}"]   #8.X: add the latitude (for Kibana maps; omit if not needed)
      #add_field => ["[geoip][coordinates]", "%{[geoip][longitude]}"]   #7.X: add the longitude
      #add_field => ["[geoip][coordinates]", "%{[geoip][latitude]}"]    #7.X: add the latitude
    }
    #Convert the coordinates to floats; required on 8.X, optional on 7.X (only needed for Kibana maps)
    mutate {
      convert => [ "[geoip][coordinates]", "float"]
    }
  }
  mutate {   #not critical; skipping this conversion does no harm
    convert => ["upstreamtime","float"]
  }
}
output {
  if "syslog" in [tags] {
    elasticsearch {
      hosts => ["10.0.0.151:9200","10.0.0.152:9200","10.0.0.153:9200"]
      index => "syslog-%{+YYYY.MM.dd}"
    }
  }
  if "nginx-access" in [tags] {
    elasticsearch {
      hosts => ["10.0.0.151:9200","10.0.0.152:9200","10.0.0.153:9200"]
      index => "nginxaccess-%{+YYYY.MM.dd}"
      #Note: before 7.x the map feature required index names starting with logstash
      template_overwrite => true   #overwrite a template with the same name
    }
    stdout {   #optional, for debugging
      codec => "rubydebug"
    }
  }
  if "nginx-error" in [tags] {
    elasticsearch {
      hosts => ["10.0.0.151:9200","10.0.0.152:9200","10.0.0.153:9200"]
      index => "nginxerrorlog-%{+YYYY.MM.dd}"
      template_overwrite => true
    }
  }
}

#Each record Logstash takes is removed from Redis
[root@logstash ~]#systemctl restart logstash
#All the data in Redis is taken away and the list is emptied
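The Redis list acts as a FIFO buffer: Filebeat pushes records onto it, Logstash pops them off, and every record Logstash reads disappears from Redis, which is why the list ends up empty. A minimal in-memory model of that handoff (hypothetical; collections.deque stands in for the Redis list):

```python
from collections import deque

redis_list = deque()                     # stands in for the 'filebeat' list key

def filebeat_publish(record):
    redis_list.append(record)            # like RPUSH: producer adds at the tail

def logstash_consume():
    if redis_list:
        return redis_list.popleft()      # like LPOP: consumer removes from the head
    return None

for i in range(3):
    filebeat_publish({"seq": i})
print(len(redis_list))                   # llen filebeat -> 3
while (rec := logstash_consume()) is not None:
    pass                                 # each consumed record leaves the list
print(len(redis_list))                   # list drained -> 0
```

This is also why Redis here provides buffering, not retention: once Logstash has caught up, the backlog is gone.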
6.2 Filebeat Collects Nginx Logs and Sends Them to Elasticsearch via a Kafka Buffer
6.2.2 Filebeat Collects Nginx Access and Error Logs and Sends Them to Kafka
[root@web1 ~]#vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access_json.log
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["nginx-access"]
- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: ["nginx-error"]
- type: log
  enabled: true
  paths:
    - /var/log/syslog
  tags: ["syslog"]
output.kafka:   #in this test a single-node kafka could write to the topic but not read it back; a cluster works fine
  hosts: ["10.0.0.156:9092", "10.0.0.157:9092", "10.0.0.158:9092"]
  topic: filebeat-log   #the kafka topic to write to
  partition.round_robin:
    reachable_only: true   #true publishes only to reachable partitions; false uses all partitions, and one node down can then block
  required_acks: 1   #0: no acknowledgement, erroneous messages may be lost; 1: wait for the primary partition write (default); -1: wait for replica partition writes
  compression: gzip
  max_message_bytes: 1000000   #maximum message size in bytes; larger messages are dropped

[root@web1 ~]#systemctl restart filebeat.service

#Check whether the topic exists in kafka
[root@node1 ~]#/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 10.0.0.156:9092
filebeat-log

#Check in kafka whether there is data
[root@node1 ~]#/usr/local/kafka/bin/kafka-console-consumer.sh --topic filebeat-log --bootstrap-server 10.0.0.156:9092 --from-beginning
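The partition.round_robin setting with reachable_only: true means messages rotate only over the partitions that are currently reachable. A rough model of that behavior (a hypothetical sketch for illustration, not the actual Filebeat/Kafka client code):

```python
import itertools

def round_robin_partitioner(partitions, reachable, reachable_only=True):
    """Return an iterator choosing the partition for each successive message.

    reachable_only=True:  rotate only over reachable partitions.
    reachable_only=False: rotate over all partitions; messages assigned
    to a down node's partitions would then block.
    """
    pool = [p for p in partitions if p in reachable] if reachable_only else list(partitions)
    return itertools.cycle(pool)

parts = [0, 1, 2]
chooser = round_robin_partitioner(parts, reachable={0, 2})
assigned = [next(chooser) for _ in range(4)]
print(assigned)   # partition 1 is skipped: [0, 2, 0, 2]
```

With reachable_only: false the cycle would include partition 1 regardless, which is the blocking risk the comment above warns about.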
[root@ubuntu conf.d]#mv redis-to-es.conf bak
[root@logstash ~]#vim /etc/logstash/conf.d/kafka-to-es.conf
input {
  kafka {
    bootstrap_servers => "10.0.0.156:9092,10.0.0.157:9092,10.0.0.158:9092"
    topics => "filebeat-log"
    codec => "json"
    #group_id => "logstash"        #consumer group name (multiple logstash instances should share a group to prevent duplicate consumption)
    #consumer_threads => "3"       #recommended: set the thread count equal to the number of kafka partitions
    #topics_pattern => "nginx-.*"  #match topics by regex instead of the fixed value given with topics =>
  }
}
filter {
  if "nginx-access" in [tags] {
    geoip {
      source => "clientip"
      target => "geoip"
      add_field => ["[geoip][coordinates]","%{[geoip][geo][location][lon]}"]
      add_field => ["[geoip][coordinates]","%{[geoip][geo][location][lat]}"]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float"]
    }
  }
  mutate {
    convert => ["upstreamtime","float"]
  }
}
output {
  #stdout {}   #for debugging
  if "nginx-access" in [tags] {
    elasticsearch {
      hosts => ["10.0.0.151:9200"]
      index => "logstash-kafka-nginx-accesslog-%{+YYYY.MM.dd}"
    }
  }
  if "nginx-error" in [tags] {
    elasticsearch {
      hosts => ["10.0.0.151:9200"]
      index => "logstash-kafka-nginx-errorlog-%{+YYYY.MM.dd}"
    }
  }
  if "syslog" in [tags] {
    elasticsearch {
      hosts => ["10.0.0.151:9200"]
      index => "logstash-kafka-syslog-%{+YYYY.MM.dd}"
    }
  }
}
[root@logstash ~]#systemctl restart logstash.service
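The output section routes each event by its tag to a dated index name; the %{+YYYY.MM.dd} pattern is expanded by Logstash from the event timestamp. The routing logic amounts to the following sketch (the helper name is hypothetical):

```python
from datetime import date

# Mirrors the conditionals in the output {} block above
ROUTES = {
    "nginx-access": "logstash-kafka-nginx-accesslog",
    "nginx-error":  "logstash-kafka-nginx-errorlog",
    "syslog":       "logstash-kafka-syslog",
}

def index_for(event, event_date=None):
    event_date = event_date or date.today()
    for tag in event.get("tags", []):
        if tag in ROUTES:
            # Logstash's %{+YYYY.MM.dd} expands to the event's date
            return f"{ROUTES[tag]}-{event_date:%Y.%m.%d}"
    return None   # events with no matching tag hit none of the outputs

print(index_for({"tags": ["nginx-access"]}, date(2023, 2, 19)))
# -> logstash-kafka-nginx-accesslog-2023.02.19
```

Daily index names like this are what make periodic deletion of old data (mentioned later) a matter of dropping whole indices.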
6.3.7 Creating Map Data in Kibana
6.3.7.1 View the Kibana Sample Maps
Kibana ships with sample data.
On the Kibana home page, under Try sample data, click Other sample data sets, find Sample web logs, and click Add data.
Under Analytics in the left sidebar, open Discover to view the added sample data set (useful as a format reference).
Under Analytics, click Maps: the sample data already has a finished map; click it to view.
Drawing a Map in Kibana
Under Analytics click Maps, click Create map on the right, click Add layer, and choose the data source (it must contain a geospatial field).
#In version 7 the geo data can be used directly, but version 8 requires converting the geo data's type first
#7.x accepts the float format; 8.x requires the geo_point type
Version 8 requires converting the geo data
Click Dev Tools under Management in the left sidebar, and in the console GET the index to inspect its mapping:

GET /logstash-kafka-nginx-accesslog-2023.02.19

Click the triangle to run it, copy the returned content out, and modify it.
#The type below should be geo_point, but it is currently float
"geoip": {
  "properties": {
    "coordinates": {
      "type": "float"   #change only this line: float becomes geo_point
    },

#The old index data must be deleted first, or the change below will not take effect
Run the following to create an index template. Copy the mappings part of the output above (up to just before "settings") into it, changing only the "coordinates": { "type": "geo_point" } part.
#Prepend the following content above mappings
PUT /_template/template_nginx_accesslog   #the template name can be anything
{
  "index_patterns" : [
    "logstash-kafka-nginx-accesslog-*"   #8.X: the index pattern matched by the data view
  ],
  "order" : 0,
  "aliases" : { },
  #the lines above are what gets prepended before mappings
  "mappings": {   #copy from the mappings line to just before the settings line, and add one extra closing } at the end
    "properties": {
      .......
      "geoip": {
        "properties": {
          "coordinates": {
            "type": "geo_point",   #change only this line: float becomes geo_point
      .......
      "xff": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      }
    }
  }
}

#Regenerate the index (let the source logs produce new data); indices created afterwards will use the geo_point type
#Query once more to confirm the geoip field has been changed
GET logstash-kafka-nginx-accesslog-2023.02.19
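The only substantive change the template makes is the mapping type of geoip.coordinates. Building the template body programmatically makes that explicit. This is a sketch with a minimal stand-in for the real mappings; the helper name is hypothetical, and the template name and index pattern follow the example above:

```python
import json

def build_template(mappings):
    """Wrap a copied mappings section in the index-template body used
    above, flipping coordinates.type from float to geo_point."""
    mappings["properties"]["geoip"]["properties"]["coordinates"]["type"] = "geo_point"
    return {
        "index_patterns": ["logstash-kafka-nginx-accesslog-*"],
        "order": 0,
        "aliases": {},
        "mappings": mappings,
    }

# Minimal stand-in for the mappings returned by GET <index>
mappings = {"properties": {"geoip": {"properties": {"coordinates": {"type": "float"}}}}}
template = build_template(mappings)
print(json.dumps(template["mappings"], indent=2))
```

The resulting dict is the JSON body for the PUT /_template/template_nginx_accesslog request.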
Drawing a Map in Kibana
Under Analytics click Maps, click Create map on the right, then click Add layer.
First click Documents, choose the data view, select the geospatial field, click Add, then click Save and add the map to a dashboard.
The link can then be shared with others.
ES clusters usually delete old data periodically; data you want to keep can be stored in MySQL as well.
[root@es-node1 ~]#apt install mysql-server
[root@es-node1 ~]#sed -i '/127.0.0.1/s/^/#/' /etc/mysql/mysql.conf.d/mysqld.cnf
#Equivalent manual edit:
#[root@es-node1 ~]#vim /etc/mysql/mysql.conf.d/mysqld.cnf
#bind-address = 0.0.0.0
#mysqlx-bind-address = 0.0.0.0
[root@es-node1 ~]#systemctl restart mysql.service
[root@es-node1 ~]#mysql
mysql> create database elk;   #MySQL 8.0 and later supports this by default
#mysql> create database elk character set utf8 collate utf8_bin;   #MySQL 5.7 and earlier must specify the charset explicitly (default is Latin)
mysql> create user elk@"10.0.0.%" identified by '123456';
mysql> grant all privileges on elk.* to elk@"10.0.0.%";
mysql> flush privileges;

#Create the table; its columns correspond to the data fields to be saved
mysql> use elk
mysql> create table elklog (
    clientip varchar(39),
    responsetime float(10,3),
    uri varchar(256),
    status char(3),
    time timestamp default current_timestamp
);
mysql> desc elklog;
+--------------+--------------+------+-----+-------------------+-------+
| Field        | Type         | Null | Key | Default           | Extra |
+--------------+--------------+------+-----+-------------------+-------+
| clientip     | varchar(39)  | YES  |     | NULL              |       |
| responsetime | float(10,3)  | YES  |     | NULL              |       |
| uri          | varchar(256) | YES  |     | NULL              |       |
| status       | char(3)      | YES  |     | NULL              |       |
| time         | timestamp    | NO   |     | CURRENT_TIMESTAMP |       |
+--------------+--------------+------+-----+-------------------+-------+
5 rows in set (0.00 sec)
[root@logstash1 ~]#apt -y install mysql-client
[root@logstash1 ~]#mysql -uelk -p123456 -h10.0.0.155 -e 'show databases'
mysql: [Warning] Using a password on the command line interface can be insecure.
+--------------------+
| Database           |
+--------------------+
| information_schema |
| elk                |
+--------------------+
Logstash needs this package to connect to MySQL.
Official download: https://dev.mysql.com/downloads/connector/
#Click Connector/J and pick the matching MySQL and OS version
#Download mysql-connector-j_8.0.33-1ubuntu22.04_all.deb
[root@logstash-ubuntu2204 ~]#dpkg -i mysql-connector-j_8.0.33-1ubuntu22.04_all.deb
[root@logstash-ubuntu2204 ~]#dpkg -L mysql-connector-j
...
/usr/share/java
/usr/share/java/mysql-connector-j-8.0.33.jar

#2) Copy the jar file into the directory Logstash expects
[root@logstash-ubuntu2204 ~]#mkdir -p /usr/share/logstash/vendor/jar/jdbc
[root@logstash-ubuntu2204 ~]#cp /usr/share/java/mysql-connector-j-8.0.33.jar /usr/share/logstash/vendor/jar/jdbc/
#List the installed plugins; only input-jdbc ships by default, so the output-jdbc plugin must be installed
[root@logstash1 ~]#/usr/share/logstash/bin/logstash-plugin list|grep jdbc
......
logstash-integration-jdbc
├── logstash-input-jdbc            #only input-jdbc at this point
├── logstash-filter-jdbc_streaming
└── logstash-filter-jdbc_static

#Install the output-jdbc plugin online; this may take quite a while
[root@logstash1 ~]#/usr/share/logstash/bin/logstash-plugin install logstash-output-jdbc

#Offline installation
#If online installation is not possible, first export the plugin from a host that already has it, then import it
#Export the plugin
[root@ubuntu2204 ~]#/usr/share/logstash/bin/logstash-plugin prepare-offline-pack logstash-output-jdbc
#Import the plugin offline
[root@ubuntu2204 ~]#/usr/share/logstash/bin/logstash-plugin install file:///root/logstash-offline-plugins-8.12.2.zip

#Remove the plugin
#[root@logstash1 ~]#/usr/share/logstash/bin/logstash-plugin remove logstash-output-jdbc
[root@logstash ~]#vim /etc/logstash/conf.d/kafka-to-es.conf
input {
  kafka {
    bootstrap_servers => "10.0.0.156:9092,10.0.0.157:9092,10.0.0.158:9092"
    topics => "filebeat-log"
    codec => "json"
  }
}
filter {
  if "nginx-access" in [tags] {
    geoip {
      source => "clientip"
      target => "geoip"
      add_field => ["[geoip][coordinates]","%{[geoip][geo][location][lon]}"]
      add_field => ["[geoip][coordinates]","%{[geoip][geo][location][lat]}"]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float"]
    }
  }
  mutate {
    convert => ["upstreamtime","float"]
  }
}
output {
  if "nginx-access" in [tags] {
    elasticsearch {
      hosts => ["10.0.0.151:9200"]
      index => "logstash-kafka-nginx-accesslog-%{+YYYY.MM.dd}"
    }
    jdbc {   #added: also write to MySQL
      connection_string => "jdbc:mysql://10.0.0.155/elk?user=elk&password=123456&useUnicode=true&characterEncoding=UTF8"
      statement => ["INSERT INTO elklog (clientip,responsetime,uri,status) VALUES(?,?,?,?)","clientip","responsetime","uri","status"]
    }
  }
  if "nginx-error" in [tags] {
    elasticsearch {
      hosts => ["10.0.0.151:9200"]
      index => "logstash-kafka-nginx-errorlog-%{+YYYY.MM.dd}"
    }
  }
  if "syslog" in [tags] {
    elasticsearch {
      hosts => ["10.0.0.151:9200"]
      index => "logstash-kafka-syslog-%{+YYYY.MM.dd}"
    }
  }
}

#Start for testing
[root@logstash ~]#logstash -f /etc/logstash/conf.d/kafka-to-es.conf

#Generate some data on the filebeat side for testing
[root@ubuntu ~]#head -n11 access_json.log-20230221.0 >> /var/log/nginx/access_json.log

#Check mysql
[root@ubuntu ~]#mysql
mysql> use elk
#Data appears
mysql> select * from elklog;
+----------------+--------------+--------------+--------+---------------------+
| clientip       | responsetime | uri          | status | time                |
+----------------+--------------+--------------+--------+---------------------+
| 20.203.179.186 |        0.000 | /.git/config | 302    | 2024-10-21 10:19:40 |
| 20.203.179.186 |        0.000 | /.git/config | 302    | 2024-10-21 10:19:40 |
| 218.94.106.239 |        0.000 | /            | 302    | 2024-10-21 10:19:40 |
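The jdbc output's statement array pairs an INSERT with the event field names to bind to its ? placeholders. That mapping can be sketched with sqlite3 standing in for MySQL (a hypothetical helper for illustration; the real binding happens inside logstash-output-jdbc):

```python
import sqlite3

# statement array as in the jdbc output: the SQL first, then the event
# field names bound to the ? placeholders in order
STATEMENT = ["INSERT INTO elklog (clientip,responsetime,uri,status) VALUES(?,?,?,?)",
             "clientip", "responsetime", "uri", "status"]

def write_event(conn, event):
    sql, *fields = STATEMENT
    conn.execute(sql, tuple(event.get(f) for f in fields))

conn = sqlite3.connect(":memory:")       # stand-in for the MySQL elk database
conn.execute("CREATE TABLE elklog (clientip TEXT, responsetime REAL, uri TEXT, status TEXT)")
write_event(conn, {"clientip": "20.203.179.186", "responsetime": 0.0,
                   "uri": "/.git/config", "status": "302", "tags": ["nginx-access"]})
print(conn.execute("SELECT clientip, uri FROM elklog").fetchall())
# -> [('20.203.179.186', '/.git/config')]
```

Any event field not listed in the statement array (tags, geoip, and so on) simply never reaches MySQL, which is why the table only needs columns for the fields worth keeping.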
Finally, this Kibana instance can be opened by anyone, which is insecure. You can put an nginx reverse proxy in front of it and use nginx's basic authentication feature.
[root@ubuntu ~]#apt install nginx
[root@ubuntu ~]#vim /etc/nginx/sites-enabled/default
location / {
        #try_files $uri $uri/ =404;
        proxy_pass http://127.0.0.1:5601;
        auth_basic "kibana site";               #login prompt text
        auth_basic_user_file conf.d/htpasswd;   #file holding the passwords
}

#Generate the password file
[root@ubuntu ~]#apt install apache2-utils
[root@ubuntu ~]#htpasswd -c -b /etc/nginx/conf.d/htpasswd xiaoming 123456
#For additional users, run it again; the second run must not use -c
[root@ubuntu ~]#htpasswd -b /etc/nginx/conf.d/htpasswd xiaohong 123456

#Open in a browser; a login is now required
http://10.0.0.154/

#Revert the Kibana setting that allowed remote access
[root@ubuntu ~]#vim /etc/kibana/kibana.yml
#server.host: "0.0.0.0"   #comment this out; the default is localhost
[root@ubuntu ~]#systemctl restart kibana.service
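What nginx's auth_basic does on each request can be sketched as: decode the Authorization header and compare it against the htpasswd entries. This is a simplified model; a real htpasswd file stores password hashes (bcrypt, MD5-crypt, etc.), not plaintext, and the dict below is a hypothetical stand-in:

```python
import base64

# Hypothetical in-memory stand-in for /etc/nginx/conf.d/htpasswd
# (real entries hold password hashes, not plaintext)
HTPASSWD = {"xiaoming": "123456", "xiaohong": "123456"}

def check_basic_auth(authorization_header):
    """Return True if an 'Authorization: Basic ...' header matches a user."""
    scheme, _, payload = authorization_header.partition(" ")
    if scheme != "Basic":
        return False
    try:
        user, _, password = base64.b64decode(payload).decode().partition(":")
    except Exception:
        return False                      # malformed base64 or encoding
    return HTPASSWD.get(user) == password

token = base64.b64encode(b"xiaoming:123456").decode()
print(check_basic_auth(f"Basic {token}"))   # -> True
print(check_basic_auth("Basic bad"))        # -> False
```

Because the credentials travel base64-encoded (not encrypted), basic auth should normally be combined with HTTPS in production.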