我正在尝试将(动态变化的)MySQL 数据与 Elasticsearch 同步,同步工作由 Logstash 完成。
我需要根据传入的 geo_point 坐标,筛选出其半径 1 公里范围内的车辆列表。
在 Elasticsearch 上执行 geo_point 查询时,我收到如下错误:
{"error":
{"root_cause": [
{"type": "query_parsing_exception",
"reason": "failed to find geo_point field [location]",
"index":"current_location",
"line":13,
"col":9}
],
"type": "search_phase_execution_exception",
"reason": "all shards failed",
"phase":"query",
"grouped": true,
"failed_shards":[
{"shard":0,
"index": "current_location",
"node":"Pf8eXXX2QMuaskBuLSdcXw",
"reason": {
"type":"query_parsing_exception",
"reason":"failed to find geo_point field [location]",
"index":"current_location",
"line":13,
"col":9}}
]},
"status":400}
我使用的 curl 调用是:
# Search the current_location index (type "vehicle") for documents whose
# "location" field lies within 1km of lat 40.715 / lon -73.988, using a
# geo_distance filter wrapped in a "filtered" query.
curl -XGET http://MY.ELASTIC.SEARCH.IP:9200/current_location/vehicle/_search -d '
{
"query": {
"filtered": {
"filter": {
"geo_distance": {
"distance": "1km",
"distance_type": "plane",
"location": {
"lat": 40.715,
"lon": -73.988
}
}
}
}
}
}'
用 curl 获取该索引中任意一个文档,返回结果如下:
curl -XGET http://MY.ELASTIC.SEARCH.IP:9200/current_location/vehicle/XX12XX34XX56
{
"_index": "current_location",
"_type": "vehicle",
"_id": "XX12XX34XX56",
"_version": 1,
"found": true,
"_source": {
"obd_id": "XX12XX34XX56",
"system_timestamp": 1464855810,
"gps_timestamp": 1464855807,
"speed": 1.476,
"direction": 0,
"latitude": 12.9699,
"longitude": 77.7004,
"@version": "1",
"@timestamp": "2016-06-05T16:09:46.712Z",
"type": "vehicle",
"location": {
"lat": 12.9699,
"lon": 77.7004
}
}
}
我的 logstash.conf 文件内容如下:
# Pipeline: read vehicle positions from MySQL and index them into Elasticsearch.
input {
  jdbc {
    # JDBC driver and connection settings for the source database.
    jdbc_driver_library => "/home/moin/moin/mysql-connector-java-5.1.39-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://MY.MYSQL.DB.IP:3306/my_db"
    jdbc_user => "MY_USER"
    jdbc_password => "MY_PASSWORD"
    jdbc_validate_connection => true

    # speed, latitude and longitude are rescaled in SQL, so each event already
    # carries the converted values.
    statement => "SELECT obd_id, system_timestamp, gps_timestamp, speed*0.036 as speed, direction, latitude/3600000.00 as latitude, longitude/3600000.00 as longitude from current_location"

    # Track the value of the system_timestamp column between runs.
    # NOTE(review): the statement never references :sql_last_value, so the
    # tracked value does not restrict which rows are selected - confirm intent.
    use_column_value => true
    tracking_column => system_timestamp

    # Tag every event; the output section uses this field as the document type.
    add_field => {"type" => "vehicle"}
  }
}

filter {
  # Build the location object only when both coordinate fields are set.
  if [latitude] and [longitude] {
    # Copy the coordinates into a nested location object with lat/lon keys.
    mutate {
      add_field => {
        "[location][lat]" => "%{latitude}"
        "[location][lon]" => "%{longitude}"
      }
    }
    # The %{...} references above produce strings, so turn them back into
    # floats. This has to be a second mutate: add_field is applied after a
    # mutate's own operations, so a convert in the same block would run
    # before the location fields exist.
    mutate {
      convert => {
        "[location][lat]" => "float"
        "[location][lon]" => "float"
      }
    }
  }
}

output {
  elasticsearch {
    hosts => "MY.ELASTIC.SEARCH.IP"
    index => "current_location"

    # The document type comes from the event's type field and the document id
    # from its obd_id field.
    document_type => "%{type}"
    document_id => "%{obd_id}"

    # Install the custom mapping template from disk, overwriting any template
    # previously installed under the same name.
    # NOTE(review): the mapping only applies to indices whose name matches the
    # "template" pattern inside that file - verify it matches the index above.
    template => "/tmp/current-loc.json"
    template_overwrite => true
  }
}
我的模板文件 current-loc.json 如下:
{
  "order": 0,
  "template": "logstash-*",
  "settings": {
    "index": {
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "_default_": {
      "dynamic_templates": [
        {
          "message_field": {
            "mapping": {
              "index": "analyzed",
              "omit_norms": true,
              "fielddata": {
                "format": "disabled"
              },
              "type": "string"
            },
            "match": "message",
            "match_mapping_type": "string"
          }
        },
        {
          "string_fields": {
            "mapping": {
              "index": "analyzed",
              "omit_norms": true,
              "fielddata": {
                "format": "disabled"
              },
              "type": "string",
              "fields": {
                "raw": {
                  "index": "not_analyzed",
                  "ignore_above": 256,
                  "doc_values": true,
                  "type": "string"
                }
              }
            },
            "match": "*",
            "match_mapping_type": "string"
          }
        },
        {
          "float_fields": {
            "mapping": {
              "doc_values": true,
              "type": "float"
            },
            "match": "*",
            "match_mapping_type": "float"
          }
        },
        {
          "double_fields": {
            "mapping": {
              "doc_values": true,
              "type": "double"
            },
            "match": "*",
            "match_mapping_type": "double"
          }
        },
        {
          "byte_fields": {
            "mapping": {
              "doc_values": true,
              "type": "byte"
            },
            "match": "*",
            "match_mapping_type": "byte"
          }
        },
        {
          "short_fields": {
            "mapping": {
              "doc_values": true,
              "type": "short"
            },
            "match": "*",
            "match_mapping_type": "short"
          }
        },
        {
          "integer_fields": {
            "mapping": {
              "doc_values": true,
              "type": "integer"
            },
            "match": "*",
            "match_mapping_type": "integer"
          }
        },
        {
          "long_fields": {
            "mapping": {
              "doc_values": true,
              "type": "long"
            },
            "match": "*",
            "match_mapping_type": "long"
          }
        },
        {
          "date_fields": {
            "mapping": {
              "doc_values": true,
              "type": "date"
            },
            "match": "*",
            "match_mapping_type": "date"
          }
        },
        {
          "geo_point_fields": {
            "mapping": {
              "doc_values": true,
              "type": "geo_point"
            },
            "match": "*",
            "match_mapping_type": "geo_point"
          }
        }
      ],
      "properties": {
        "@timestamp": {
          "doc_values": true,
          "type": "date"
        },
        "location": {
          "doc_values": true,
          "type": "geo_point"
        },
        "geoip": {
          "dynamic": true,
          "properties": {
            "location": {
              "doc_values": true,
              "type": "geo_point"
            },
            "longitude": {
              "doc_values": true,
              "type": "float"
            },
            "latitude": {
              "doc_values": true,
              "type": "float"
            },
            "ip": {
              "doc_values": true,
              "type": "ip"
            }
          },
          "type": "object"
        },
        "@version": {
          "index": "not_analyzed",
          "doc_values": true,
          "type": "string"
        }
      },
      "_all": {
        "enabled": true,
        "omit_norms": true
      }
    }
  },
  "aliases": {}
}
我几乎试遍了网上能找到的所有方法。任何关于如何解决这个问题的想法都会很有帮助。提前谢谢。:)
终于解决了问题。在我的模板文件中,我原先使用的值是 "template": "logstash-*"。
我当时不知道这个字段是用来指定该模板要应用到哪些索引的;由于它与我的索引名不匹配,location 字段没有被映射为 geo_point。
将其改为 "template": "my_elastic_index"(即实际使用的索引名,本例中为 current_location)后,问题就解决了。