Logstash Architecture Introduction
Why do we need Logstash?
Some services in production cannot write their logs directly as JSON the way nginx can. For those unstructured logs, Logstash can turn the data into structured form.
filebeat --> logstash --> es
logstash (input, filter, output) --> es
What is Logstash?
Logstash is an open-source data processing pipeline that can collect data from multiple sources at the same time, transform it, and then output it.
Logstash architecture
The basic architecture of Logstash is a pipeline, as shown below:
input: data collection (common plugins: stdin, file, kafka, beats, http)
filter: data parsing/transformation (common plugins: grok, date, geoip, mutate, useragent)
output: data output (common plugins: elasticsearch)
Installing Logstash
yum install java -y
rpm -ivh logstash-7.8.1.rpm
grep "^[a-zA-Z]" /etc/logstash/logstash.yml
node.name: logstash-node1
path.data: /var/lib/logstash
pipeline.workers: 4 # match the number of CPU cores
pipeline.batch.size: 1000
pipeline.ordered: auto
path.logs: /var/log/logstash
vim /etc/logstash/jvm.options
-Xms8g # half of system memory
-Xmx8g
Logstash input plugins
An input plugin specifies the data source. A pipeline can have multiple input plugins; we will focus on the following:
stdin
file
filebeat
kafka
http
stdin plugin
- Reads events from standard input and writes them to standard output
cat /etc/logstash/conf.d/stdin.logstash.conf
input {
stdin {
type => "stdin"
tags => "stdin_type"
}
}
output {
stdout{
codec => "rubydebug"
}
}
- Run the test
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/stdin.logstash.conf
- Output
hello saturday
{
"type" => "stdin",
"@version" => "1",
"host" => "logstash",
"message" => "hello saturday",
"@timestamp" => 2024-08-02T14:08:32.334Z,
"tags" => [
[0] "stdin_type"
]
}
file插件
- 从
file
文件中读取数据,从标准输出中输出内容
cat /etc/logstash/conf.d/file.logstash.conf
input {
file {
path => "/var/log/test.log"
type => "syslog"
start_position => "beginning" # read the file from the beginning on first run
stat_interval => "3" # how often to check the file for updates, default 1s
}
}
output {
stdout{
codec => "rubydebug"
}
}
- Run the test
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/file.logstash.conf
- Output
{
"host" => "logstash",
"message" => "Aug 3 09:10:50 logstash systemd: Started Session 86 of user root.",
"type" => "syslog",
"@version" => "1",
"@timestamp" => 2024-08-03T01:19:28.681Z,
"path" => "/var/log/test.log"
}
Logstash filter plugins
As data moves from source to store, Logstash filters parse each event, identify named fields to build structure, and transform them into a common format for easier and faster analysis and business value:
- grok derives structure from unstructured data
- geoip derives geographic coordinates from an IP address
- useragent derives the operating system and device type from a request
grok plugin
- We want to parse unstructured data like the following into structured JSON:
10.168.99.6 - - [03/Aug/2024:09:51:18 +0800] "POST /wp-admin/admin-ajax.php HTTP/1.1" 200 58 "http://blue.yn.cn/wp-admin/post.php?post=1223&action=edit" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0" "-"
Doing that by hand would require very complex regular expressions.
- grok is essentially a collection of named regular expressions; grok ships with many built-in patterns that can be used directly.
- grok example: use a grok pattern to convert the nginx log into JSON format
cat /etc/logstash/conf.d/http.logstash.conf
input {
http {
port => 5656
}
}
filter {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
}
}
output {
stdout{
codec => "rubydebug"
}
}
- Use insomnia to POST the unstructured nginx log line to Logstash's test port 5656
- The result is as follows:
{
"httpversion" => "1.1",
"headers" => {
"http_host" => "192.168.99.21:5656",
"request_path" => "/",
"http_version" => "HTTP/1.1",
"content_length" => "291",
"http_accept" => "*/*",
"http_user_agent" => "insomnia/2021.6.0",
"request_method" => "POST"
},
"ident" => "-",
"message" => "10.168.99.6 - - [03/Aug/2024:09:51:18 +0800] \"POST /wp-admin/admin-ajax.php HTTP/1.1\" 200 58 \"http://blue.yn.cn/wp-admin/post.php?post=1223&action=edit\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0\" \"-\"",
"timestamp" => "03/Aug/2024:09:51:18 +0800",
"request" => "/wp-admin/admin-ajax.php",
"@version" => "1",
"verb" => "POST",
"bytes" => "58",
"auth" => "-",
"response" => "200",
"@timestamp" => 2024-08-03T02:22:25.201Z,
"referrer" => "\"http://blue.yn.cn/wp-admin/post.php?post=1223&action=edit\"",
"host" => "10.168.99.6",
"clientip" => "10.168.99.6",
"agent" => "\"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0\""
}
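Under the hood, %{COMBINEDAPACHELOG} is just a named regular expression. A rough Python sketch of what it matches is shown below; the pattern is a simplified stand-in, not the exact one Logstash ships with:

```python
import re

# Simplified sketch of grok's COMBINEDAPACHELOG pattern (not the exact
# pattern Logstash uses), built from Python named groups.
COMBINED = re.compile(
    r'(?P<clientip>\S+) (?P<ident>\S+) (?P<auth>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<verb>\S+) (?P<request>\S+) HTTP/(?P<httpversion>\S+)" '
    r'(?P<response>\d+) (?P<bytes>\S+) '
    r'"(?P<referrer>[^"]*)" "(?P<agent>[^"]*)"'
)

line = ('10.168.99.6 - - [03/Aug/2024:09:51:18 +0800] '
        '"POST /wp-admin/admin-ajax.php HTTP/1.1" 200 58 '
        '"http://blue.yn.cn/wp-admin/post.php?post=1223&action=edit" '
        '"Mozilla/5.0 (Windows NT 10.0; Win64; x64)" "-"')

# groupdict() yields the same named fields grok produces above.
fields = COMBINED.match(line).groupdict()
print(fields["clientip"], fields["verb"], fields["response"])  # 10.168.99.6 POST 200
```

grok saves you from writing and maintaining patterns like this yourself.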
geoip plugin
- The geoip plugin resolves an IP address to location information such as latitude/longitude and city name, which is convenient for geographic analysis.
- geoip example: extract the clientip field from the nginx log with geoip and look up its location
cat /etc/logstash/conf.d/http.logstash.conf
input {
http {
port => 5656
}
}
filter {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
}
geoip {
source => "clientip"
}
}
output {
stdout{
codec => "rubydebug"
}
}
geoip result:
"geoip" => {
"ip" => "222.217.160.1",
"region_name" => "Guangxi",
"continent_code" => "AS",
"timezone" => "Asia/Shanghai",
"country_code2" => "CN",
"country_name" => "China",
"latitude" => 22.8137,
"longitude" => 108.3146,
"region_code" => "GX",
"location" => {
"lon" => 108.3146,
"lat" => 22.8137
},
"country_code3" => "CN"
},
fields option
- Because the full output is verbose, the fields option lets you select only the information you need
cat /etc/logstash/conf.d/http.logstash.conf
input {
http {
port => 5656
}
}
filter {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
}
geoip {
source => "clientip"
fields => ["country_name","region_name"]
}
}
output {
stdout{
codec => "rubydebug"
}
}
- Output:
"geoip" => {
"country_name" => "China",
"region_name" => "Guangxi"
},
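What the fields option does can be sketched in plain Python: from the full lookup result, keep only the requested keys. The dictionary below is illustrative data mirroring the earlier output, not a real geoip lookup:

```python
# Illustrative stand-in for a full geoip lookup result (not a real lookup).
full = {
    "ip": "222.217.160.1", "country_name": "China", "region_name": "Guangxi",
    "timezone": "Asia/Shanghai", "latitude": 22.8137, "longitude": 108.3146,
}

# fields => ["country_name","region_name"] keeps only the keys you ask for.
wanted = ["country_name", "region_name"]
geoip = {k: full[k] for k in wanted}
print(geoip)  # {'country_name': 'China', 'region_name': 'Guangxi'}
```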
date插件
-
date
插件:将日期字符串解析为日志类型。然后替换@timestamp
字段或指定的其他字段match
类型为数组,用于指定日期匹配的格式,可以以此指定多种日期格式target
类型为字符串,用于指定赋值的字段名,默认是@timestamp
timezone
类型为字符串,用于指定时区域
-
date
示例,将nginx
请求中的timestamp
日志进行解析
cat /etc/logstash/conf.d/http.logstash.conf
input {
http {
port => 5656
}
}
filter {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
}
geoip {
source => "clientip"
fields => ["country_name","region_name"]
}
date {
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
}
output {
stdout{
codec => "rubydebug"
}
}
- Output
{
"auth" => "-",
"ident" => "-",
"@timestamp" => 2024-08-03T01:51:18.000Z,
"timestamp" => "03/Aug/2024:09:51:18 +0800",
"referrer" => "\"http://blue.yn.cn/wp-admin/post.php?post=1223&action=edit\"",
"httpversion" => "1.1",
"agent" => "\"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0\"",
"request" => "/wp-admin/admin-ajax.php",
"bytes" => "58",
"verb" => "POST",
"response" => "200",
"headers" => {
"http_version" => "HTTP/1.1",
"http_host" => "192.168.99.21:5656",
"request_method" => "POST",
"http_accept" => "*/*",
"content_length" => "293",
"http_user_agent" => "insomnia/2021.6.0",
"request_path" => "/"
},
}
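What the date filter does here can be reproduced in plain Python: parse the nginx timestamp with its +0800 offset and normalise it to UTC, which is why @timestamp above reads 01:51:18Z:

```python
from datetime import datetime, timezone

# Parse the nginx timestamp (with its +0800 offset) and convert to UTC,
# mirroring what the date filter writes into @timestamp.
ts = datetime.strptime("03/Aug/2024:09:51:18 +0800", "%d/%b/%Y:%H:%M:%S %z")
print(ts.astimezone(timezone.utc).isoformat())  # 2024-08-03T01:51:18+00:00
```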
useragent plugin
- The useragent plugin parses the user-agent field of a request into browser, device, and operating system information
- useragent example:
cat /etc/logstash/conf.d/http.logstash.conf
input {
http {
port => 5656
}
}
filter {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
}
geoip {
source => "clientip"
fields => ["country_name","region_name"]
}
date {
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
useragent {
source => "agent"
target => "useragent"
}
}
output {
stdout{
codec => "rubydebug"
}
}
- Output:
"useragent" => {
"major" => "126",
"os_name" => "Windows",
"minor" => "0",
"device" => "Other",
"build" => "",
"name" => "Chrome",
"os" => "Windows",
"patch" => "0"
},
mutate plugin
mutate performs field operations such as type conversion, deletion, replacement, and updates:
remove_field: delete a field
split: split a string
add_field: add a field
convert: convert a field's type
gsub: substitute within a string
rename: rename a field
remove_field
Use mutate to delete unneeded fields such as headers, message, and agent
cat /etc/logstash/conf.d/http.logstash.conf
input {
http {
port => 5656
}
}
filter {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
}
geoip {
source => "clientip"
fields => ["country_name","region_name"]
}
date {
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
useragent {
source => "agent"
target => "useragnet"
}
mutate {
remove_field => ["headers","message","agent"]
}
}
output {
stdout{
codec => "rubydebug"
}
}
The result comes back much cleaner now:
{
"httpversion" => "1.1",
"ident" => "-",
"bytes" => "58",
"referrer" => "\"http://blue.yn.cn/wp-admin/post.php?post=1223&action=edit\"",
"response" => "200",
"useragent" => {
"build" => "",
"device" => "Other",
"patch" => "0",
"major" => "126",
"minor" => "0",
"name" => "Chrome",
"os" => "Windows",
"os_name" => "Windows"
},
"clientip" => "222.217.160.1",
"host" => "10.168.99.6",
"verb" => "POST",
"request" => "/wp-admin/admin-ajax.php",
"geoip" => {
"country_name" => "China",
"region_name" => "Guangxi"
},
"@timestamp" => 2024-08-03T01:51:18.000Z,
"@version" => "1",
"auth" => "-",
"timestamp" => "03/Aug/2024:09:51:18 +0800"
}
split
The split operation in mutate cuts a string into pieces, here using | as the field separator
- Test data:
5607|提交订单|2019-12-28 03:18:31
cat mutate.logstash.conf
input {
http {
port => 5656
}
}
filter {
mutate {
split => { "message" => "|" }
}
}
output {
stdout {
codec => rubydebug
}
}
Output:
{
"message" => [
[0] "5607",
[1] "提交订单",
[2] "2019-12-28 03:18:31"
],
}
add_field
With add_field in mutate, the pieces produced by split can be given their own field names, which makes later statistics and analysis easier
cat mutate.logstash.conf
input {
http {
port => 5656
}
}
filter {
mutate {
split => { "message" => "|" }
add_field => {
"UserID" => "%{[message][0]}"
"Action" => "%{[message][1]}"
"Date" => "%{[message][2]}"
}
}
}
output {
stdout {
codec => rubydebug
}
}
convert
The convert operation in mutate performs type conversion; it supports integer, float, string, and other types
cat mutate.logstash.conf
input {
http {
port => 5656
}
}
filter {
mutate {
split => { "message" => "|" }
add_field => {
"UserID" => "%{[message][0]}"
"Action" => "%{[message][1]}"
"Date" => "%{[message][2]}"
}
convert => {
"UserID" => "integer"
"Action" => "string"
"Date" => "string"
}
remove_field => ["headers","message"]
}
}
output {
stdout {
codec => rubydebug
}
}
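The split / add_field / convert chain above can be sketched in plain Python: cut the message on |, name the pieces, and cast UserID to an integer so numeric operations work:

```python
# The split / add_field / convert chain from the config above, in plain Python.
message = "5607|提交订单|2019-12-28 03:18:31"
user_id, action, date = message.split("|")                        # split
event = {"UserID": int(user_id), "Action": action, "Date": date}  # add_field + convert
print(event["UserID"], type(event["UserID"]).__name__)  # 5607 int
```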
Logstash output plugins
- stdout
- file
- elasticsearch
stdout plugin
The stdout plugin writes events to the terminal, which is convenient for debugging
output {
stdout {
codec => rubydebug
}
}
file plugin
- Writes events to a file, consolidating logs scattered across many hosts into one place
- For example, collect the web logs from all web servers into one file for easy review
output {
file {
path => "/var/log/web.log"
}
}
elasticsearch plugin
- Writes events to elasticsearch; this is the most commonly used output plugin
output {
elasticsearch {
index => "networks-h3c-%{+YYYY.MM.dd}"
hosts => ["192.168.99.11:9200","192.168.99.12:9200","192.168.99.13:9200"]
}
}
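The %{+YYYY.MM.dd} in the index name is Logstash's date sprintf on the event's @timestamp (evaluated in UTC), so one index is created per day. Roughly in Python:

```python
from datetime import datetime, timezone

# Sketch of how %{+YYYY.MM.dd} expands: the event's @timestamp formatted
# as year.month.day (Logstash evaluates this against UTC @timestamp).
at_timestamp = datetime(2024, 8, 3, 1, 51, 18, tzinfo=timezone.utc)
index = "networks-h3c-" + at_timestamp.strftime("%Y.%m.%d")
print(index)  # networks-h3c-2024.08.03
```

Daily indices make retention easy: old days can be dropped as whole indices.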