Amazon S3 - What is wrong with this Logstash config?



We use Logstash (2.3.3) to listen to several topics in Kafka, using the new Kafka input plugin (3.0.2). Data from each topic is then routed to a specific folder in an S3 bucket based on the topic name (added as metadata). With the current config, however, only the first S3 output's data ever lands in its S3 bucket/folder.

Can someone tell me what is going wrong here? I'm pretty sure there is a better way to write this config that would meet our requirements!

input
{
 kafka
 {
  bootstrap_servers => "10.0.0.5:9093,10.0.1.5:9093"
  topics => "topic"
  codec => "json"
  ssl => true
  ssl_keystore_location => "/opt/logstash/ssl/server.keystore.jks"
  ssl_keystore_password => "<snipped>"
  ssl_truststore_location => "/opt/logstash/ssl/server.truststore.jks"
  ssl_truststore_password => "<snipped>"
  add_field => { "[@metadata][topic]" => "topic" }
 }
 kafka
 {
  bootstrap_servers => "10.0.0.5:9093,10.0.1.5:9093"
  topics => "topic-test"
  codec => "json"
  ssl => true
  ssl_keystore_location => "/opt/logstash/ssl/server.keystore.jks"
  ssl_keystore_password => "<snipped>"
  ssl_truststore_location => "/opt/logstash/ssl/server.truststore.jks"
  ssl_truststore_password => "<snipped>"
  add_field => { "[@metadata][topic]" => "topic-test" }
 }
 kafka
 {
  bootstrap_servers => "10.0.0.5:9093,10.0.1.5:9093"
  topics => "daily_batch"  
  ssl => true
  ssl_keystore_location => "/opt/logstash/ssl/server.keystore.jks"
  ssl_keystore_password => "<snipped>"
  ssl_truststore_location => "/opt/logstash/ssl/server.truststore.jks"
  ssl_truststore_password => "<snipped>"
  add_field => { "[@metadata][topic]" => "daily_batch" }
 }
}
output
{
 if [@metadata][topic] == "topic"
 {
  s3
    {
     region => "us-east-1"
     bucket => "our-s3-storage/topic"
     size_file => 20971520
     temporary_directory => "/logstash"
     use_ssl => "true"
     codec => json_lines     
    }
 }
 if [@metadata][topic] == "topic-test"
 {
  s3
    {
     region => "us-east-1"
     bucket => "our-s3-storage/topic-test"
     size_file => 2097152
     temporary_directory => "/logstash"
     use_ssl => "true"
     codec => json_lines     
    }
 }
 if [@metadata][topic] == "daily_batch"
 {
  s3
    {
     region => "us-east-1"
     bucket => "our-s3-storage/daily_batch"
     size_file => 41943
     temporary_directory => "/logstash"
     use_ssl => "true"
    }
 }
}

In Logstash 5.0 you will be able to pass the kafka input an array of topics:

topics => ["topic", "topic-test", "daily_batch"]

all in a single kafka input. This can't be done in Logstash 2.3, however, because its kafka input has no topics option.
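For illustration, a single Logstash 5 input covering all three topics might look like the sketch below. It assumes the 5.x kafka input's decorate_events option, which tags each event with its source topic under [@metadata][kafka][topic]; the output conditionals would then need to reference that field rather than a hand-set [@metadata][topic].

```
input
{
 kafka
 {
  bootstrap_servers => "10.0.0.5:9093,10.0.1.5:9093"
  topics => ["topic", "topic-test", "daily_batch"]
  codec => "json"
  ssl => true
  ssl_keystore_location => "/opt/logstash/ssl/server.keystore.jks"
  ssl_keystore_password => "<snipped>"
  ssl_truststore_location => "/opt/logstash/ssl/server.truststore.jks"
  ssl_truststore_password => "<snipped>"
  # Assumption: decorate_events populates [@metadata][kafka][topic]
  # with the topic each event was consumed from.
  decorate_events => true
 }
}
```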

You can definitely condense the outputs by using Logstash's per-event field interpolation to insert the topic name into a string in the config. To make sure bad data can't produce weird one-off bucket paths, guard it with an array membership check:

if [@metadata][topic] in ["topic", "topic-test", "daily_batch"]
 {
  s3
    {
     region => "us-east-1"
     bucket => "our-s3-storage/%{[@metadata][topic]}"
     size_file => 41943
     temporary_directory => "/logstash"
     use_ssl => "true"
    }
 }
