Skip to content

背景介绍

 上一篇文章简单介绍了我在德X科技的时候采用的日志处理方案, 这种方案更多的是用于开发者调试错误日志, 和一些简单的分析的工作, 大数据部分和我们这边的日志采集使用的是同一套, 都是从filebeat采集, 只是入库到不同的kafka而已. 今天我就用nginx日志来梳理一下ELK的简单使用方式.

日志格式

 在日志采集入库阶段,规范的日志格式至关重要,它直接影响后续数据分析的效率,后面数据分析的同事就会很头疼。我曾经在使用 Logstash 解析 Nginx 日志时就遇到过格式带来的问题。虽然 Nginx 日志本身格式规范,但分隔符的选择仍需谨慎。- 和 | 等常见分隔符如果出现在用户访问的 URL 中,会导致解析错误。为了避免这种情况,建议使用不可见字符作为分隔符,例如 ^A ,这个字符在终端中输入的方式为 Ctrl+V+A ,在 Logstash 的配置中,对应的分隔符应写为 \u0001.

生产环境nginx日志格式配置(注意不能直接复制这个粘贴到你的nginx配置文件中, 必须手工敲出来):

log_format  main '$http_host^A$remote_addr^A$time_local^A$request_time^A$request^A$status^A$body_bytes_sent^A$http_referer^A$http_user_agent^A$request_body';

nginx常用字段含义:

参数说明示例
$http_host用户请求的主机名www.google.com
$remote_addr用户的IP地址1.1.1.1
$http_x_forwarded_for用户的IP地址(当nginx前面挂了代理的时候)12.4.5.16
$time_local日志打印的时间09/Feb/2018:16:11:03 +0800
$request_time服务端的处理耗时(从nginx接收到第一个byte开始计算)0.01
$request用户请求的方法以及路径参数等GET /cc/checkcookie?type=cookie&ctime=0.2123393398508435 HTTP/1.1
$statusHTTP状态码200
$body_bytes_sent返回给用户的HTTP body大小(byte)2956
$http_referer源跳转地址(从哪儿跳转过来的)http://www.baidu.com
$http_user_agent用户的UAcurl/7.54.0

logstash从kafka中获取到nginx日之后会去解析每一个上面的字段, 核心配置如下:

ruby
filter {
       grok {
		match => { "message" => "%{IPORHOST:http_host}\u0001%{IP:client_ip}\u0001%{HTTPDATE:timestamp}\u0001%{NUMBER:request_ti
me}\u0001%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\u0001%{NUMBER:status}\u0001%{NUMBER:body_length}\u0001%{DATA:r
eferer}\u0001%{DATA:ua}\u0001%{DATA:request_body}"}
       }
}

日志采集

 数据分析的第一步肯定是要获取日志, 之前一般采用的架构是采用每一台应用服务器上部署一个Logstash来采集日志, 后来由于Logstash太占用资源, 官方又使用go语言开发了一个更加轻量专一的日志采集工具--filebeat, 为了方便管理, 我们统一采用supervisor来监控filebeat的运行, supervisor采用系统安装的方法(yum或者apt-get), filebeat会放置在/usr/local/filebeat目录下, 下面是supervisor的配置文件:

python
filebeat.ini

[program:filebeat]
command=/usr/local/filebeat/filebeat -e -c /usr/local/filebeat/conf/filebeat.yml
autostart=true
autorestart=true
redirect_stderr = true
stdout_logfile=/var/log/filebeat_nginx_stdout.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=3
stderr_logfile=/var/log/filebeat_nginx_stderr.log
stderr_logfile_maxbytes=50MB
stderr_logfile_backups=3

filebeat(6.5)配置文件如下:

yaml
filebeat.yml
#=========================== Filebeat prospectors =============================

filebeat.prospectors:

- type: log

  # Change to true to enable this prospector configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /home/test/logs/access_log
    #- c:\programdata\elasticsearch\logs\*

#==================== Elasticsearch template setting ==========================

setup.template.settings:
  index.number_of_shards: 3
  #index.codec: best_compression
  #_source.enabled: false

#================================ Out =====================================
output.kafka:
  hosts: ["192.168.1.1:19092","192.168.1.2:19092","192.168.1.3:19092"]
  topic: "mp_service_www_douyin_com"
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

根据上面的配置后, filebeat就会向我们搭建的kafka集群中上传日志了, 到此filebeat部分配置完毕.

kafka

kafka是一种分布式的消息系统, 一般在大数据处理上当做发布订阅系统. 这里只是简单介绍几个重要的概念和常用的命令, 够用就行, 等工作中遇到了其它问题后再总结一下吧.

  • 创建topic
bash
./kafka-topics.sh --create --zookeeper 192.168.1.2:2181,192.168.1.9:2181,192.168.1.10:2181 --partitions 3 --topic mp_service_www_douyin_com

这样就会创建一个名为mp_service_www_douyin_com的topic.

Logstash配置

日志经过filebeat传输到kafka后会持续存储一段时间, 我们配置了Logstash来消费这些日志, 我们同样使用supervisor来监控.

supervisor配置文件:

[program:www_douyin_com]
command = /data/logstash/bin/logstash -f /data/logstash/conf/kafka_mp_service_www_douyin_com.conf --path.data /data/logstash/data/mp_service_www_douyin_com
environment = JAVA_HOME = /usr/local/jdk
autostart = true
autorestart = true

logstash规则文件:

ruby

input {
        kafka {
                bootstrap_servers => "192.168.1.1:19092,192.168.1.2:19092,192.168.1.3:19092"
				auto_offset_reset => "latest"
                decorate_events => true
                consumer_threads => 2
				topics => ["mp_service_www_douyin_com"]
				type => "kafka-input"
                codec => "json"
        }
}

filter {
       grok {
		match => { "message" => "%{IPORHOST:http_host}\u0001%{IP:client_ip}\u0001%{HTTPDATE:timestamp}\u0001%{NUMBER:request_time}\u0001%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\u0001%{NUMBER:status}\u0001%{NUMBER:body_length}\u0001%{DATA:referer}\u0001%{DATA:ua}\u0001%{DATA:request_body}"}
       }

	date {
		match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
		remove_field => [ "timestamp" ]
	}
}

output {
        elasticsearch {
                hosts => ["192.168.1.215:9200"]
                codec => "json"
                index => "mp-nginx-www_douyin_com-%{+YYYY.MM.dd}"
        }
}

这里有一点需要注意, filter中的date, 是为了在Kibana查看的时候使用nginx中的日志时间戳作为timestamp而不是入库的时间戳. 上面的match语法是grok语法, 简单来说就是Logstash本身自定义了许多正则表达式, 我们只需要用就行了, 比如%{IP:client_ip}经过解析后就会把IP地址给client_ip字段, 我们就可以使用Kibana过滤该字段了. 具体可以参考西面的链接. 到这里我们的日志已经进入了ES了, 索引名字的格式就是output定义的index. 下一步就可以查询ES或者使用Kibana创建我们的表格了.

使用Kibana

日志进入到ES后就可以从Kibana上添加索引并查看日志和做一些自己需要的图表.

Kibana01.pngKibana02.png

经过上面的步骤就可以通过不同的索引查看日志了, 具体作图等实践以后再更新知识库.