内容目录

logstash架构介绍

为什么需要logstash

对于部分生产上的日志无法像nginx那样,可以直接将输出的日志转为json格式,但是可以借助logstash来将我们的非结构化数据转为结构化数据

filebeat-->logstash-->es
logstash(input fileter ouput)--es

什么是logstash

logstash是开源的数据处理管道,能够同时从多个源采集数据,转换数据,然后输出数据

logstash架构介绍

  • logstash的基础架构类似于pipeline流水线,过下所示:
    • input:数据采集(常用插件:stdin、file、kafka、filebeat、http
    • filter:数据解析/转换(常用插件:grok、date、geoip、mutate、useragent
    • output:数据输出(常用插件:elasticresearch

安装logstash

yum install java -y
rpm -ivh logstash-7.8.1.rpm

grep "^[a-Z]" /etc/logstash/logstash.yml
node.name: logstash-node1
path.data: /var/lib/logstash
pipeline.workers: 4  # 和cpu核心一致
pipeline.batch.size: 1000
pipeline.ordered: auto
path.logs: /var/log/logstash

vim /etc/logstash/jvm.options
-Xms8g  # 是内存的一半
-Xmx8g

logstash input插件

input插件用于指定输入源,一个pipeline可以有多个input插件,我们主要围绕下面几个input插件进行介绍:

  • stdin
  • file
  • filebeat
  • kafka
  • http

stdin插件

  • 从标准输入读取数据,从标准输出中输出内容
cat /etc/logstash/conf.d/stdin.logstash.conf

input {
        stdin {
                type => "stdin"
                tags => "stdin_type"
        }
}

output {
        stdout{
                codec => "rubydebug"
        }
}
  • 执行测试

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/stdin.logstash.conf

  • 返回结果
hello saturday
{
          "type" => "stdin",
      "@version" => "1",
          "host" => "logstash",
       "message" => "hello saturday",
    "@timestamp" => 2024-08-02T14:08:32.334Z,
          "tags" => [
        [0] "stdin_type"
    ]
}

file插件

  • file文件中读取数据,从标准输出中输出内容
cat /etc/logstash/conf.d/file.logstash.conf

input {
        file {
                path => "/var/log/test.log"
                type => syslog
                start_position => "beginning"  # 第一次从头开始读取文件
                stat_interval => "3"  # 定时检查文件是否更新,默认1s
        }
}

output {
        stdout{
                codec => "rubydebug"
        }
}
  • 执行测试
    /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/file.logstash.conf

  • 返回结果

{
          "host" => "logstash",
       "message" => "Aug  3 09:10:50 logstash systemd: Started Session 86 of user root.",
          "type" => "syslog",
      "@version" => "1",
    "@timestamp" => 2024-08-03T01:19:28.681Z,
          "path" => "/var/log/test.log"
}

logstash filter插件

数据从源传输到存储的过程中,logstashfilter过滤器能够解析各个事件,识别已命名的字段结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值;

  • 利用grok从非结构化数据中派生出结构
  • 利用geoipip地址分析出地理坐标
  • 利用useragent从请求中分析操作系统、设备类型

file

grok插件

  • 我们希望将如下非结构化的数据解析成json结构化数据格式

10.168.99.6 - - [03/Aug/2024:09:51:18 +0800] "POST /wp-admin/admin-ajax.php HTTP/1.1" 200 58 "http://blue.yn.cn/wp-admin/post.php?post=1223&action=edit" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0" "-"

需要使用非常复杂的正则表达式

  • grok其实是带有名字的正则表达式集合。grok内置了很多pattern可以直接使用;

  • grok示例:使用grok patternnginx日志格式转换为json格式

cat /etc/logstash/conf.d/http.logstash.conf

input {
        http {
                port => 5656
        }
}

filter {
        grok {
                match => {
                        "message" => "%{COMBINEDAPACHELOG}"
                }
        }
}

output {
        stdout{
                codec => "rubydebug"
        }
}
  • 使用insomnia post nginx的非结构化日志数据到logstash的5656测试端口

file

  • 返回结果如下:
{
    "httpversion" => "1.1",
        "headers" => {
              "http_host" => "192.168.99.21:5656",
           "request_path" => "/",
           "http_version" => "HTTP/1.1",
         "content_length" => "291",
            "http_accept" => "*/*",
        "http_user_agent" => "insomnia/2021.6.0",
         "request_method" => "POST"
    },
          "ident" => "-",
        "message" => "10.168.99.6 - - [03/Aug/2024:09:51:18 +0800] \"POST /wp-admin/admin-ajax.p                                                                                          hp HTTP/1.1\" 200 58 \"http://blue.yn.cn/wp-admin/post.php?post=1223&action=edit\" \"Mozi                                                                                          lla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Sa                                                                                          fari/537.36 Edg/126.0.0.0\" \"-\"",
      "timestamp" => "03/Aug/2024:09:51:18 +0800",
        "request" => "/wp-admin/admin-ajax.php",
       "@version" => "1",
           "verb" => "POST",
          "bytes" => "58",
           "auth" => "-",
       "response" => "200",
     "@timestamp" => 2024-08-03T02:22:25.201Z,
       "referrer" => "\"http://blue.yn.cn/wp-admin/post.php?post=1223&action=edit\"",
           "host" => "10.168.99.6",
       "clientip" => "10.168.99.6",
          "agent" => "\"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, lik                                                                                          e Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0\""
}

geoip插件

  • geoip插件:根据ip地址提供的对应地域消息,比如经纬度、城市名等,方便进行地理数据分析;

  • geoip示例:通过geoip提取nginx日志中clientip字段,并获取地域消息;

cat /etc/logstash/conf.d/http.logstash.conf

input {
        http {
                port => 5656
        }
}

filter {
        grok {
                match => {
                        "message" => "%{COMBINEDAPACHELOG}"
                }
        }

        geoip {
                source => "clientip"
        }
}

output {
        stdout{
                codec => "rubydebug"
        }
}
  • geoip结果:
 "geoip" => {
                    "ip" => "222.217.160.1",
           "region_name" => "Guangxi",
        "continent_code" => "AS",
              "timezone" => "Asia/Shanghai",
         "country_code2" => "CN",
          "country_name" => "China",
              "latitude" => 22.8137,
             "longitude" => 108.3146,
           "region_code" => "GX",
              "location" => {
            "lon" => 108.3146,
            "lat" => 22.8137
        },
         "country_code3" => "CN"
    },

fields字段

  • 由于输出内容太多,可以通过fileds选项选择自己需要的信息
cat /etc/logstash/conf.d/http.logstash.conf

input {
        http {
                port => 5656
        }
}

filter {
        grok {
                match => {
                        "message" => "%{COMBINEDAPACHELOG}"
                }
        }

        geoip {
                source => "clientip"
                fields => ["country_name","region_name"]
        }
}

output {
        stdout{
                codec => "rubydebug"
        }
}
  • 返回结果:
"geoip" => {
        "country_name" => "China",
         "region_name" => "Guangxi"
    },

date插件

  • date插件:将日期字符串解析为日志类型。然后替换@timestamp字段或指定的其他字段

    • match类型为数组,用于指定日期匹配的格式,可以以此指定多种日期格式
    • target类型为字符串,用于指定赋值的字段名,默认是@timestamp
    • timezone类型为字符串,用于指定时区域
  • date示例,将nginx请求中的timestamp日志进行解析

cat /etc/logstash/conf.d/http.logstash.conf

input {
        http {
                port => 5656
        }
}

filter {
        grok {
                match => {
                        "message" => "%{COMBINEDAPACHELOG}"
                }
        }

        geoip {
                source => "clientip"
                fields => ["country_name","region_name"]
        }

        date {
                match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
                target => "@timestamp"
                timezone => "Asia/Shanghai"
        }
}

output {
        stdout{
                codec => "rubydebug"
        }
}
  • 返回结果
{
           "auth" => "-",
          "ident" => "-",
     "@timestamp" => 2024-08-03T01:51:18.000Z,
      "timestamp" => "03/Aug/2024:09:51:18 +0800",
       "referrer" => "\"http://blue.yn.cn/wp-admin/post.php?post=1223&action=edit\"",
    "httpversion" => "1.1",
          "agent" => "\"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0\"",
        "request" => "/wp-admin/admin-ajax.php",
          "bytes" => "58",
           "verb" => "POST",
       "response" => "200",
        "headers" => {
           "http_version" => "HTTP/1.1",
              "http_host" => "192.168.99.21:5656",
         "request_method" => "POST",
            "http_accept" => "*/*",
         "content_length" => "293",
        "http_user_agent" => "insomnia/2021.6.0",
           "request_path" => "/"
    },

useragent插件

  • useragent插件:根据请求中的user-agent字段,解析出浏览器设备、操作系统等信息

  • useragent示例:

cat /etc/logstash/conf.d/http.logstash.conf

input {
        http {
                port => 5656
        }
}

filter {
        grok {
                match => {
                        "message" => "%{COMBINEDAPACHELOG}"
                }
        }

        geoip {
                source => "clientip"
                fields => ["country_name","region_name"]
        }

        date {
                match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
                target => "@timestamp"
                timezone => "Asia/Shanghai"
        }

        useragent {
                source => "agent"
                target => "useragnet"
        }
}

output {
        stdout{
                codec => "rubydebug"
        }
}
  • 返回结果:
 "useragnet" => {
          "major" => "126",
        "os_name" => "Windows",
          "minor" => "0",
         "device" => "Other",
          "build" => "",
           "name" => "Chrome",
             "os" => "Windows",
          "patch" => "0"
    },

mutate插件

  • mutate主要对字段进行类型转换、删除、替换、更新等操作;
    • remove_field删除字段
    • split字符串切割
    • add_field添加字段
    • convert类型转换
    • gsub字符串替换
    • rename字段重命名
remove_field
  • mutate删除无用字段,比如:headers、message、agent
cat /etc/logstash/conf.d/http.logstash.conf

input {
        http {
                port => 5656
        }
}

filter {
        grok {
                match => {
                        "message" => "%{COMBINEDAPACHELOG}"
                }
        }

        geoip {
                source => "clientip"
                fields => ["country_name","region_name"]
        }

        date {
                match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
                target => "@timestamp"
                timezone => "Asia/Shanghai"
        }

        useragent {
                source => "agent"
                target => "useragnet"
        }

        mutate {
                remove_field => ["headers","message","agent"]
        }
}

output {
        stdout{
                codec => "rubydebug"
        }
}

结果返回,整个数据返回的结果清爽了很多

{
    "httpversion" => "1.1",
          "ident" => "-",
          "bytes" => "58",
       "referrer" => "\"http://blue.yn.cn/wp-admin/post.php?post=1223&action=edit\"",
       "response" => "200",
      "useragnet" => {
          "build" => "",
         "device" => "Other",
          "patch" => "0",
          "major" => "126",
          "minor" => "0",
           "name" => "Chrome",
             "os" => "Windows",
        "os_name" => "Windows"
    },
       "clientip" => "222.217.160.1",
           "host" => "10.168.99.6",
           "verb" => "POST",
        "request" => "/wp-admin/admin-ajax.php",
          "geoip" => {
        "country_name" => "China",
         "region_name" => "Guangxi"
    },
     "@timestamp" => 2024-08-03T01:51:18.000Z,
       "@version" => "1",
           "auth" => "-",
      "timestamp" => "03/Aug/2024:09:51:18 +0800"
}
split
  • mutate中的split字符切割,指定|为字段分隔符
  • 测试数据:5607|提交订单|2019-12-28 03:18:31
cat mutate.logstash.conf
input {
        http {
                port => 5656
        }
}

filter {
        mutate {
                split => { "message" => "|" }
        }
}

output {
        stdout {
                codec => rubydebug
        }
}

结果返回:

{
    "message" => [
    [0] "5607",
    [1] "提交订单",
    [2] "2019-12-28 03:18:31"
    ],
}
add_field
  • mutateadd_field,可以将分割后的数据创建出新的字段名称。便于以后的统计和分析;
cat mutate.logstash.conf
input {
        http {
                port => 5656
        }
}

filter {
        mutate {
                split => { "message" => "|" }
                add_field => {
                    "UserID" => "%{[message][0]}"
                    "Action" => "%{[message][1]}"
                    "Date" => "%{[message][2]}"
                }
        }

}

output {
        stdout {
                codec => rubydebug
        }
}
convert
  • mutate中的convert类型转换。支持转换integer、float、string等类型
cat mutate.logstash.conf
input {
        http {
                port => 5656
        }
}

filter {
        mutate {
                split => { "message" => "|" }
                add_field => {
                    "UserID" => "%{[message][0]}"
                    "Action" => "%{[message][1]}"
                    "Date" => "%{[message][2]}"
                }
                convert => {
                    "UserID" => "integer"
                    "Action" => "string"
                    "Date" => "string"
                }
                remove_field => ["headers","message"]
        }

}

output {
        stdout {
                codec => rubydebug
        }
}

logstash output插件

  • stdout
  • file
  • elasticsearch

stdout插件

  • stdout插件将数据输出到屏幕终端,便于调试;
output {
    stdout {
        codec => rubydebug
    }
}

file插件

  • 输出至文件,实现将分散在多地的文件统一到一处;
  • 比如将所有web机器的web日志收集到一个文件中,从而方便查阅信息
output {
    file {
        path => "/var/log/web.log"
    }
}

elastic插件

  • 输出到elasticsearch,是最常用的输出插件;
output {
    elasticsearch {
      index => "networks-h3c-%{+YYYY.MM.dd}"
      hosts => ["192.168.99.11:9200","192.168.99.12:9200","192.168.99.13:9200"]
    }
}
最后修改日期: 2024年8月12日

留言

撰写回覆或留言

发布留言必须填写的电子邮件地址不会公开。