我正在使用logstash创建一个从elasticsearch到create .io的管道。下面是配置。
input{
elasticsearch {
hosts => "<host_name>:9200"
index => "index1"
query => '{ "size":10,"query": {"match_all": {} } }'
}
}
filter{
if!([bench_pose][M_Body_t]) {
mutate{
add_field => {"[bench_pose][M_Body_t]" => null}
}
}
if!([bench_pose][M_Jaw_t]) {
mutate{
add_field => {"[bench_pose][M_Jaw_t]" => null}
}
}
}
output{
jdbc {
driver_class => "io.crate.client.jdbc.CrateDriver"
driver_auto_commit => false
driver_jar_path => "/etc/crate/crate-jdbc-standalone-1.12.3.jar"
connection_string => "crate://<host_ip>:4300"
statement => ["INSERT INTO table_name(path,bench_pose_m_Body_t,bench_pose_m_jaw_t) VALUES(?,?,?)",'path','%{[bench_pose][M_Body_t]}','%{[bench_pose][M_Jaw_t]}']
}
}
源代码有一个'bench_pose'字段,它是OBJECT数据类型,并且在bench_pose下的M_Body_t和M_Jaw_t字段是DOUBLE类型。这些字段在源和目标中都是空的。该字段在源代码中的几个文档中为空值。即使目标表具有NULLable字段,当插入语句遇到具有空值的文档时,也会抛出以下错误:
JDBC - Exception. Not retrying. {:exception=>java.sql.SQLException: Validation failed for bench_pose_m_body_t: 'null' cannot be cast to type double
如果我删除过滤器插件,它会抛出如下错误:
JDBC - Exception. Not retrying. {:exception=>java.sql.SQLException: Validation failed for bench_pose_m_body_t: '%{[bench_pose][M_Body_t]}' cannot be cast to type double
如果我在插入中硬编码空值并在crate sql上运行查询,它执行正确。如何在logstash中解析这些字段中的空值?
您需要将INSERT语句更改为:
statement => ["INSERT INTO table_name(path,bench_pose_m_Body_t,bench_pose_m_jaw_t) VALUES(?,TRY_CAST(? as double),TRY_CAST(? as double))",'path','%{[bench_pose][M_Body_t]}','%{[bench_pose][M_Jaw_t]}']
TRY_CAST
在类型转换不兼容的情况下将返回null而不是抛出错误。