一、概述

Logstash 是一个开源的数据搜集和日志处理东西,它是 Elastic Stack(ELK Stack)的一部分,用于从各种数据源中搜集、转化和传输数据,以协助剖析和可视化大规模数据。Logstash 通常与 ElasticsearchKibana 一同运用,以完结实时日志剖析和监控。

以下是 Logstash 的首要功用和特色:

  • 数据搜集Logstash 能够从多种数据源中搜集数据,包括日志文件、数据文件、消息行列、数据库、网络流量等。它支撑多种输入插件,以适应不同数据源的需求。

  • 数据转化Logstash 具有强壮的数据转化功用,能够对搜集的数据进行过滤、解析、转化和丰厚操作。它运用过滤插件来对数据履行各种操作,包括正则表达式解析、字段拆分、数据脱敏、时刻戳生成等。

  • 多通道数据处理Logstash 允许将数据流式传输到不同的通道,以满意不同的需求。通道能够是 Elasticsearch、Kafka、RabbitMQ 等,或许您能够界说自界说输出插件。

  • 数据过滤和插件Logstash 有丰厚的插件生态系统,包括输入插件、过滤插件和输出插件。这些插件能够依据特定需求来装备和扩展,以适应各种数据处理使命。

  • 实时数据处理Logstash 具有实时数据处理能力,能够将数据从源头到目的地以实时或近实时的方式传递。这使得它适用于日志监控、安全剖析、功能监控等实时运用。

  • 可伸缩性Logstash 能够与多个 Logstash 实例一同布置,以完结数据搜集和处理的横向扩展。这有助于应对大规模数据需求。

  • 易于装备Logstash 运用简略的装备文件(通常是YAML格局)来界说数据流的处理进程。装备文件十分直观,易于理解和保护。

  • 社区和支撑Logstash 是一个广泛选用的开源项目,具有活泼的社区支撑和很多的文档资源。

LogstashElastic Stack 中的一个重要组件,与 ElasticsearchKibana 合作运用,能够构建强壮的实时日志和数据剖析解决方案。它为安排供给了强壮的数据搜集和处理东西,用于监控、剖析和可视化大规模数据。

官方文档:

二、Logstash 架构

开源数据搜集引擎 Logstash 解说和示例解说
Logstash 包含3个首要部分: 输入(inputs)过滤器(filters)输出(outputs)

Logstash的事情(logstash将数据流中等每一条数据称之为一个event)处理流水线有三个首要角色完结:inputs –> filters –> outputs

  • inpust:有必要,担任发生事情(Inputs generate events),常用:File、syslog、redis、kakfa、beats(如:Filebeats);官方文档:www.elastic.co/guide/en/lo…
  • filters:可选,担任数据处理与转化(filters modify them),常用:grok、mutate、drop、clone、geoip;官网文档:www.elastic.co/guide/en/lo…
  • outpus:有必要,担任数据输出(outputs ship them elsewhere),常用:elasticsearch、file、graphite、kakfa、statsd;官方文档:www.elastic.co/guide/en/lo…

二、ElasticSearch 布置

这儿能够挑选以下布置方式:

这儿我挑选 docker-compose 布置方式。

1)布置 docker

# 装置yum-config-manager装备东西
yum -y install yum-utils
# 建议运用阿里云yum源:(引荐)
#yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# 装置docker-ce版别
yum install -y docker-ce
# 发动并开机发动
systemctl enable --now docker
docker --version

2)布置 docker-compose

curl -SL https://github.com/docker/compose/releases/download/v2.16.0/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose
chmod  x /usr/local/bin/docker-compose
docker-compose --version

3)创立网络

# 创立
docker network create bigdata
# 检查
docker network ls

4)修正 Linux 句柄数和最大线程数

#检查当前最大句柄数
sysctl -a | grep vm.max_map_count
#修正句柄数
vi /etc/sysctl.conf
vm.max_map_count=262144
#暂时收效,修正后需求重启才能收效,不想重启能够设置暂时收效
sysctl -w vm.max_map_count=262144
#修正后需求重新登录收效
vi /etc/security/limits.conf
# 增加以下内容
* soft nofile 65535
* hard nofile 65535
* soft nproc 4096
* hard nproc 4096
# 重启服务,-h 立刻重启,默许距离一段时刻才会开端重启
reboot -h now

5)下载布置包开端布置

# 这儿挑选 docker-compose 布置方式
git clone https://gitee.com/hadoop-bigdata/docker-compose-es-kibana.git
cd docker-compose-es-kibana
chmod -R 777 es kibana
docker-compose -f docker-compose.yaml up -d
docker-compose ps

三、Logstash 布置与装备解说

1)下载Logstash装置包

访问官方网站 www.elastic.co/downloads/l… ,下载相应版别的zip文件。

wget https://artifacts.elastic.co/downloads/logstash/logstash-8.11.1-linux-x86_64.tar.gz

2)解压装置包文件

tar -xf logstash-8.11.1-linux-x86_64.tar.gz

3)不同场景测验

1)测验1:选用规范的输入和输出

cd logstash-8.11.1
# 测验,选用规范的输入和输出,#codec=>rubydebug,解析转化类型:ruby
# codec各类型解说:https://www.elastic.co/guide/en/logstash/7.9/codec-plugins.html
./bin/logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'
# 输入:
hello
# 输出:
{
         "event" => {
        "original" => "hello"
    },
          "host" => {
        "hostname" => "local-168-182-110"
    },
      "@version" => "1",
    "@timestamp" => 2023-11-19T02:31:02.485073839Z,
       "message" => "hello"
}

开源数据搜集引擎 Logstash 解说和示例解说

2)测验2:运用装备文件 规范输入输出

装备文件:config/logstash-1.conf

input {
	stdin { }
}
output {
   stdout { codec => rubydebug }
}

发动服务

./bin/logstash -f ./config/logstash-1.conf

3)测验3:装备文件 file输入 规范的屏幕输出

装备文件:./config/logstash-2.conf

input {
  file {
    path => "/var/log/messages"
  }
}
output {
  stdout {
    codec=>rubydebug
  }
}

发动服务

./bin/logstash -f ./config/logstash-2.conf

开源数据搜集引擎 Logstash 解说和示例解说

4)测验4:装备文件 文件输入 kafka输出

kafka 布置,能够参考我以下几篇文章:

装备文件:./config/logstash-3.conf

input {
  file {
    path => "/var/log/messages"
  }
}
output {
  kafka {
    bootstrap_servers => "192.168.182.110:9092"
    topic_id => "messages"
  }
}

发动服务

./bin/logstash -f ./config/logstash-3.conf

消费 kafka 数据


docker exec -it kafka-node1 bash
./bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic messages  --from-beginning

开源数据搜集引擎 Logstash 解说和示例解说

5)测验5:装备文件 filebeat端口输入 规范输出

filebeat 布置,能够参考我以下几篇文章:

服务器发生日志(filebeat)—》logstash服务器

装备文件:./config/logstash-4.conf

input {
  beats {
    port => 5044
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

发动服务

./bin/logstash -f ./config/logstash-4.conf

发动后会在本机发动一个5044端口,不要和系统已发动的端口抵触即可,合作测验咱们在 filebeat 服务器上修正装备文件。

filebeat 装备文件内容:filebeat.yml

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages
# ------------------------------ Logstash Output -------------------------------
output.logstash:
  hosts: ["192.168.182.110:5044"]      

发动 filebeat

./filebeat -e -c filebeat.yml

6)测验6:装备文件 filebeat端口输入 输出到kafka

服务器发生日志(filebeat)—> logstash服务器—->kafka服务器

装备文件:./config/logstash-5.conf

input {
  beats {
    port => 5044
  }
}
output {
  kafka {
  bootstrap_servers => "192.168.182.110:9092"
  topic_id => "messages"
  }
}

发动服务

./bin/logstash -f ./config/logstash-5.conf

7)测验7:filebeat数据搜集 kafka读取当输入 logstash处理 输出到 ES

服务器发生日志(filebeat)—> kafka服务器__抽取数据___> logstash服务器—->ES

开源数据搜集引擎 Logstash 解说和示例解说
logstash的装备:./config/logstash-6.conf

input {
    kafka {
    bootstrap_servers => "10.82.192.110:9092"
    topics => ["messages"]
    }
}
output {
    elasticsearch {
    hosts => ["10.82.192.110:9200"]
    index => "messageslog-%{ YYYY-MM-dd}"
  }
}

filebeat.yml output.kafka 装备:

# ------------------------------ KAFKA Output -------------------------------
output.kafka:
  eanbled: true
  hosts: ["10.82.192.110:9092"]
  version: "2.0.1"
  topic: '%{[fields][log_topic]}'
  partition.round_robin:
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 10000000

运用 systemctl 发动 filebeat

# vi /usr/lib/systemd/system/filebeat.service
[Unit]
Description=filebeat server daemon
Documentation=/opt/filebeat-7.6.2-linux-x86_64/filebeat -help
Wants=network-online.target
After=network-online.target
[Service]
User=root
Group=root
Environment="BEAT_CONFIG_OPTS=-c /opt/filebeat-7.6.2-linux-x86_64/filebeat.yml"
ExecStart=/opt/filebeat-7.6.2-linux-x86_64/filebeat $BEAT_CONFIG_OPTS
Restart=always
[Install]
WantedBy=multi-user.target

运用 systemctl 发动 logstash

# vi /usr/lib/systemd/system/logstash.service
[Unit]
Description=logstash
[Service]
User=root
ExecStart=/opt/logstash-8.11.1/bin/logstash -f /opt/logstash-8.11.1/config/logstash-6.conf
Restart=always
[Install]
WantedBy=multi-user.target

发动服务

systemctl start logstash
systemctl status logstash

四、Logstash filter常用插件

担任数据处理与转化(filters modify them),常用:grok、mutate、drop、clone、geoip;官网文档:www.elastic.co/guide/en/lo…

1)运用grok内置的正则事例

grok 插件:Grok是将非结构化日志数据解析为结构化和可查询内容的好方法,底层原理是根据正则匹配恣意文本格局

此东西十分合适syslog日志、apache和其他Web服务器日志、mysql日志,以及一般来说,任何通常为人类而不是计算机消费编写的日志格局。

grok内置了120种匹配方式,也能够自界说匹配方式:github.com/logstash-pl…

filebeat装备:filebeat.yml

##
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages
output.logstash:
  #指定logstash监听的IP和端口
  hosts: ["192.168.182.110:5044"]

logstash 装备:stdin-grok-stout.conf

cat >> stdin-grok-stout.conf << EOF
input {
  #监听的类型
  beats {
  #监听的本地端口
    port => 5044
  }
}
filter{
  grok{ 
   #match => { "message" => "%{COMBINEDAPACHELOG}" } 
   #上面的"COMBINEDAPACHELOG"变量官方github上现已抛弃,建议运用下面的匹配方式 
   #参考地址:https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/legacy/httpd
   match => { "message" => "%{HTTPD_COMBINEDLOG}" }
  }
}
output {
  stdout {}
  elasticsearch {
    #界说es集群的主机地址
    hosts => ["192.168.182.110:9200"]
    #界说索引称号
    index => "hqt-application-pro-%{ YYYY.MM.dd}"
  }
}
EOF

2)运用grok自界说的正则事例

参考官网地址:www.elastic.co/guide/en/lo…

装备如下:

cat >> stdin-grok_custom_patterns-stdout.conf << EOF
input {
 stdin {}
}
filter {
  grok {
    #指定方式匹配的目录,能够运用绝对路径
    #在./patterns目录下随便创立一文件,并写入以下匹配方式
    # ORDER_ID [u4e00-u9fa5]{10,11}:[0-9A-F]{10,11}
    patterns_dir => ["./patterns"]
    #匹配方式
    #测验数据为:app_name:gotone-payment-api,client_ip:,context:,docker_name:,env:dev,exception:,extend1:,level:INFO,line:-1,log_message:com.gotone.paycenter.controller.task.PayCenterJobHandler.queryPayOrderTask-request:[\],log_time:2022-11-23 00:00:00.045,log_type:applicationlog,log_version:1.0.0,本次成交的订单编号为:BEF25A72965,parent_span_id:,product_line:,server_ip:,server_name:gotone-payment-api-c86658cb7-tc8k5,snooper:,span:0,span_id:,stack_message:,threadId:104,trace_id:,user_log_type:
    match => { "message" => "%{ORDER_ID:test_order_id}" }
  }
}
output {
  stdout {}
}
EOF

3)filter插件通用字段事例(增加/删去字段、tag)

原有字段(nginx的json解析日志)

装备如下:

cat >> stdin-remove_add_field-stout.conf << EOF
input {
  beats {
    port => 5044
  }
}
filter {
  mutate {
    #移除指定的字段,运用逗号分隔
    remove_field => [ "tags","agent","input","log","ecs","version","@version","ident","referrer","auth" ]
    #增加指定的字段,运用逗号分隔
    #"%{clientip}"运用%能够将已有字段的值当作变量运用
    add_field => {
     "app_name" => "nginx"
     "test_clientip" => "clientip---->%{clientip}"
    }
    #增加tag
    add_tag => [ "linux","web","nginx","test" ]
    #移除tag
    remove_tag => [ "linux","test" ]
  }
}
output {
  stdout {}
}
EOF

4)date 插件修正写入ES的时刻事例

测验日志:如下是咱们要搜集的一条json格局的日志

{"app_name":"gotone-payment-api","client_ip":"","context":"","docker_name":"","env":"dev","exception":"","extend1":"","level":"INFO","line":68,"log_message":"现代金控付出查询->调用入参[{}]","log_time":"2022-11-23 00:00:00.051","log_type":"applicationlog","log_version":"1.0.0","method_name":"com.gotone.paycenter.dao.third.impl.modernpay.ModernPayApiAbstract.getModernPayOrderInfo","parent_span_id":"","product_line":"","server_ip":"","server_name":"gotone-payment-api-c86658cb7-tc8k5","snooper":"","span":0,"span_id":"","stack_message":"","threadId":104,"trace_id":"gotone-payment-apib4a65777-ce6b-4bcc-8aef-71a7cfffaf2c","user_log_type":""}

装备如下:

cat >> stdin-date-es.conf << EOF
input {
  file {
    #指定搜集的路径
    path => "/var/log/messages"
  }
}
filter {
  json {
  #JSON解析器 能够将json方式的数据转化为logstash实践的数据结构(依据key:value拆分红字段方式)
    source => "message"
  }
  date {
    #匹配时刻字段并解析
    match => [ "log_time", "yyyy-MM-dd HH:mm:ss.SSS" ]
    #将匹配到的时刻字段解析后存储到方针字段,默许字段为"@timestamp"
    target => "@timestamp"
    timezone => "Asia/Shanghai"
  }
}
output {
  stdout {}
  elasticsearch {
    #界说es集群的主机地址
    hosts => ["192.168.182.110:9200"]
    #界说索引称号
    index => "hqt-application-pro-%{ YYYY.MM.dd}"
  }
}
EOF

5)geoip剖析原IP地址位置事例

测验数据为:nginx的json格局日志

{"@timestamp":"2022-12-18T03:27:10 08:00","host":"10.0.24.2","clientip":"114.251.122.178","SendBytes":4833,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"43.143.242.47","uri":"/index.html","domain":"43.143.242.47","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36","status":"200"}

装备如下:

cat >> beats-geoip-stdout.conf << EOF
input {
  file {
    #指定搜集的路径
    path => "/var/log/test.log"
  }
}
filter {
  json {
  #JSON解析器 能够将json方式的数据转化为logstash实践的数据结构(依据key:value拆分红字段方式)
    source => "message"
  }
  geoip {
    #指定根据哪个字段剖析IP地址
    source => "client_ip"
    #指定IP地址剖析模块所运用的数据库,默许为GeoLite2-City.mmdb(这儿有必要再次指定以下,不然不会显现城市)
    database => "/hqtbj/hqtwww/logstash_workspace/data/plugins/filters/geoip/CC/GeoLite2-City.mmdb"
    #假如期望检查指定的字段,则能够在这儿装备,若不装备,表明显现所有的查询字段
    #fields => ["city_name","country_name","ip"]
    #指定geoip的输出字段,当有多个IP地址需求剖析时(例如源IP和目的IP),则该字段十分有用
    #target => "test-geoip-nginx"
  }
}
output {
  stdout {}
}
EOF

GeoLite2-City.mmdb 下载:dev.maxmind.com/geoip/geoli…

开源数据搜集引擎 Logstash 解说和示例解说

7)mutate组件常用事例

mutate 测验数据 python 脚本:

cat >> generate_log.py << EOF
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author : oldboyedu-linux80
import datetime
import random
import logging
import time
import sys
LOG_FORMAT = "%(levelname)s %(asctime)s [com.oldboyedu.%(module)s] - %(message)s "
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
# 装备root的logging.Logger实例的根本装备
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT, filename=sys.argv[1], filemode='a',)
actions = ["阅读⻚⾯", "评论商品", "加⼊收藏", "加⼊购物⻋", "提交订单", "使⽤优惠券", "领取优惠券", "搜索", "检查订单", "付款", "清空购物⻋"]
while True: 
    time.sleep(random.randint(1, 5))
    user_id = random.randint(1, 10000)
# 对⽣成的浮点数保存2位有用数字.
    price = round(random.uniform(15000, 30000),2)
    action = random.choice(actions)
    svip = random.choice([0,1])
    logging.info("DAU|{0}|{1}|{2}|{3}".format(user_id, action,svip,price))
EOF
# python generate_log.py  /tmp/app.log

8)logstash的多if分支事例

装备如下:

cat >> homework-to-es.conf << EOF
input {
  beats {
    type => "test-nginx-applogs"
    port => 5044
  } 
  file {
    type => "test-product-applogs"
    path => "/tmp/app.logs"
  }
  beats {
    type => "test-dw-applogs"
    port => 8888
  }
  file { 
    type => "test-payment-applogs"
    path => "/tmp/payment.log"
  } 
}
filter {
  if [type] == "test-nginx-applogs"{
    mutate {
      remove_field => [ "tags","agent","input","log","ecs","version","@version","ident","referrer","auth","xff","referer","upstreamtime","upstreamhost","tcp_xff"]
    }
    geoip {
     source => "clientip"
     database => "/hqtbj/hqtwww/logstash_workspace/data/plugins/filters/geoip/CC/GeoLite2-City.mmdb"
    }
    useragent {
     source => "http_user_agent"
    }
  } 
  if [type] == "test-product-applogs" {
    mutate {
     split => { "message" => "|" }
    }
    mutate {
      add_field => {
        "user_id" => "%{[message][1]}"
        "action" => "%{[message][2]}"
        "svip" => "%{[message][3]}"
        "price" => "%{[message][4]}"
      }
    }
    mutate {
      convert => {
      "user_id" => "integer"
      "svip" => "boolean"
      "price" => "float"
      }
    }
  } 
  if [type] in [ "test-dw-applogs","test-payment-applogs" ] {
    json {
      source => "message"
    }
    date {
      match => [ "log_time", "yyyy-MM-dd HH:mm:ss.SSS" ]
      target => "@timestamp"
    }
  }
}
output {
  stdout {}
  if [type] == "test-nginx-applogs" { 
    elasticsearch {
      hosts => ["192.168.182.110:9200"]
      index => "test-nginx-logs-%{ YYYY.MM.dd}" 
    }
  }
  if [type] == "test-product-applogs" {
    elasticsearch {
      hosts => ["192.168.182.110:9200"]
      index => "test-product-applogs-%{ YYYY.MM.dd}"    
    }
  }
  if [type] in [ "test-dw-applogs","test-payment-applogs" ] {
    elasticsearch {
      hosts => ["192.168.182.110:9200"]
      index => "test-center-applogs-%{ YYYY.MM.dd}"
    }
  }
}
EOF

开源数据搜集引擎 Logstash 解说和示例解说就先到这儿了,有任何疑问也可重视我大众号:大数据与云原生技能共享,进行技能交流,如本篇文章对您有所协助,麻烦帮助一键三连(点赞、转发、收藏)~

开源数据搜集引擎 Logstash 解说和示例解说