I touched on this topic in my earlier article "Logstash: data transformation, parsing, extraction, enrichment and core operations". Today I want to use a concrete example to demonstrate it in more depth.
Preparing the data
First, copy the following data and save it to a file named sample.json. We can place this file under the root directory of the Logstash installation.
sample.json
1. {"id":1,"timestamp":"2019-09-12T13:43:42Z","paymentType":"Amex","name":"Merrill Duffield","gender":"Female","ip_address":"132.150.218.21","purpose":"Toys","country":"United Arab Emirates","age":33}
2. {"id":2,"timestamp":"2019-08-11T17:55:56Z","paymentType":"Visa","name":"Darby Dacks","gender":"Female","ip_address":"77.72.239.47","purpose":"Shoes","country":"Poland","age":55}
3. {"id":3,"timestamp":"2019-07-14T04:48:25Z","paymentType":"Visa","name":"Harri Cayette","gender":"Female","ip_address":"227.6.210.146","purpose":"Sports","country":"Canada","age":27}
4. {"id":4,"timestamp":"2020-02-29T12:41:59Z","paymentType":"Mastercard","name":"Regan Stockman","gender":"Male","ip_address":"139.224.15.154","purpose":"Home","country":"Indonesia","age":34}
5. {"id":5,"timestamp":"2019-08-03T19:37:51Z","paymentType":"Mastercard","name":"Wilhelmina Polle","gender":"Female","ip_address":"252.254.68.68","purpose":"Health","country":"Ukraine","age":51}
```
$ pwd
/Users/liuxg/elastic/logstash-8.6.1
$ ls sample.json
sample.json
```
Parsing and filtering the JSON file
We have the following approaches:
Using the Logstash input JSON codec
We create the following Logstash configuration file:
logstash_input.conf
```
input {
  file {
    path => "/Users/liuxg/elastic/logstash-8.6.1/sample.json"
    type => "applog"
    codec => "json"                  # parse each line as JSON at ingest time
    start_position => "beginning"    # read the file from the start
    sincedb_path => "/dev/null"      # do not remember the read position between runs
  }
}

output {
  stdout {
    codec => rubydebug               # pretty-print events to the console
  }
}
```
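Before running, we can optionally ask Logstash to validate the configuration syntax and exit, using the standard --config.test_and_exit flag:

```
./bin/logstash -f logstash_input.conf --config.test_and_exit
```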
We run Logstash:
```
$ pwd
/Users/liuxg/elastic/logstash-8.6.1
$ ./bin/logstash -f logstash_input.conf
```
In the terminal where Logstash runs, we can see results like the following.
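For one of the documents, the rubydebug output looks roughly like the sketch below. This is illustrative only: field order, key alignment, and the exact metadata fields (host, log, event, and so on) depend on the Logstash version.

```
{
    "paymentType" => "Amex",
           "name" => "Merrill Duffield",
             "id" => 1,
      "timestamp" => "2019-09-12T13:43:42Z",
         "gender" => "Female",
     "ip_address" => "132.150.218.21",
        "purpose" => "Toys",
        "country" => "United Arab Emirates",
            "age" => 33,
           "type" => "applog",
       "@version" => "1",
            ...
}
```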
From the above, we can see that our data has now become structured.
Using the JSON filter
We create the following Logstash configuration file:
logstash_filter.conf
```
input {
  file {
    path => "/Users/liuxg/elastic/logstash-8.6.1/sample.json"
    type => "applog"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"    # parse the raw line held in the message field
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
```
We run Logstash:
```
$ pwd
/Users/liuxg/elastic/logstash-8.6.1
$ ./bin/logstash -f logstash_filter.conf
```
In the terminal, we can see results like the following.
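As another illustrative sketch, each event now also carries the raw line in the message field, since the file input (used here without a codec) stores each line there before the json filter parses it:

```
{
        "message" => "{\"id\":1,\"timestamp\":\"2019-09-12T13:43:42Z\",\"paymentType\":\"Amex\", ... }",
    "paymentType" => "Amex",
           "name" => "Merrill Duffield",
             "id" => 1,
            ...
}
```

This duplicated message field, along with metadata such as @version, host, log, and event, is exactly what we clean up in the next step.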
From the above we can see that the JSON file has also been structured correctly.
Next, let's clean up the data and drop the documents whose paymentType is Mastercard. We modify the configuration file further:
logstash_filter.conf
```
input {
  file {
    path => "/Users/liuxg/elastic/logstash-8.6.1/sample.json"
    type => "applog"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }

  # drop the documents we do not want
  if [paymentType] == "Mastercard" {
    drop {}
  }

  # remove fields we do not need
  mutate {
    remove_field => ["message", "@timestamp", "path", "host", "@version", "log", "event"]
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
```
Above, we drop the documents whose paymentType is Mastercard, and at the same time we remove some fields we do not need, such as message. We run again:
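A cleaned-up event should now contain only the fields from the original JSON plus the type set in the input, roughly like this:

```
{
    "paymentType" => "Amex",
           "name" => "Merrill Duffield",
             "id" => 1,
      "timestamp" => "2019-09-12T13:43:42Z",
         "gender" => "Female",
     "ip_address" => "132.150.218.21",
        "purpose" => "Toys",
        "country" => "United Arab Emirates",
            "age" => 33,
           "type" => "applog"
}
```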
Clearly, this time we no longer see the message field, and all documents with a paymentType of Mastercard have been filtered out.
Handling JSON data that contains an array
Next we use another set of data. In this data, each JSON document contains an array:
sample-split.json
1. {"id":1,"timestamp":"2019-06-19T23:04:47Z","paymentType":"Mastercard","name":"Ardis Shimuk","gender":"Female","ip_address":"91.33.132.38","purpose":"Home","country":"France","pastEvents":[{"eventId":1,"transactionId":"trx14224"},{"eventId":2,"transactionId":"trx23424"}],"age":34}
2. {"id":2,"timestamp":"2019-11-26T15:40:56Z","paymentType":"Amex","name":"Benoit Urridge","gender":"Male","ip_address":"26.71.230.228","purpose":"Shoes","country":"Brazil","pastEvents":[{"eventId":3,"transactionId":"63323-064"},{"eventId":4,"transactionId":"0378-3120"}],"age":51}
3. {"id":3,"timestamp":"2019-05-08T16:24:25Z","paymentType":"Visa","name":"Lindsy Ketchell","gender":"Female","ip_address":"189.216.71.184","purpose":"Home","country":"Brazil","pastEvents":[{"eventId":5,"transactionId":"68151-3826"},{"eventId":6,"transactionId":"52125-611"}],"age":26}
4. {"id":4,"timestamp":"2019-06-10T18:01:32Z","paymentType":"Visa","name":"Cary Boyes","gender":"Male","ip_address":"223.113.73.232","purpose":"Grocery","country":"Pakistan","pastEvents":[{"eventId":7,"transactionId":"63941-950"},{"eventId":8,"transactionId":"55926-0011"}],"age":46}
5. {"id":5,"timestamp":"2020-02-18T12:27:35Z","paymentType":"Visa","name":"Betteanne Diament","gender":"Female","ip_address":"159.148.102.98","purpose":"Computers","country":"Brazil","pastEvents":[{"eventId":9,"transactionId":"76436-101"},{"eventId":10,"transactionId":"55154-3330"}],"age":41}
Note the empty line at the very end above; it ensures that all the documents are ingested. As shown, each document contains a field called pastEvents, which is an array holding one or more entries, each with its own eventId. We can use the split filter to turn each of these entries into a separate event.
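Conceptually, split clones the event once per array element and replaces the array field with a single element in each clone. Sketched for the first document:

```
# before split — one event whose pastEvents field is a two-element array:
"pastEvents" => [ {"eventId" => 1, "transactionId" => "trx14224"},
                  {"eventId" => 2, "transactionId" => "trx23424"} ]

# after split — two events, each holding a single object:
"pastEvents" => {"eventId" => 1, "transactionId" => "trx14224"}
"pastEvents" => {"eventId" => 2, "transactionId" => "trx23424"}
```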
We create the following Logstash configuration file:
logstash_split.conf
```
input {
  file {
    path => "/Users/liuxg/elastic/logstash-8.6.1/sample-split.json"
    type => "applog"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }

  # emit one event per element of the pastEvents array
  split {
    field => "[pastEvents]"
  }

  mutate {
    remove_field => ["message", "@timestamp", "path", "host", "@version", "log", "event"]
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
```
We can refer to the official documentation, Split filter plugin | Logstash Reference [8.6] | Elastic, to learn more about the split filter's capabilities.
We run the above pipeline:
```
./bin/logstash -f logstash_split.conf
```
We can see that the source JSON file contains five documents, but after the split filter we now get ten events: each document's pastEvents array holds two entries, and 5 × 2 = 10.
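For example, the first source document now yields two events; one of them looks roughly like this (an illustrative, abbreviated sketch):

```
{
     "pastEvents" => {
              "eventId" => 1,
        "transactionId" => "trx14224"
    },
    "paymentType" => "Mastercard",
           "name" => "Ardis Shimuk",
             "id" => 1,
      "timestamp" => "2019-06-19T23:04:47Z",
         "gender" => "Female",
     "ip_address" => "91.33.132.38",
        "purpose" => "Home",
        "country" => "France",
            "age" => 34,
           "type" => "applog"
}
```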
Outputting to Elasticsearch
With the input and filters above, we have obtained the structured data we want. To send the results to an Elasticsearch cluster, we can refer to the article "Logstash: How to connect to a cluster with HTTPS access"; I will not repeat the details here.
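That said, a minimal sketch of such an output is shown below. The host address, index name, credentials, and CA certificate path are all placeholder assumptions that must be adapted to your own cluster:

```
output {
  elasticsearch {
    hosts    => ["https://localhost:9200"]   # placeholder cluster address
    index    => "sample_data"                # placeholder index name
    user     => "elastic"                    # placeholder credentials
    password => "password"
    cacert   => "/path/to/http_ca.crt"       # placeholder CA certificate path
  }
}
```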