Logstash和Kibana
# Logstash介绍
Logstash能够同时从多个来源采集数据、转换数据,然后将数据发送到你所指定的"存储方式"中,同时它也支持在转发的过程中过滤数据。另外logstash还支持多实例启动,以达到不同logstash实例管理不同的转发。
# EVENT
- Logstash的事件,logstash将数据流中等每一条数据称之为一个event。
- event的处理流水线由三个主要角色完成:inputs –> filters –> outputs。
# INPUT
- 输入插件 (必须),负责产生事件,插件使Logstash能够读取特定的事件源。
- INPUT支持的事件源
azure_event_hubs
- 微软云事件中心beats
- filebeat日志收集工具elasticsearch
- 搜索引擎数据库file
- 文件generator
- 生成器heartbeat
- 高可用软件http_poller
- http apijdbc
- java连接数据库的驱动kafka
- 基于java的消息队列rabbitmq
- 消息队列redis
- 缓存、消息队列、NoSQLstdin
- 标准输入syslog
- 系统日志tcp
- 传输控制协议udp
- 用户数据报协议s3
- 对象存储
# FILTERS
- 过滤器(可选),filters负责数据处理与转换。
# OUTPUT
- 输出插件(必须),负责数据输出,插件将事件数据发送到特定的目的地,OUTPUT是事件流水线中的最后阶段。
- OUTPUT支持的输出源
elasticsearch
- 搜索引擎数据库email
- 邮件file
- 文件http
- 超文本传输协议kafka
- 基于java的消息队列rabbitmq
- 消息队列redis
- 缓存、消息队列、NoSQLstdout
- 标准输出tcp
- 传输控制协议udp
- 用户数据报协议s3
- 对象存储
# CODEC
- 编解码插件(可选),CODEC用于格式化输出,默认输出为json格式。
- CODEC编解码器支持的编码
avro
- 数据序列化CEF
- 嵌入式框架es_bulk
- ES中的bulk apijson
- 数据序列化、格式化json_lines
- 便于存储结构化line
- 行multiline
- 多行匹配plain
- 纯文本rubydebug
- ruby语法格式
# Logstash部署
# 1. 部署JDK环境
- Logstash依赖于Java环境运行,所以需要先安装Java环境。
# 下载JDK二进制包并上传到服务器
https://www.oracle.com/java/technologies/downloads/archive/
# 解压到指定目录
mkdir -p /data/lib/jdk
tar -xvf ./jdk-*.tar.gz -C /data/lib/jdk
# 创建软链接 (方便更新版本)
ln -s /data/lib/jdk/jdk* /data/lib/jdk/jdk
# 配置JAVA环境变量
cat >>/etc/profile<<'EOF'
export JAVA_HOME=/data/lib/jdk/jdk
export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin
export CLASSPATH=.$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$JAVA_HOME/lib/tools.jar
EOF
source /etc/profile
# 验证是否生效
java -version
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 2. 安装logstash
二进制包安装
# 下载Logstash的linux-x86-64包 https://www.elastic.co/cn/downloads/logstash # 解压logstash tar -xvf logstash-*-linux-x86_64.tar.gz -C /data/apps/ # 移动到软件目录 mv /data/apps/logstash-* /data/apps/logstash/ # 添加Logstash二进制文件软链接到bin ln -s /data/apps/logstash/bin/logstash /bin/logstash ln -s /data/apps/logstash/bin/logstash-keystore /bin/logstash-keystore ln -s /data/apps/logstash/bin/logstash-plugin /bin/logstash-plugin
1
2
3
4
5
6
7
8
9
10
11
12
13
# 3. 测试启动logstash
logstash -e 'input {stdin {}} output {stdout{}}'
- 表示启动一个logstash实例,将标准输入转发输出到标准输出。
# 4. Logstash简单示例
# 标准输入转发输出到标准输出
logstash -e 'input {stdin {}} output {stdout{}}'
# 输出时指定codec格式:
logstash -e 'input {stdin {}} output {stdout{ codec => rubydebug }}'
# 标准输入转发输出到文件:
logstash -e 'input {stdin {}} output { file {path => "/var/log/logstash_stdin.log" codec => rubydebug} }'
# 标准输入转发输出到ES:
logstash -e 'input {stdin {}} output { elasticsearch { hosts => ["192.168.10.80:9200"] index => "stdin" }}'
2
3
4
5
6
7
8
9
10
11
# Logstash配置文件
Logstash有两种类型的配置文件:环境配置文件,用于配置Logstash本身的各参数。以及管道配置文件,用于定义Pipeline处理管道。
# 环境配置文件
./logstash/config/logstash.yml
- Logstash配置文件,用于控制Logstash本身的启动和运行。./logstash/config/pipelines.yml
- 主管道配置文件,用于控制Logstash实例运行多管道的框架和配置。./logstash/config/jvm.options
- JVM启动参数文件。
# pipelines.yml - 主管道配置
pipeline.id
- 管道的IDpipeline.workers
- 在管道Filter和Output阶段中允许并行的worker数量。- 默认值为主机的CPU核数。
pipeline.batch.size
- 单个worker执行Filter和Output的事件数量阈值。- 只有从Input接收到的事件数量达到指定阈值时,才会交给Filter和Output处理。
- 默认大小为125,更大的batch size意味着更高的批处理效率,但是同时也会增加内存开销。一般还需要同时调整
jvm.options
中的堆内存大小配置。
pipeline.batch.delay
- 单个worker在接收事件后等待新事件的最长时间阈值,以毫秒为单位。- 如果超过最长等待时间就会直接交给Filter和Output处理。也就是说如果事件数量阈值不满足时,就会根据该参数的设定,超时便执行Filter和Output操作。
# 管道配置文件
- 管道配置文件是实际用于定义管道各插件的配置文件,一般以
.conf
结尾。 - 执行logstash命令运行实例,默认会使用主管道配置文件中的配置,所以如果需要单实例运行多管道,则需要对
pipelines.yml
进行配置。 - 我们使用Logstash的
-f
参数可以指定只使用单个管道配置文件来运行实例。- 例如:
logstash -f /data/apps/logstash/conf.d/stdin.conf
- 例如:
# 管道配置格式
管道配置文件中定义的实际就是各种插件的调用,我们可以将其分为管道插件和普通插件。
管道插件实际就是管道级别的插件也就是input、filter、output。而普通插件就是用来定义管道插件的具体实现细节的,管道插件名中可以定义多个普通插件。
管道插件一般没有参数,我们只在其中定义各种普通插件即可,普通插件的参数格式是:
参数名 => 值
。插件定义方式:
# 格式: 管道插件名 { 普通插件名 { 参数名 => 值 参数名 => 普通插件名 { 参数名 => 值 } ... } ... } ... # 例子: input { stdin { codec => multiline { pattern => "^\[" negate => true what => "previous" } } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 获取事件数据
- Logstash中的所有事件都有属性,如Nginx日志中会有状态码、请求地址、请求方式等,这些实际就是属性,Logstash将这些属性称为字段(Fields)。
- 字段引用
- Logstash中只有Filter和Output才能引用字段,Input只负责事件输入。
- 引用字段直接通过
[]
包裹字段名来引用即可,例如:[字段名]
- 嵌套字段则指定字段的完整路径来引用即可,例如:
[顶层字段][嵌套字段]
# 条件判断格式
Logstash中的条件语句的使用方式和作用与编程语言中的相同。条件语句支持
if、else if、else
语句,并且可以嵌套。Logstash的表达式支持:
- 比较运算符:
==
、!=
、<
、>
、<=
、>=
- 正则表达式:
=~
、!~
- 包含关系:
in
、not in
- 布尔运算:
and
、or
、nand
、xor
- 否定运算符:
!
- 比较运算符:
条件判断定义方式:
# 格式: if EXPRESSION { 插件名 { ... } } else if EXPRESSION { ... } else { ... } # 例子: if [age] > 35 { stdout { ... } ... } else if [age] < 18 { ... } else { ... }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# file模块
file模块用于从文件收集数据、也可以用于输出数据到文件,Logstash对被收集的日志文件需要有读权限,对目标输出文件需要有写权限。
作为输入源配置
# 一个input中可以指定多个收集源 input { file { # 日志类型 (可选,但一般填收集的日志名,会在输出信息中展示出来) type => "message-log" # 日志路径 path => "/var/log/messages" # beginning:从文件头开始收集,end:从文件尾开始收集 start_position => "beginning" # 收集日志间隔时间(秒) stat_interval => "3" } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16作为输出源配置
output { # 我们还可以判断type字段输出到不同文件 if [type] == "message-log" { #文件模块 file { #输出路径 path => "/tmp/message_%{+yyyy.MM.dd}.log" } } else if [type] == "secure-log" { file { path => "/tmp/secure_%{+yyyy.MM.dd}.log" } } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# json模块
json模块可以用于解析json格式的数据。
如果我们用kibana来实现可视化,那么就必须用json键值对的形式输出数据,不仅源日志需要是json格式,还需要在配置中添加使用filter来将json日志解析成键值对,然后还需要在output指定输出格式为json格式。
使用例子:
input { file { path => "/var/log/json_log" start_position => "beginning" } } filter { json { # 指定json格式文本的字段 source => "message" } } output { elasticsearch { hosts => ["192.168.10.80:9200"] index => "json_log" codec => "json" } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# multiline模块
multiline模块用于多行匹配,因为Logstash默认是按一行日志算一条事件的,这样某些换行信息就会被作为多条日志。查询时会非常不方便,所以可以使用multiline或模块来实现多行匹配。
使用例子:
input { stdin { codec => multiline { # 匹配"["开头的行,触发多行合并 pattern => "^\[" # true:匹配成功则进行合并,false:匹配不成功则进行合并 negate => true #previou:与上面的行合并,next:与下面的行合并 what => "previous" } } } output { stdout { codec => rubydebug } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20- 此处以"["开头,才会进行合并和收集,否者不会收集。
# date模块
date模块可用于以指定格式匹配日志中的时间。
使用例子:
# 格式: filter{ date { match => ["字段名","时间格式"] } } # 例子: filter{ date { match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"] } }
1
2
3
4
5
6
7
8
9
10
11
12
13时间格式:
# grok模块
grok模块用于对日志进行正则匹配,并且会将正则匹配的结果,赋值给指定的字段名。
grok自定义正则匹配
语法:
(?<赋值字段名>正则表达式)
使用例子:
# 格式: filter { grok { match => { "文本源字段" => "(?<赋值字段名>正则表达式)" } } } # 例子: filter { grok { match => { # 截取message字段中的文本进行匹配,将Report之前的字符作为temMsg字段的值 "message" => "(?<temMsg>(.*)(?=Report)/?)" } } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
grok模式表达式匹配
语法:
%{模式类型:字段名}
模式类型:
IP
- 匹配IP地址,IPv4和IPv6都可以MAC
- 匹配MAC地址DATA
- 相当于正则的.*?
WORD
- 匹配包含数字和大小写的字符串,相当于正则的\b\w+\b
INT
- 匹配正负整数、零NUMBER
- 匹配十进制数,包括整数和小数
使用例子:
filter{ grok{ match => {"message" => "%{IP:ip}"} } }
1
2
3
4
5
# Kibana介绍
Kibana 是一款开源的数据分析和可视化平台,我们可以使用Kibana对ES索引中的数据进行搜索、查看、交互操作。它可以很方便的利用图表、表格及地图对数据进行多元化的分析和呈现。
# Kibana部署
# 1. 安装Kibana
Kibana官网没有提供二进制包安装,所以我们需要通过rpm包安装。
# 官网下载rpm包 https://www.elastic.co/cn/downloads/kibana # 使用yum安装kibana yum localinstall -y kibana-*.rpm
1
2
3
4
5
# 2. 配置kibana
vim /etc/kibana/kibana.yml
# 修改以下四行即可 # kibana端口 server.port: 5601 # kibana白名单主机 server.host: "0.0.0.0" # ES集群地址 elasticsearch.hosts: ["http://192.168.10.11:9200","http://192.168.10.12:9200"] # kibana前端语言设置 i18n.locale: "zh-CN"
1
2
3
4
5
6
7
8
9
10
11
12
13
# 3. 启动kibana
systemctl start kibana && systemctl enable kibana
- 检查是否成功启动:
netstat -lntup | grep 5601
# Kibana使用
# Kibana URL
http://192.168.10.11:5601/
- 主页http://192.168.10.11:5601/status
- 状态页,Kibana及ES集群状态
# 数据视图
- 如果需要使用Discovery查询数据则需要先定义数据视图。
- 数据视图就是某个或某些索引的映射,它支持使用通配符*来进行匹配。
- 创建数据视图
- 侧边栏 -> Stack Management -> 数据视图 -> 创建数据视图。
- 输入索引的匹配规则,例如:
nginx-*
。 - 然后选择时间戳字段,例如:
@timestamp
,如果不设置时间戳字段,则无法在Discovery页面直接使用时间筛选的控件。 - 然后创建即可。