背景介绍
上一篇文章简单介绍了我在德X科技的时候采用的日志处理方案, 这种方案更多的是用于开发者调试错误日志, 和一些简单的分析的工作, 大数据部分和我们这边的日志采集使用的是同一套, 都是从filebeat采集, 只是入库到不同的kafka而已. 今天我就用nginx日志来梳理一下ELK的简单使用方式.
日志格式
在日志采集入库阶段,规范的日志格式至关重要,它直接影响后续数据分析的效率,后面数据分析的同事就会很头疼。我曾经在使用 Logstash 解析 Nginx 日志时就遇到过格式带来的问题。虽然 Nginx 日志本身格式规范,但分隔符的选择仍需谨慎。- 和 | 等常见分隔符如果出现在用户访问的 URL 中,会导致解析错误。为了避免这种情况,建议使用不可见字符作为分隔符,例如 ^A ,这个字符在终端中输入的方式为 Ctrl+V+A ,在 Logstash 的配置中,对应的分隔符应写为 \u0001.
生产环境nginx日志格式配置(注意不能直接复制这个粘贴到你的nginx配置文件中, 必须手工敲出来):
log_format main '$http_host^A$remote_addr^A$time_local^A$request_time^A$request^A$status^A$body_bytes_sent^A$http_referer^A$http_user_agent^A$request_body';
nginx常用字段含义:
参数 | 说明 | 示例 |
---|---|---|
$http_host | 用户请求的主机名 | www.google.com |
$remote_addr | 用户的IP地址 | 1.1.1.1 |
$http_x_forwarded_for | 用户的IP地址(当nginx前面挂了代理的时候) | 12.4.5.16 |
$time_local | 日志打印的时间 | 09/Feb/2018:16:11:03 +0800 |
$request_time | 服务端的处理耗时(从nginx接收到第一个byte开始计算) | 0.01 |
$request | 用户请求的方法以及路径参数等 | GET /cc/checkcookie?type=cookie&ctime=0.2123393398508435 HTTP/1.1 |
$status | HTTP状态码 | 200 |
$body_bytes_sent | 返回给用户的HTTP body大小(byte) | 2956 |
$http_referer | 源跳转地址(从哪儿跳转过来的) | http://www.baidu.com |
$http_user_agent | 用户的UA | curl/7.54.0 |
logstash从kafka中获取到nginx日之后会去解析每一个上面的字段, 核心配置如下:
filter {
grok {
match => { "message" => "%{IPORHOST:http_host}\u0001%{IP:client_ip}\u0001%{HTTPDATE:timestamp}\u0001%{NUMBER:request_ti
me}\u0001%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\u0001%{NUMBER:status}\u0001%{NUMBER:body_length}\u0001%{DATA:r
eferer}\u0001%{DATA:ua}\u0001%{DATA:request_body}"}
}
}
日志采集
数据分析的第一步肯定是要获取日志, 之前一般采用的架构是采用每一台应用服务器上部署一个Logstash来采集日志, 后来由于Logstash太占用资源, 官方又使用go语言开发了一个更加轻量专一的日志采集工具--filebeat
, 为了方便管理, 我们统一采用supervisor来监控filebeat的运行, supervisor采用系统安装的方法(yum或者apt-get), filebeat会放置在/usr/local/filebeat
目录下, 下面是supervisor的配置文件:
filebeat.ini
[program:filebeat]
command=/usr/local/filebeat/filebeat -e -c /usr/local/filebeat/conf/filebeat.yml
autostart=true
autorestart=true
redirect_stderr = true
stdout_logfile=/var/log/filebeat_nginx_stdout.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=3
stderr_logfile=/var/log/filebeat_nginx_stderr.log
stderr_logfile_maxbytes=50MB
stderr_logfile_backups=3
filebeat(6.5)配置文件如下:
filebeat.yml
#=========================== Filebeat prospectors =============================
filebeat.prospectors:
- type: log
# Change to true to enable this prospector configuration.
enabled: true
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /home/test/logs/access_log
#- c:\programdata\elasticsearch\logs\*
#==================== Elasticsearch template setting ==========================
setup.template.settings:
index.number_of_shards: 3
#index.codec: best_compression
#_source.enabled: false
#================================ Out =====================================
output.kafka:
hosts: ["192.168.1.1:19092","192.168.1.2:19092","192.168.1.3:19092"]
topic: "mp_service_www_douyin_com"
required_acks: 1
compression: gzip
max_message_bytes: 1000000
根据上面的配置后, filebeat就会向我们搭建的kafka集群中上传日志了, 到此filebeat部分配置完毕.
kafka
kafka是一种分布式的消息系统, 一般在大数据处理上当做发布订阅系统. 这里只是简单介绍几个重要的概念和常用的命令, 够用就行, 等工作中遇到了其它问题后再总结一下吧.
- 创建topic
./kafka-topics.sh --create --zookeeper 192.168.1.2:2181,192.168.1.9:2181,192.168.1.10:2181 --partitions 3 --topic mp_service_www_douyin_com
这样就会创建一个名为mp_service_www_douyin_com的topic.
Logstash配置
日志经过filebeat传输到kafka后会持续存储一段时间, 我们配置了Logstash来消费这些日志, 我们同样使用supervisor来监控.
supervisor配置文件:
[program:www_douyin_com]
command = /data/logstash/bin/logstash -f /data/logstash/conf/kafka_mp_service_www_douyin_com.conf --path.data /data/logstash/data/mp_service_www_douyin_com
environment = JAVA_HOME = /usr/local/jdk
autostart = true
autorestart = true
logstash规则文件:
input {
kafka {
bootstrap_servers => "192.168.1.1:19092,192.168.1.2:19092,192.168.1.3:19092"
auto_offset_reset => "latest"
decorate_events => true
consumer_threads => 2
topics => ["mp_service_www_douyin_com"]
type => "kafka-input"
codec => "json"
}
}
filter {
grok {
match => { "message" => "%{IPORHOST:http_host}\u0001%{IP:client_ip}\u0001%{HTTPDATE:timestamp}\u0001%{NUMBER:request_time}\u0001%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\u0001%{NUMBER:status}\u0001%{NUMBER:body_length}\u0001%{DATA:referer}\u0001%{DATA:ua}\u0001%{DATA:request_body}"}
}
date {
match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
remove_field => [ "timestamp" ]
}
}
output {
elasticsearch {
hosts => ["192.168.1.215:9200"]
codec => "json"
index => "mp-nginx-www_douyin_com-%{+YYYY.MM.dd}"
}
}
这里有一点需要注意, filter中的date, 是为了在Kibana查看的时候使用nginx中的日志时间戳作为timestamp而不是入库的时间戳. 上面的match语法是grok语法, 简单来说就是Logstash本身自定义了许多正则表达式, 我们只需要用就行了, 比如%{IP:client_ip}
经过解析后就会把IP地址给client_ip字段, 我们就可以使用Kibana过滤该字段了. 具体可以参考西面的链接. 到这里我们的日志已经进入了ES了, 索引名字的格式就是output定义的index. 下一步就可以查询ES或者使用Kibana创建我们的表格了.
使用Kibana
日志进入到ES后就可以从Kibana上添加索引并查看日志和做一些自己需要的图表.
经过上面的步骤就可以通过不同的索引查看日志了, 具体作图等实践以后再更新知识库.