Split an event into two based on an array it contains



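This is my first time using logstash, and I'm trying to get the JSON reports from amavisd-new in for searching and analysis. Amavisd-new can write its JSON logging to redis, and I have everything importing cleanly and have started learning my way around it all.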

I have one problem, though: the JSON report from amavis is formatted as shown below. Note that "recipients" is an array with one entry per recipient.

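I'd like to split the whole event in two, one per recipient, keeping all other fields intact but replacing fields in the main event, such as "action", "ccat_main" and "queued_as", with the values from each member of the recipients array.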

The idea is that one incoming event with two recipients results in two separate log events in logstash, one for each recipient.

I have looked at splitting events, but I can't see how to do this, and I can't find a suitable example anywhere.

So, for a real-world example, given the following:

 {
    "@timestamp" => "2014-05-06T09:29:47.048Z",
    "time_unix" => 1399368587.048,
    "time_iso_week_date" => "2014-W19-2",
    "partition" => "19",
    "type" => "amavis",
    "host" => "mailer.example.net",
    "queued_as" => ["3gNFyR4Mfjzc3", "3gNFyR4n6Lzc4"],
    "recipients" => [
      { "action" => "PASS",
        "ccat_main" => "Clean",
        "queued_as" => "3gNFyR4Mfjzc3",
        "rcpt_is_local" => false,
        "rcpt_to" => "recip2@example.org",
        "smtp_code" => "250",
        "smtp_response" => "250 2.0.0 from MTA(smtp:[::1]:10013): 250 2.0.0 Ok: queued as 3gNFyR4Mfjzc3",
        "spam_score" => -2.0
      },
      { "action" => "PASS",
        "ccat_main" => "Clean",
        "mail_id_related" => "men7HTERZaOF",
        "penpals_age" => 1114599,
        "queued_as" => "3gNFyR4n6Lzc4",
        "rcpt_is_local" => true,
        "rcpt_to" => "recip1@example.net",
        "smtp_code" => "250",
        "smtp_response" => "250 2.0.0 from MTA(smtp:[::1]:10013): 250 2.0.0 Ok: queued as 3gNFyR4n6Lzc4",
        "spam_score" => -5.272
      }
    ],
    "smtp_code"  => ["250"],
  }

I'd like to end up with two distinct events, like this:

  {
    "@timestamp" => "2014-05-06T09:29:47.048Z",
    "time_unix" => 1399368587.048,
    "time_iso_week_date" => "2014-W19-2",
    "partition" => "19",
    "type" => "amavis",
    "host" => "mailer.example.net",
    "queued_as" => ["3gNFyR4Mfjzc3", "3gNFyR4n6Lzc4"],
    "action" => "PASS",
    "ccat_main" => "Clean",
    "queued_as" => "3gNFyR4Mfjzc3",
    "rcpt_is_local" => false,
    "rcpt_to" => "recip2@example.org",
    "smtp_code" => "250",
    "smtp_response" => "250 2.0.0 from MTA(smtp:[::1]:10013): 250 2.0.0 Ok: queued as 3gNFyR4Mfjzc3",
    "spam_score" => -2.0,
    "smtp_code"  => ["250"],
  }

  {
    "@timestamp" => "2014-05-06T09:29:47.048Z",
    "time_unix" => 1399368587.048,
    "time_iso_week_date" => "2014-W19-2",
    "partition" => "19",
    "type" => "amavis",
    "host" => "mailer.example.net",
    "queued_as" => ["3gNFyR4Mfjzc3", "3gNFyR4n6Lzc4"],
    "action" => "PASS",
    "ccat_main" => "Clean",
    "mail_id_related" => "men7HTERZaOF",
    "penpals_age" => 1114599,
    "queued_as" => "3gNFyR4n6Lzc4",
    "rcpt_is_local" => true,
    "rcpt_to" => "recip1@example.net",
    "smtp_code" => "250",
    "smtp_response" => "250 2.0.0 from MTA(smtp:[::1]:10013): 250 2.0.0 Ok: queued as 3gNFyR4n6Lzc4",
    "spam_score" => -5.272,
    "smtp_code"  => ["250"],
  }

Edit:

OK, I just used the split filter; I should have spotted that. One thing is still confusing me, though.

When there is only one recipient, it passes the block straight through, and the result in kibana looks like this:

recipients      {
  "action": "PASS",
  "bypass_banned_checks": true,
  "bypass_spam_checks": true,
  "ccat_main": "Clean",
  "queued_as": "3qv7Km4Ybpz14Kyh",
  "rcpt_is_local": true,
  "rcpt_to": "user@domain.com",
  "rid": "552213780",
  "smtp_code": "250",
  "smtp_response": "250 2.0.0 from MTA(smtp:[127.0.0.1]:10025): 250 2.0.0 Ok: queued as 3qv7Km4Ybpz14Kyh"
}

However, when there are 2 or more recipients, each new event looks like this, with the corresponding information:

recipients.action       PASS
recipients.ccat_main        CleanTag
recipients.queued_as        3qv7Ly4Pqvz4wyS
recipients.rcpt_is_local        true
recipients.rcpt_to      user@domain.com
recipients.rid      552278239
recipients.smtp_code        250
recipients.smtp_response        250 2.0.0 from MTA(smtp:[127.0.0.1]:10025): 250 2.0.0 Ok: queued as 3qv7Ly4Pqvz4wyS
recipients.whitelisted      true

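Why is there a difference between the two? I think I'd rather keep the recipients field as a hash of values, so what is the best way to make split events consistent with single ones?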

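That's what the split filter is for. Then, in each copy, you would rename fields so they sit at the right level, or remove the fields that copy doesn't need.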
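To make the "split, then move fields to the right level" idea concrete, here is a rough plain-Ruby sketch of the transformation on an ordinary hash. It is not Logstash code and does not use the Logstash event API; `split_and_promote` is a made-up helper name, and the sample event is a trimmed version of the one in the question.

```ruby
# Plain-Ruby illustration only; split_and_promote is a hypothetical helper,
# not part of Logstash or any plugin.
def split_and_promote(event, field = "recipients")
  members = event[field]
  return [event] if members.nil?                   # nothing to split
  members = [members] unless members.is_a?(Array)  # tolerate a bare hash

  # Everything except the array itself is shared by all copies.
  parent = event.reject { |key, _| key == field }

  # One copy per member; the member's fields override same-named parent fields.
  members.map { |member| parent.merge(member) }
end

event = {
  "type" => "amavis",
  "host" => "mailer.example.net",
  "queued_as" => ["3gNFyR4Mfjzc3", "3gNFyR4n6Lzc4"],
  "smtp_code" => ["250"],
  "recipients" => [
    { "action" => "PASS", "queued_as" => "3gNFyR4Mfjzc3",
      "rcpt_to" => "recip2@example.org", "smtp_code" => "250" },
    { "action" => "PASS", "queued_as" => "3gNFyR4n6Lzc4",
      "rcpt_to" => "recip1@example.net", "smtp_code" => "250" }
  ]
}

events = split_and_promote(event)
events.each { |e| puts e.inspect }
```

In a real pipeline the split filter would do the copying, and a mutate or ruby filter would do the promoting; the sketch only shows what shape each resulting event ends up with.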

This is what I ended up doing. It makes the result consistent whether there are one or multiple array members to split.

There may be a simpler way to do this, but it covers me for now. If I come up with something else, I'll come back and amend this.

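filter {
    # Split multi-recipient events: each copy gets one array member in "recipcopy".
    split {
        field => "recipients"
        target => "recipcopy"
        remove_field => "recipients"
    }
}
filter {
    # "recipients" still being present means the event was not split
    # (single recipient), so take the only array member instead.
    if [recipients] {
        ruby {
            # Legacy event API; on Logstash 5.0+ use event.get / event.set instead.
            code => "event['recipcopy'] = event['recipients'][0]"
            remove_field => "recipients"
        }
    }
}
filter {
    # Rename back so every event carries a single "recipients" hash.
    if [recipcopy] {
        mutate {
            rename => { "recipcopy" => "recipients" }
        }
    }
}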
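For anyone who wants to check the logic without a running Logstash, the three filter stages above can be mimicked in plain Ruby on ordinary hashes. This is only a sketch: the function names are invented, and the pass-through of single-member arrays in `split_stage` is an assumption taken from the behaviour described in the question, not from the plugin documentation.

```ruby
# Plain-Ruby simulation; all function names here are hypothetical.

# Stage 1, split: arrays with two or more members become one event per member,
# stored under "recipcopy" with "recipients" removed. A single-member array is
# passed through untouched (assumed, per the behaviour seen in the question).
def split_stage(event)
  members = event["recipients"]
  return [event] unless members.is_a?(Array) && members.length > 1

  members.map do |member|
    copy = event.reject { |key, _| key == "recipients" }
    copy["recipcopy"] = member
    copy
  end
end

# Stage 2, ruby filter: an event that still has "recipients" was not split,
# so copy its first (only) member into "recipcopy".
def unwrap_stage(event)
  return event unless event["recipients"]

  copy = event.reject { |key, _| key == "recipients" }
  copy["recipcopy"] = event["recipients"][0]
  copy
end

# Stage 3, mutate rename: move "recipcopy" back to "recipients".
def rename_stage(event)
  return event unless event["recipcopy"]

  copy = event.reject { |key, _| key == "recipcopy" }
  copy["recipients"] = event["recipcopy"]
  copy
end

def run_pipeline(event)
  split_stage(event).map { |e| rename_stage(unwrap_stage(e)) }
end

single = { "host" => "mailer.example.net",
           "recipients" => [{ "rcpt_to" => "user@domain.com" }] }
multi  = { "host" => "mailer.example.net",
           "recipients" => [{ "rcpt_to" => "recip1@example.net" },
                            { "rcpt_to" => "recip2@example.org" }] }

single_out = run_pipeline(single)
multi_out  = run_pipeline(multi)
(single_out + multi_out).each { |e| puts e.inspect }
```

Either way, every event that comes out has "recipients" as a single hash, which is the consistency the config is after.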
