草庐IT

Logstash:在实施之前测试 Logstash 管道/过滤器

Elastic 中国社区官方博客 2023-04-20 原文

检测解析的日志是否包含单个或多个警告消息,然后添加一个字段来说明这两种情况。在很多的情形下,我们在测试 Logstash 的过滤器时,并不急于把实际的 input 的数据接入到过滤器中来进行测试。我们首先来选择一个比较容易理解的 input 方式,使用一个文档来进行解析,并测试管道。在今天的文章中,我来详细介绍两种常用的方法来如何测试 Logstash 的管道/过滤器。

方法一:使用 generator

方法如下:

logstash.conf

input {
  generator {
    message => '{"id":2,"timestamp":"2019-08-11T17:55:56Z","paymentType":"Visa","name":"Darby Dacks","gender":"Female","ip_address":"77.72.239.47","purpose":"Shoes","country":"Poland","age":55}'
    count => 1
  }
}
 
filter {
    json {
        source => "message"
    }

    if [paymentType] == "Mastercard" {
        drop {}
    }

    mutate {
        remove_field => ["message", "@timestamp", "path", "host", "@version", "log", "event"]
    }

}
 
output {
  stdout {
    codec => rubydebug
  }
}

在上面,我们使用 generator 的方法来生成一个文档,并让这个文档经过 filter 部分,并最终在 console 中进行展示。我们可以通过如下的命令来运行上面的 Logstash 管道:

$ pwd
/Users/liuxg/elastic/logstash-8.6.1
$ ls logstash.conf
logstash.conf
$ ./bin/logstash -f logstash.conf

   

从上面,我们可以看出来 json filter 工作正常。在本示例中,为了说明问题的方便,我仅使用了几个过滤器。在时间的使用中,我们可以有很多的过滤器来组成这个 pipeline。一旦我们确定了这些过滤器能完成我们所需要的功能,我们可以把所需要的 input 换进来即可,比如:

logstash_filter.conf

input {
  file {
    path => "/Users/liuxg/elastic/logstash-8.6.1/sample.json"
    type    => "applog"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
    json {
        source => "message"
    }

    if [paymentType] == "Mastercard" {
        drop {}
    }

    mutate {
        remove_field => ["message", "@timestamp", "path", "host", "@version", "log", "event"]
    }

}

output {
	stdout { 
	  codec => rubydebug 
	}
}

我们可以使用诸如如下格式的测试文件来进行测试:

sample.json

{"id":1,"timestamp":"2019-09-12T13:43:42Z","paymentType":"Amex","name":"Merrill Duffield","gender":"Female","ip_address":"132.150.218.21","purpose":"Toys","country":"United Arab Emirates","age":33}
{"id":2,"timestamp":"2019-08-11T17:55:56Z","paymentType":"Visa","name":"Darby Dacks","gender":"Female","ip_address":"77.72.239.47","purpose":"Shoes","country":"Poland","age":55}
{"id":3,"timestamp":"2019-07-14T04:48:25Z","paymentType":"Visa","name":"Harri Cayette","gender":"Female","ip_address":"227.6.210.146","purpose":"Sports","country":"Canada","age":27}
{"id":4,"timestamp":"2020-02-29T12:41:59Z","paymentType":"Mastercard","name":"Regan Stockman","gender":"Male","ip_address":"139.224.15.154","purpose":"Home","country":"Indonesia","age":34}
{"id":5,"timestamp":"2019-08-03T19:37:51Z","paymentType":"Mastercard","name":"Wilhelmina Polle","gender":"Female","ip_address":"252.254.68.68","purpose":"Health","country":"Ukraine","age":51}

当然实际的文档可能比这个要长很多。

更多关于 generator 方面的示例,请阅读我之前的文章 “Logstash:Data 转换,分析,提取,丰富及核心操作”。

方法二:使用 stdin input

假设我们有以下代表上述两种情况的日志文件:

$ pwd
/Users/liuxg/elastic/logstash-8.6.1
$ cat multivaluewarn.json 
{"waf": {"ver": "2.0","warnRules": "3000030;3000057;950001;950109;959073;973335;981173;981244;981318","denyMsg": "Anomaly Score Exceeded for SQL Injection","denyActions": "3","warnMsg": "Basic SQL Authentication Bypass Attempts 3/3;Cross-site Scripting (XSS) common keywords;SQL Injection Attack;Multiple URL Encoding Detected;SQL Injection Attack;IE XSS Filters - Attack Detected;Restricted SQL Character Anomaly Detection Alert - Total # of special characters exceeded;Basic SQL Authentication Bypass Attempts 1/3;SQL Injection Attack: Common Injection Testing Detected"}} 
$ 
$ cat singlevaluewarn.json 
{"waf": {"ver": "2.0","warnRules": "681984","policy": "api_89894","warnMsg": "Alert rq without DEVICEID header","warnTags": "DEVICEID_Detection","warnActions": "2"}}

multivaluewarn.json

{"waf": {"ver": "2.0","warnRules": "3000030;3000057;950001;950109;959073;973335;981173;981244;981318","denyMsg": "Anomaly Score Exceeded for SQL Injection","denyActions": "3","warnMsg": "Basic SQL Authentication Bypass Attempts 3/3;Cross-site Scripting (XSS) common keywords;SQL Injection Attack;Multiple URL Encoding Detected;SQL Injection Attack;IE XSS Filters - Attack Detected;Restricted SQL Character Anomaly Detection Alert - Total # of special characters exceeded;Basic SQL Authentication Bypass Attempts 1/3;SQL Injection Attack: Common Injection Testing Detected"}} 
{
   "waf":{
      "ver":"2.0",
      "warnRules":"3000030;3000057;950001;950109;959073;973335;981173;981244;981318",
      "denyMsg":"Anomaly Score Exceeded for SQL Injection",
      "denyActions":"3",
      "warnMsg":"Basic SQL Authentication Bypass Attempts 3/3;Cross-site Scripting (XSS) common keywords;SQL Injection Attack;Multiple URL Encoding Detected;SQL Injection Attack;IE XSS Filters - Attack Detected;Restricted SQL Character Anomaly Detection Alert - Total # of special characters exceeded;Basic SQL Authentication Bypass Attempts 1/3;SQL Injection Attack: Common Injection Testing Detected"
   }
}

singlevaluewarn.json

{"waf": {"ver": "2.0","warnRules": "681984","policy": "api_89894","warnMsg": "Alert rq without DEVICEID header","warnTags": "DEVICEID_Detection","warnActions": "2"}}
{
   "waf":{
      "ver":"2.0",
      "warnRules":"681984",
      "policy":"api_89894",
      "warnMsg":"Alert rq without DEVICEID header",
      "warnTags":"DEVICEID_Detection",
      "warnActions":"2"
   }
}

阅读日志我们可以看到字段 [waf][warnMsg] 使用分号分隔警告消息; 在多次警告的情况下。
将收集到的信息转换为 Logstash 管道将导致:

logstash_warning.conf

input {
  stdin { codec => json }
}

filter {
  if ";" in [waf][warnMsg]{
    mutate {
      add_field =>  [ "wafWarningMSG", "multi warnings" ]
    }
  }
  else {
    mutate {
      add_field =>  [ "wafWarningMSG", "single" ]
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

将管道添加到 conf 文件(称为 logstash_warning.conf ),然后使用如下的命令来测试 pipeline:

$ pwd
/Users/liuxg/elastic/logstash-8.6.1
$ ls logstash_warning.conf 
logstash_warning.conf
$ ./bin/logstash -f logstash_warning.conf < multivaluewarn.json

 输出显示一个名为 wafWarningMSG 的新字段,其中包含 "multi warnings":

当然,我们也可以使用如下的命令来进行测试:

./bin/logstash -f logstash_warning.conf < singlevaluewarn.json 

 

从上面的输出中,我们可以看到 wafWarningMSG 字段的值为 single。

一旦我们测试好 pipeline 中的过滤器,我们就可以直接把 input 部分换成我们想要的格式即可。

希望你觉得它有用,如有任何问题,请随时联系我!

有关Logstash:在实施之前测试 Logstash 管道/过滤器的更多相关文章

  1. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  2. ruby - 使用 C 扩展开发 ruby​​gem 时,如何使用 Rspec 在本地进行测试? - 2

    我正在编写一个包含C扩展的gem。通常当我写一个gem时,我会遵循TDD的过程,我会写一个失败的规范,然后处理代码直到它通过,等等......在“ext/mygem/mygem.c”中我的C扩展和在gemspec的“扩展”中配置的有效extconf.rb,如何运行我的规范并仍然加载我的C扩展?当我更改C代码时,我需要采取哪些步骤来重新编译代码?这可能是个愚蠢的问题,但是从我的gem的开发源代码树中输入“bundleinstall”不会构建任何native扩展。当我手动运行rubyext/mygem/extconf.rb时,我确实得到了一个Makefile(在整个项目的根目录中),然后当

  3. ruby - Ruby 的 Hash 在比较键时使用哪种相等性测试? - 2

    我有一个围绕一些对象的包装类,我想将这些对象用作散列中的键。包装对象和解包装对象应映射到相同的键。一个简单的例子是这样的:classAattr_reader:xdefinitialize(inner)@inner=innerenddefx;@inner.x;enddef==(other)@inner.x==other.xendenda=A.new(o)#oisjustanyobjectthatallowso.xb=A.new(o)h={a=>5}ph[a]#5ph[b]#nil,shouldbe5ph[o]#nil,shouldbe5我试过==、===、eq?并散列所有无济于事。

  4. ruby - RSpec - 使用测试替身作为 block 参数 - 2

    我有一些Ruby代码,如下所示:Something.createdo|x|x.foo=barend我想编写一个测试,它使用double代替block参数x,这样我就可以调用:x_double.should_receive(:foo).with("whatever").这可能吗? 最佳答案 specify'something'dox=doublex.should_receive(:foo=).with("whatever")Something.should_receive(:create).and_yield(x)#callthere

  5. ruby - Sinatra:运行 rspec 测试时记录噪音 - 2

    Sinatra新手;我正在运行一些rspec测试,但在日志中收到了一堆不需要的噪音。如何消除日志中过多的噪音?我仔细检查了环境是否设置为:test,这意味着记录器级别应设置为WARN而不是DEBUG。spec_helper:require"./app"require"sinatra"require"rspec"require"rack/test"require"database_cleaner"require"factory_girl"set:environment,:testFactoryGirl.definition_file_paths=%w{./factories./test/

  6. ruby-on-rails - 迷你测试错误 : "NameError: uninitialized constant" - 2

    我遵循MichaelHartl的“RubyonRails教程:学习Web开发”,并创建了检查用户名和电子邮件长度有效性的测试(名称最多50个字符,电子邮件最多255个字符)。test/helpers/application_helper_test.rb的内容是:require'test_helper'classApplicationHelperTest在运行bundleexecraketest时,所有测试都通过了,但我看到以下消息在最后被标记为错误:ERROR["test_full_title_helper",ApplicationHelperTest,1.820016791]test

  7. ruby - 如何在 Rails 4 中使用表单对象之前的验证回调? - 2

    我有一个服务模型/表及其注册表。在表单中,我几乎拥有服务的所有字段,但我想在验证服务对象之前自动设置其中一些值。示例:--服务Controller#创建Action:defcreate@service=Service.new@service_form=ServiceFormObject.new(@service)@service_form.validate(params[:service_form_object])and@service_form.saverespond_with(@service_form,location:admin_services_path)end在验证@ser

  8. ruby - 即使失败也继续进行多主机测试 - 2

    我已经构建了一些serverspec代码来在多个主机上运行一组测试。问题是当任何测试失败时,测试会在当前主机停止。即使测试失败,我也希望它继续在所有主机上运行。Rakefile:namespace:specdotask:all=>hosts.map{|h|'spec:'+h.split('.')[0]}hosts.eachdo|host|begindesc"Runserverspecto#{host}"RSpec::Core::RakeTask.new(host)do|t|ENV['TARGET_HOST']=hostt.pattern="spec/cfengine3/*_spec.r

  9. ruby-on-rails - 如何使辅助方法在 Rails 集成测试中可用? - 2

    我在app/helpers/sessions_helper.rb中有一个帮助程序文件,其中包含一个方法my_preference,它返回当前登录用户的首选项。我想在集成测试中访问该方法。例如,这样我就可以在测试中使用getuser_path(my_preference)。在其他帖子中,我读到这可以通过在测试文件中包含requiresessions_helper来实现,但我仍然收到错误NameError:undefinedlocalvariableormethod'my_preference'.我做错了什么?require'test_helper'require'sessions_hel

  10. ruby-on-rails - Cucumber 是否只是 rspec 的包装器以帮助将测试组织成功能? - 2

    只是想确保我理解了事情。据我目前收集到的信息,Cucumber只是一个“包装器”,或者是一种通过将事物分类为功能和步骤来组织测试的好方法,其中实际的单元测试处于步骤阶段。它允许您根据事物的工作方式组织您的测试。对吗? 最佳答案 有点。它是一种组织测试的方式,但不仅如此。它的行为就像最初的Rails集成测试一样,但更易于使用。这里最大的好处是您的session在整个Scenario中保持透明。关于Cucumber的另一件事是您(应该)从使用您的代码的浏览器或客户端的角度进行测试。如果您愿意,您可以使用步骤来构建对象和设置状态,但通常您

随机推荐