ThankNeko's Blog ThankNeko's Blog
首页
  • 操作系统

    • Linux基础
    • Linux服务
    • WindowsServer笔记
    • Ansible笔记
    • Shell笔记
  • 容器服务

    • Docker笔记
    • Kubernetes笔记
    • Git笔记
  • 数据库服务

    • MySQL笔记
    • ELK笔记
    • Redis笔记
  • 监控服务

    • Zabbix笔记
  • Web服务

    • Nginx笔记
    • Tomcat笔记
  • 数据处理

    • Kettle笔记
  • Python笔记
  • Bootstrap笔记
  • C笔记
  • C++笔记
  • Arduino笔记
  • 分类
  • 标签
  • 归档
  • 随笔
  • 关于
GitHub (opens new window)

Hoshinozora

尽人事,听天命。
首页
  • 操作系统

    • Linux基础
    • Linux服务
    • WindowsServer笔记
    • Ansible笔记
    • Shell笔记
  • 容器服务

    • Docker笔记
    • Kubernetes笔记
    • Git笔记
  • 数据库服务

    • MySQL笔记
    • ELK笔记
    • Redis笔记
  • 监控服务

    • Zabbix笔记
  • Web服务

    • Nginx笔记
    • Tomcat笔记
  • 数据处理

    • Kettle笔记
  • Python笔记
  • Bootstrap笔记
  • C笔记
  • C++笔记
  • Arduino笔记
  • 分类
  • 标签
  • 归档
  • 随笔
  • 关于
GitHub (opens new window)
  • 操作系统

  • 虚拟化服务

  • 数据库服务

    • MySQL笔记

    • ELK笔记

      • ELK介绍和ES部署
      • ES相关概念与工作原理
      • ES相关操作
      • Logstash和Kibana
        • Logstash介绍
          • EVENT
          • INPUT
          • FILTERS
          • OUTPUT
          • CODEC
        • Logstash部署
          • 1. 部署JDK环境
          • 2. 安装logstash
          • 3. 测试启动logstash
          • 4. Logstash简单示例
        • Logstash配置文件
          • 环境配置文件
          • 管道配置文件
        • Kibana介绍
        • Kibana部署
          • 1. 安装Kibana
          • 2. 配置kibana
          • 3. 启动kibana
        • Kibana使用
          • Kibana URL
          • 数据视图
    • Redis笔记

    • MongoDB笔记

    • Git笔记

  • 监控服务

  • Web服务

  • 数据处理

  • Ops
  • 数据库服务
  • ELK笔记
Hoshinozora
2023-03-24
目录

Logstash和Kibana

# Logstash介绍

Logstash能够同时从多个来源采集数据、转换数据,然后将数据发送到你所指定的"存储方式"中,同时它也支持在转发的过程中过滤数据。另外logstash还支持多实例启动,以达到不同logstash实例管理不同的转发。

# EVENT

  • Logstash的事件,logstash将数据流中等每一条数据称之为一个event。
  • event的处理流水线由三个主要角色完成:inputs –> filters –> outputs。

# INPUT

  • 输入插件 (必须),负责产生事件,插件使Logstash能够读取特定的事件源。
  • INPUT支持的事件源
    • azure_event_hubs - 微软云事件中心
    • beats - filebeat日志收集工具
    • elasticsearch - 搜索引擎数据库
    • file - 文件
    • generator - 生成器
    • heartbeat - 高可用软件
    • http_poller - http api
    • jdbc - java连接数据库的驱动
    • kafka - 基于java的消息队列
    • rabbitmq - 消息队列
    • redis - 缓存、消息队列、NoSQL
    • stdin - 标准输入
    • syslog - 系统日志
    • tcp - 传输控制协议
    • udp - 用户数据报协议
    • s3 - 对象存储

# FILTERS

  • 过滤器(可选),filters负责数据处理与转换。

# OUTPUT

  • 输出插件(必须),负责数据输出,插件将事件数据发送到特定的目的地,OUTPUT是事件流水线中的最后阶段。
  • OUTPUT支持的输出源
    • elasticsearch - 搜索引擎数据库
    • email - 邮件
    • file - 文件
    • http - 超文本传输协议
    • kafka - 基于java的消息队列
    • rabbitmq - 消息队列
    • redis - 缓存、消息队列、NoSQL
    • stdout - 标准输出
    • tcp - 传输控制协议
    • udp - 用户数据报协议
    • s3 - 对象存储

# CODEC

  • 编解码插件(可选),CODEC用于格式化输出,默认输出为json格式。
  • CODEC编解码器支持的编码
    • avro - 数据序列化
    • CEF - 嵌入式框架
    • es_bulk - ES中的bulk api
    • json - 数据序列化、格式化
    • json_lines - 便于存储结构化
    • line - 行
    • multiline - 多行匹配
    • plain - 纯文本
    • rubydebug - ruby语法格式

# Logstash部署

# 1. 部署JDK环境

  • Logstash依赖于Java环境运行,所以需要先安装Java环境。
# 下载JDK二进制包并上传到服务器
https://www.oracle.com/java/technologies/downloads/archive/

# 解压到指定目录
mkdir -p /data/lib/jdk
tar -xvf ./jdk-*.tar.gz -C /data/lib/jdk

# 创建软链接 (方便更新版本)
ln -s /data/lib/jdk/jdk* /data/lib/jdk/jdk

# 配置JAVA环境变量
cat >>/etc/profile<<'EOF'
export JAVA_HOME=/data/lib/jdk/jdk
export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin
export CLASSPATH=.$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$JAVA_HOME/lib/tools.jar
EOF
source /etc/profile

# 验证是否生效
java -version
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 2. 安装logstash

  • 二进制包安装

    # 下载Logstash的linux-x86-64包
    https://www.elastic.co/cn/downloads/logstash
    
    # 解压logstash
    tar -xvf logstash-*-linux-x86_64.tar.gz -C /data/apps/
    
    # 移动到软件目录
    mv /data/apps/logstash-* /data/apps/logstash/
    
    # 添加Logstash二进制文件软链接到bin
    ln -s /data/apps/logstash/bin/logstash /bin/logstash
    ln -s /data/apps/logstash/bin/logstash-keystore /bin/logstash-keystore
    ln -s /data/apps/logstash/bin/logstash-plugin /bin/logstash-plugin
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13

# 3. 测试启动logstash

  • logstash -e 'input {stdin {}} output {stdout{}}'
    • 表示启动一个logstash实例,将标准输入转发输出到标准输出。

# 4. Logstash简单示例

# 标准输入转发输出到标准输出
logstash -e 'input {stdin {}} output {stdout{}}'

# 输出时指定codec格式:
logstash -e 'input {stdin {}} output {stdout{ codec => rubydebug }}'

# 标准输入转发输出到文件:
logstash -e 'input {stdin {}} output { file {path => "/var/log/logstash_stdin.log" codec => rubydebug} }'

# 标准输入转发输出到ES:
logstash -e 'input {stdin {}} output { elasticsearch { hosts => ["192.168.10.80:9200"] index => "stdin" }}'
1
2
3
4
5
6
7
8
9
10
11

# Logstash配置文件

Logstash有两种类型的配置文件:环境配置文件,用于配置Logstash本身的各参数。以及管道配置文件,用于定义Pipeline处理管道。

# 环境配置文件

  • ./logstash/config/logstash.yml - Logstash配置文件,用于控制Logstash本身的启动和运行。

  • ./logstash/config/pipelines.yml - 主管道配置文件,用于控制Logstash实例运行多管道的框架和配置。

  • ./logstash/config/jvm.options - JVM启动参数文件。

# pipelines.yml - 主管道配置

  • pipeline.id - 管道的ID
  • pipeline.workers - 在管道Filter和Output阶段中允许并行的worker数量。
    • 默认值为主机的CPU核数。
  • pipeline.batch.size - 单个worker执行Filter和Output的事件数量阈值。
    • 只有从Input接收到的事件数量达到指定阈值时,才会交给Filter和Output处理。
    • 默认大小为125,更大的batch size意味着更高的批处理效率,但是同时也会增加内存开销。一般还需要同时调整jvm.options中的堆内存大小配置。
  • pipeline.batch.delay - 单个worker在接收事件后等待新事件的最长时间阈值,以毫秒为单位。
    • 如果超过最长等待时间就会直接交给Filter和Output处理。也就是说如果事件数量阈值不满足时,就会根据该参数的设定,超时便执行Filter和Output操作。

# 管道配置文件

  • 管道配置文件是实际用于定义管道各插件的配置文件,一般以.conf结尾。
  • 执行logstash命令运行实例,默认会使用主管道配置文件中的配置,所以如果需要单实例运行多管道,则需要对pipelines.yml进行配置。
  • 我们使用Logstash的-f参数可以指定只使用单个管道配置文件来运行实例。
    • 例如:logstash -f /data/apps/logstash/conf.d/stdin.conf

# 管道配置格式

  • 管道配置文件中定义的实际就是各种插件的调用,我们可以将其分为管道插件和普通插件。

  • 管道插件实际就是管道级别的插件也就是input、filter、output。而普通插件就是用来定义管道插件的具体实现细节的,管道插件名中可以定义多个普通插件。

  • 管道插件一般没有参数,我们只在其中定义各种普通插件即可,普通插件的参数格式是:参数名 => 值。

  • 插件定义方式:

    # 格式:
    管道插件名 {
    	普通插件名 {
    		参数名 => 值
    		参数名 => 普通插件名 {
    			参数名 => 值
    		}
    		...
    	}
    	...
    }
    ...
    
    # 例子:
    input {
    	stdin {
    		codec => multiline {
    			pattern => "^\["
    			negate => true
    			what => "previous"
    		}
    	}
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23

# 获取事件数据

  • Logstash中的所有事件都有属性,如Nginx日志中会有状态码、请求地址、请求方式等,这些实际就是属性,Logstash将这些属性称为字段(Fields)。
  • 字段引用
    • Logstash中只有Filter和Output才能引用字段,Input只负责事件输入。
    • 引用字段直接通过[]包裹字段名来引用即可,例如:[字段名]
    • 嵌套字段则指定字段的完整路径来引用即可,例如:[顶层字段][嵌套字段]

# 条件判断格式

  • Logstash中的条件语句的使用方式和作用与编程语言中的相同。条件语句支持if、else if、else语句,并且可以嵌套。

  • Logstash的表达式支持:

    • 比较运算符:==、!=、<、>、<=、>=
    • 正则表达式:=~、!~
    • 包含关系:in、not in
    • 布尔运算:and、or、nand、xor
    • 否定运算符:!
  • 条件判断定义方式:

    # 格式:
    if EXPRESSION {
        插件名 {
            ...
        }
    } else if EXPRESSION {
      ...
    } else {
      ...
    }
    
    # 例子:
    if [age] > 35 {
        stdout {
            ...
        }
        ...
    } else if [age] < 18 {
        ...
    } else {
        ...
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22

# file模块

  • file模块用于从文件收集数据、也可以用于输出数据到文件,Logstash对被收集的日志文件需要有读权限,对目标输出文件需要有写权限。

  • 作为输入源配置

    # 一个input中可以指定多个收集源
    input {
    	file {
    		# 日志类型 (可选,但一般填收集的日志名,会在输出信息中展示出来)
    		type => "message-log"
    
    		# 日志路径
    		path => "/var/log/messages"
    
    		# beginning:从文件头开始收集,end:从文件尾开始收集
    		start_position => "beginning"
    
    		# 收集日志间隔时间(秒)
    		stat_interval => "3"
    	}
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
  • 作为输出源配置

    output {
    	# 我们还可以判断type字段输出到不同文件
    	if [type] == "message-log" {
    
    		#文件模块
    		file {
    			#输出路径
    			path => "/tmp/message_%{+yyyy.MM.dd}.log"
    		}
    
    	}
    	else if [type] == "secure-log" {
    	
    		file {
    			path => "/tmp/secure_%{+yyyy.MM.dd}.log"
    			}
    
    	}
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19

# json模块

  • json模块可以用于解析json格式的数据。

  • 如果我们用kibana来实现可视化,那么就必须用json键值对的形式输出数据,不仅源日志需要是json格式,还需要在配置中添加使用filter来将json日志解析成键值对,然后还需要在output指定输出格式为json格式。

  • 使用例子:

    input {
      file {
        path => "/var/log/json_log"
        start_position => "beginning"
      }
    }
    filter {
      json {
        # 指定json格式文本的字段
        source => "message"
      }
    }
    output {
      elasticsearch {
        hosts => ["192.168.10.80:9200"]
        index => "json_log"
        codec => "json"
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19

# multiline模块

  • multiline模块用于多行匹配,因为Logstash默认是按一行日志算一条事件的,这样某些换行信息就会被作为多条日志。查询时会非常不方便,所以可以使用multiline或模块来实现多行匹配。

  • 使用例子:

    input {
      stdin {
        codec => multiline {
        # 匹配"["开头的行,触发多行合并
        pattern => "^\["
        
        # true:匹配成功则进行合并,false:匹配不成功则进行合并
        negate => true
        
        #previou:与上面的行合并,next:与下面的行合并
        what => "previous"
        
        }
      }
    }
    output {
      stdout {
      codec => rubydebug
      }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    • 此处以"["开头,才会进行合并和收集,否者不会收集。

# date模块

  • date模块可用于以指定格式匹配日志中的时间。

  • 使用例子:

    # 格式:
    filter{
      date {
        match => ["字段名","时间格式"]
      }
    }
    
    # 例子:
    filter{
      date {
        match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
      }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
  • 时间格式:

    image-20230324113253085

# grok模块

  • grok模块用于对日志进行正则匹配,并且会将正则匹配的结果,赋值给指定的字段名。

  • grok自定义正则匹配

    • 语法:(?<赋值字段名>正则表达式)

    • 使用例子:

      # 格式:
      filter {
      	grok {
      			match => { 
      				"文本源字段" => "(?<赋值字段名>正则表达式)" 
      		}
      	}
      }
      
      # 例子:
      filter {
      	grok {
      			match => {
      				# 截取message字段中的文本进行匹配,将Report之前的字符作为temMsg字段的值
      				"message" => "(?<temMsg>(.*)(?=Report)/?)" 
      		}
      	}
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
  • grok模式表达式匹配

    • 语法:%{模式类型:字段名}

    • 模式类型:

      • IP - 匹配IP地址,IPv4和IPv6都可以
      • MAC - 匹配MAC地址
      • DATA - 相当于正则的.*?
      • WORD - 匹配包含数字和大小写的字符串,相当于正则的\b\w+\b
      • INT - 匹配正负整数、零
      • NUMBER - 匹配十进制数,包括整数和小数
    • 使用例子:

      filter{
      	grok{
      		match => {"message" => "%{IP:ip}"}
      	}
      }
      
      1
      2
      3
      4
      5

# Kibana介绍

Kibana 是一款开源的数据分析和可视化平台,我们可以使用Kibana对ES索引中的数据进行搜索、查看、交互操作。它可以很方便的利用图表、表格及地图对数据进行多元化的分析和呈现。

# Kibana部署

# 1. 安装Kibana

  • Kibana官网没有提供二进制包安装,所以我们需要通过rpm包安装。

    # 官网下载rpm包
    https://www.elastic.co/cn/downloads/kibana
    
    # 使用yum安装kibana
    yum localinstall -y kibana-*.rpm
    
    1
    2
    3
    4
    5

# 2. 配置kibana

  • vim /etc/kibana/kibana.yml

    # 修改以下四行即可
    
    # kibana端口
    server.port: 5601
    
    # kibana白名单主机
    server.host: "0.0.0.0"
    
    # ES集群地址
    elasticsearch.hosts: ["http://192.168.10.11:9200","http://192.168.10.12:9200"]
    
    # kibana前端语言设置
    i18n.locale: "zh-CN"
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13

# 3. 启动kibana

  • systemctl start kibana && systemctl enable kibana
  • 检查是否成功启动:netstat -lntup | grep 5601

# Kibana使用

# Kibana URL

  • http://192.168.10.11:5601/ - 主页
  • http://192.168.10.11:5601/status - 状态页,Kibana及ES集群状态

# 数据视图

  • 如果需要使用Discovery查询数据则需要先定义数据视图。
  • 数据视图就是某个或某些索引的映射,它支持使用通配符*来进行匹配。
  • 创建数据视图
    1. 侧边栏 -> Stack Management -> 数据视图 -> 创建数据视图。
    2. 输入索引的匹配规则,例如:nginx-*。
    3. 然后选择时间戳字段,例如:@timestamp,如果不设置时间戳字段,则无法在Discovery页面直接使用时间筛选的控件。
    4. 然后创建即可。
#数据库#ELK#ES#日志收集
ES相关操作
Redis部署与使用

← ES相关操作 Redis部署与使用→

最近更新
01
二〇二五年四月十七日随笔
04-17
02
二〇二五年四月十六日随笔
04-16
03
二〇二五年四月九日随笔
04-09
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Hoshinozora | MIT License
湘ICP备2022022820号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式