How to read multiple CSV files with Logstash

I have two *.csv files.

Customers.csv

CustomerID,CustomerName,ContactName,Country,CustomerCreateDate
1,Alfreds Futterkiste,Maria Anders,Germany,2022/4/1
2,Ana Trujillo Emparedados y helados,Ana Trujillo,Mexico,2022/5/3
3,Antonio Moreno Taquería,Antonio Moreno,Mexico,2022/4/23

Orders.csv

OrderID,CustomerID,OrderDate
10308,2,1996/9/18
10309,37,1996/9/19
10310,77,1996/9/20

Is it possible to use Logstash and Elasticsearch to create a single data view that contains the fields CustomerID, CustomerName, ContactName, Country, CustomerCreateDate, OrderID, and OrderDate?
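In other words, for CustomerID 2 I would like to end up with a single document that looks roughly like this (values taken from the two sample files above; the csv filter keeps everything as strings):

{
  "CustomerID": "2",
  "CustomerName": "Ana Trujillo Emparedados y helados",
  "ContactName": "Ana Trujillo",
  "Country": "Mexico",
  "CustomerCreateDate": "2022/5/3",
  "OrderID": "10308",
  "OrderDate": "1996/9/18"
}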

I know I can create a data view with the fields CustomerID, CustomerName, ContactName, Country, and CustomerCreateDate using this logstash.conf:

input {
  file {
    path => "/path/to/Customers.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
    separator => ","
    columns => ["CustomerID","CustomerName","ContactName","Country","CustomerCreateDate"]
  }
}
output {
  elasticsearch {
    hosts => "elasticsearch:9200"
    user => "logstash_internal"
    password => "${LOGSTASH_INTERNAL_PASSWORD}"
    index => "customers"
  }
}

I also know I can create a data view with the fields OrderID, CustomerID, and OrderDate using this logstash.conf:

input {
  file {
    path => "/path/to/Orders.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
    separator => ","
    columns => ["OrderID","CustomerID","OrderDate"]
  }
}
output {
  elasticsearch {
    hosts => "elasticsearch:9200"
    user => "logstash_internal"
    password => "${LOGSTASH_INTERNAL_PASSWORD}"
    index => "orders"
  }
}

Is it possible to merge the above two data views into one?

Here is one idea, though not a great one: you can create two Logstash pipeline files, say orders.conf and customers.conf. The orders.conf file looks like this (it uses CustomerID as the document_id, which is why the approach is not great: if a customer has more than one order, each order overwrites the previous one):

input {
  file {
    path => "/path/to/Orders.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
    separator => ","
    columns => ["OrderID","CustomerID","OrderDate"]
  }
}
output {
  elasticsearch {
    hosts => "elasticsearch:9200"
    user => "logstash_internal"
    password => "${LOGSTASH_INTERNAL_PASSWORD}"
    document_id => "%{CustomerID}"
    index => "orders"
  }
}

And customers.conf looks like this (it updates those documents by document_id):

input {
  file {
    path => "/path/to/Customers.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
    separator => ","
    columns => ["CustomerID","CustomerName","ContactName","Country","CustomerCreateDate"]
  }
}
output {
  elasticsearch {
    hosts => "elasticsearch:9200"
    user => "logstash_internal"
    password => "${LOGSTASH_INTERNAL_PASSWORD}"
    document_id => "%{CustomerID}"
    action => "update"
    # must target the same index that orders.conf wrote to, otherwise there is nothing to update
    index => "orders"
  }
}
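To run both files you can register them as separate pipelines in pipelines.yml. A minimal sketch, assuming the two files live under /etc/logstash/conf.d/ (adjust the paths to wherever you actually keep them):

- pipeline.id: orders
  path.config: "/etc/logstash/conf.d/orders.conf"
- pipeline.id: customers
  path.config: "/etc/logstash/conf.d/customers.conf"

Keep in mind that the update in customers.conf only succeeds once the matching order document already exists; if the customers pipeline might process a row first, you can set doc_as_upsert => true in its elasticsearch output so the update creates the document when it is missing.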