运维人

ELK+Kafka 企业日志收集平台(二)

上篇博文主要总结了一下elk、基于kafka的zookeeper集群搭建,以及系统日志通过zookeeper集群达到我们集群的整个过程。下面我们接着下面这个未完成的几个主题

4.Kibana部署;

5.Nginx负载均衡Kibana请求;

6.案例:nginx日志收集以及MySQL慢日志收集;

7.Kibana报表基本使用;

 

Kibana的部署;

Kibana的作用,想必大家都知道了就是一个展示工具,报表内容非常的丰富;

下面我们在两台es上面搭建两套kibana

1.获取kibana软件包

[root@es1 ~]# wget https://download.elastic.co/kibana/kibana/kibana-4.1.2-linux-x64.tar.gz
[root@es1 ~]# tar -xf kibana-4.2.0-linux-x64.tar.gz -C /usr/local/

2.修改配置文件

[root@es1 ~]# cd /usr/local/
[root@es1 local]# ln -sv kibana-4.1.2-linux-x64 kibana
`kibana' -> `kibana-4.2.0-linux-x64'
[root@es1 local]# cd kibana

[root@es1 kibana]# vim config/kibana.yml
server.port: 5601      #默认端口可以修改的
server.host: "0.0.0.0" #kibana监听的ip
elasticsearch.url: "http://localhost:9200" #由于es在本地主机上面,所以这个选项打开注释即可

3.提供kibana服务管理脚本,我这里写了个相对简单的脚本

[root@es1 config]# cat /etc/init.d/kibana
#!/bin/bash
#chkconfig: 2345 55 24
#description: kibana service manager

KIBBIN='/usr/local/kibana/bin/kibana'
LOCK='/usr/local/kibana/locks'

START() {
	if [ -f $LOCK ];then
		echo -e "kibana is already \033[32mrunning\033[0m, do nothing."
	else
		echo -e "Start kibana service.\033[32mdone\033[m"
		cd  /usr/local/kibana/bin
    	nohup ./kibana & >/dev/null
 		touch $LOCK
	fi
}

STOP() {
	if [ ! -f $LOCK ];then
		echo -e "kibana is already stop, do nothing."
	else
		echo -e "Stop kibana serivce \033[32mdone\033[m"
		rm -rf $LOCK
		ps -ef | grep kibana | grep -v "grep" | awk '{print $2}' | xargs kill -s 9 >/dev/null
	fi
}

STATUS() {
        Port=$(netstat -tunl | grep ":5602")
	if [ "$Port" != "" ] && [ -f $LOCK ];then
		echo -e "kibana is: \033[32mrunning\033[0m..."
	else
		echo -e "kibana is: \033[31mstopped\033[0m..."
	fi
}

case "$1" in
  start)
	START
	;;
  stop)
	STOP
	;;
  status)
	STATUS
	;;
  restart)
	STOP 
    sleep 2
    START
	;;
  *)
	echo "Usage: /etc/init.d/kibana (|start|stop|status|restart)"
	;;
esac

4.启动kibana服务

[root@es1 config]# chkconfig --add kibana
[root@es1 config]# service kibana start
Start kibana service.done
[root@es1 config]#

5.服务检查

[root@es1 config]# ss -tunl | grep "5601"
tcp    LISTEN     0      511                    *:5601                  *:*     
[root@es1 config]#

ok,此时我直接访问es1这台主机的5601端口

11

ok,能成功的访问5601端口,那我把es1这台的配置放到es2上面去然后启动,效果跟访问es1一样

Nginx负载均衡kibana的请求

1.在nginx-proxy上面yum安装nginx

yum install -y nignx

2.编写配置文件es.conf

[root@saltstack-node1 conf.d]# pwd 
/etc/nginx/conf.d
[root@saltstack-node1 conf.d]# cat es.conf 
upstream es {
    server 192.168.2.18:5601 max_fails=3 fail_timeout=30s;
    server 192.168.2.19:5601 max_fails=3 fail_timeout=30s;
}

server {
    listen       80;
    server_name  localhost;

    location / {
        proxy_pass http://es/;
        index index.html index.htm;
        #auth
        auth_basic "ELK Private";
        auth_basic_user_file /etc/nginx/.htpasswd;
    }

 }

3.创建认证

[root@saltstack-node1 conf.d]# htpasswd -cm /etc/nginx/.htpasswd elk
New password: 
Re-type new password: 
Adding password for user elk-user
[root@saltstack-node1 conf.d]# /etc/init.d/nginx restart
Stopping nginx:                                            [  OK  ]
Starting nginx:                                            [  OK  ]
[root@saltstack-node1 conf.d]# 

4.直接输入认证用户及密码就可访问啦http://192.168.2.21/

22

Nginx及MySQL慢日志收集

首先我们在webserver1上面都分别安装了nginx 及mysql.

1.为了方便nginx日志的统计搜索,这里设置nginx访问日志格式为json

(1)修改nginx主配置文件

说明:如果想实现日志的报表展示,最好将业务日志直接以json格式输出,这样可以极大减轻cpu负载,也省得运维需要写负载的filter过滤正则。

[root@webserver1 nginx]# vim nginx.conf
log_format json '{"@timestamp":"$time_iso8601",'
                '"@version":"1",'
                '"client":"$remote_addr",'
                '"url":"$uri",'
                '"status":"$status",'
                '"domain":"$host",'
                '"host":"$server_addr",'
                '"size":$body_bytes_sent,'
                '"responsetime":$request_time,'
                '"referer": "$http_referer",'
                '"ua": "$http_user_agent"'
                '}';
  access_log  /var/log/access_json.log  json;

(2)收集nginx日志和MySQL日志到消息队列中;这个文件我们是定义在客户端,即生产服务器上面的Logstash文件哦.

注意:这里刚搭建完毕,没有什么数据,为了展示效果,我这里导入了线上的nginx和MySQL慢日志

input {
  file {             #从nginx日志读入
    type => "nginx-access"
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
    codec => "json"  #这里指定 codec格式为json
  }
  file {  #从MySQL慢日志读入
   type => "slow-mysql"
   path => "/var/log/mysql/slow-mysql.log"
   start_position => "beginning"
   codec => multiline {         #这里用到了logstash的插件功能,将本来属于一行的多行日志条目整合在一起,让他属于一条   
     pattern => "^# User@Host"  #用到了正则去匹配
     negate => true
     what => "previous"
   }
  }
}

output {   
#  stdout { codec=> rubydebug }
  if [type] == "nginx-access" {    #通过判断input中定义的type,来让它在kafka集群中生成的主题名称
    kafka {                        #输出到kafka集群
      bootstrap_servers => "192.168.2.22:9092,192.168.2.23:9092,192.168.2.24:9092"  #生产者们
      topic_id => "nginx-access"   #主题名称
      compression_type => "snappy" #压缩类型
    }
 }
  if [type] == "slow-mysql" {
    kafka {
      bootstrap_servers => "192.168.2.22:9092,192.168.2.23:9092,192.168.2.24:9092"
      topic_id => "slow-mysql"
      compression_type => "snappy"
    }
 }
}

(3)Logstash 从kafka集群中读取日志存储到es中,这里的定义logstash文件是在三台kafka服务器上面的哦,并且要保持一致,你可以在一台上面修改测试好之后,拷贝至另外两台即可。

input {
    kafka {
        zk_connect => "192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181"
        type => "nginx-access"
        topic_id => "nginx-access"
        codec => plain
        reset_beginning => false
        consumer_threads => 5
        decorate_events => true
    }
    kafka {
        zk_connect => "192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181"
        type => "slow-mysql"
        topic_id => "slow-mysql"
        codec => plain
        reset_beginning => false
        consumer_threads => 5
        decorate_events => true
    }
}

output {
#  stdout { codec=> rubydebug }
  if [type] == "nginx-access" {
    elasticsearch {
      hosts => ["192.168.2.18:9200","192.168.2.19:9200"]
      index => "nginx-access-%{+YYYY-MM}"
    }
  }
  if [type] == "slow-mysql" {
    elasticsearch {
      hosts => ["192.168.2.18:9200","192.168.2.19:9200"]
      index => "slow-mysql-%{+YYYY-MM}"
    }
  }
}

13

通过上图可以看到,nginx日志以及MySQL慢日志已经成功抵达es集群

然后我们在kibana上面创建索引就可以啦

(4)创建nginx-access 日志索引
11

此时就可以看到索引啦

16

(5)创建MySQL慢日志索引

p15

MySQL的索引也出来啦

17Kibana报表展示

kibana报表功能非常的强大,也就是可视化;可以制作出下面不同类型的图形

18

下面就是我简单的一些图形展示

19

由于篇幅问题,可以看官方介绍。

参考:

https://github.com/liquanzhou/ops_doc/tree/master/Service/kafka

http://www.lujinhong.com/kafka%E9%9B%86%E7%BE%A4%E6%93%8D%E4%BD%9C%E6%8C%87%E5%8D%97.html

http://www.it165.net/admin/html/201405/3192.html

http://blog.csdn.net/lizhitao/article/details/39499283

https://taoistwar.gitbooks.io/spark-operationand-maintenance-management/content/spark_relate_software/zookeeper_install.html

    分享到:
码字很辛苦,转载请注明来自运维人《ELK+Kafka 企业日志收集平台(二)》

评论

  1. kindone #1

    文章写的的确不错,准备借鉴一下

    回复
    2018-04-19
  2. kindone #2

    👿

    回复
    2018-04-19
  3. eric #3

    Hi,Daemon,你好!
    我已经测试到最后一步, 通过webserver的logstash传输到kafka,kafka通过topic list查询到有nginx-access和slow-mysql这两个topic,但是es接收不到信息,该如何排查呢?
    望告知,非常感谢!

    回复
    2016-08-4
  4. 小弟 #4

    博主为啥要加一层kafka,不直接从logstash发给es

    回复
    2016-02-19
    • Daemon

      kafka是数据展示。es是数据收集,存放,分析。logstash是获取我们server上的日志或者是一切能够输入输出的内容。

      回复
      2016-02-24
  5. 孙高猛 #5

    ❓ 你好,很感谢你提供了这些资料,让困扰了很多天的我得到解决
    但有个问题想咨询下你,logstash 将kafka的消息输出至es,在启动logstash 后cpu一直100% 高居不下,topic中并没有多少消息在跑,在其他机器上试了也一样效果
    我采用的是阿里云的云服务器,1核CPU

    回复
    2016-01-5
    • Daemon

      嗯,不客气.
      注意logstash非常占用java内存; 建议升级一下配置.

      回复
      2016-01-6
    • lsw

      logstash刚刚启动cpu都会比较高的,运行几分钟后就恢复正常

      回复
      2016-10-19