SQL Server database > Logstash > Elasticsearch: map result set records related to the same entity to the same ES document



I have Question and Answer entities represented in a SQL Server database as two tables, Questions and Answers (see below). The relationship between them is one-to-many.

Questions table:

Id      Title
-------------------
1      Question 1
2      Question 2

Answers table:

Id    Answer        Question_Id
-------------------------------
1     answer 1      1
2     answer 2      1
3     answer 3      1
4     answer 4      2
5     answer 5      2

After moving the data through the Logstash pipeline, I want to get ES documents with the structure shown below:

{
  "questionId": 1,
  "questionTitle": "Question 1",
  "questionAnswers": [
    {
      "answerId": 1,
      "answer": "answer 1"
    },
    {
      "answerId": 2,
      "answer": "answer 2"
    },
    {
      "answerId": 3,
      "answer": "answer 3"
    }
  ]
}
{
  "questionId": 2,
  "questionTitle": "Question 2",
  "questionAnswers": [
    {
      "answerId": 4,
      "answer": "answer 4"
    },
    {
      "answerId": 5,
      "answer": "answer 5"
    }
  ]
}

The Logstash jdbc input plugin is configured to retrieve data from the Question_Answers view.

input {
  jdbc {
    type => "Test_1"
    jdbc_connection_string => "jdbc:sqlserver://myinstance:1433"
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_driver_library => "/home/abury/enu/mssql-jdbc-6.2.2.jre8.jar"
    schedule => "*/3 * * * *"
    statement => "SELECT * from Question_Answers"
  }
}

The result set returned by the view looks like this:

questionId  questionTitle   answerId    answer
1           Question 1      1           answer 1
1           Question 1      2           answer 2
1           Question 1      3           answer 3
2           Question 2      4           answer 4
2           Question 2      5           answer 5

The Elasticsearch output plugin is configured as follows:

output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "question"
    document_id => "%{questionId}"
  }
}
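Without any aggregation step, each row of the result set becomes its own event, and events that share a document_id replace one another in the index, so only the last answer per question would survive. The sketch below illustrates this with a plain Hash standing in for the index. `VIEW_ROWS` and `index_rows` are hypothetical names, and the sketch assumes that document_id resolves to the row's question id.

```ruby
# Rows as returned by the Question_Answers view (sample data from the question).
VIEW_ROWS = [
  { 'questionId' => 1, 'questionTitle' => 'Question 1', 'answerId' => 1, 'answer' => 'answer 1' },
  { 'questionId' => 1, 'questionTitle' => 'Question 1', 'answerId' => 2, 'answer' => 'answer 2' },
  { 'questionId' => 1, 'questionTitle' => 'Question 1', 'answerId' => 3, 'answer' => 'answer 3' },
  { 'questionId' => 2, 'questionTitle' => 'Question 2', 'answerId' => 4, 'answer' => 'answer 4' },
  { 'questionId' => 2, 'questionTitle' => 'Question 2', 'answerId' => 5, 'answer' => 'answer 5' }
].freeze

# Hypothetical stand-in for the index: a Hash keyed by document id.
# Writing a row under an id that already exists replaces the earlier row.
def index_rows(rows)
  index = {}
  rows.each { |row| index[row['questionId']] = row }
  index
end

index = index_rows(VIEW_ROWS)
puts index.size           # 2 documents, one per question
puts index[1]['answer']   # answer 3 (answers 1 and 2 were overwritten)
```

This is why the rows need to be combined into one event per question before they reach the output.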

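Question: how do I configure Logstash to recognize the records that belong to the same question and build ES documents with the desired structure shown above? Is it possible to add some aggregation logic to the output.conf file to achieve this? Or do I need to rewrite my database view so that it returns a single record per question: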

questionId  questionTitle   answerId    answer
---------------------------------------------------------------------
1           Question 1      1, 2, 3     answer 1, answer 2, answer 3

Update: fixed a typo in the column names.

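-- GROUP_CONCAT is MySQL syntax; on SQL Server 2017 and later the equivalent is STRING_AGG.
SELECT
    questionId,
    questionTitle,
    STRING_AGG(answereId, ', ') AS answerIDs,
    STRING_AGG(answer, ', ') AS answers
FROM Question_Answers
GROUP BY questionId, questionTitle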

By the way, your column name contains a typo: answereId. I assume you meant answerId.
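If the view returns one row per question with concatenated columns, the nested questionAnswers array still has to be rebuilt from the two strings. The following is a minimal sketch of that step, not part of the original answer: `nest_answers` and `GROUPED_ROW` are hypothetical names. It assumes that both concatenated columns list their values in the same order and that no answer text contains the separator.

```ruby
# One grouped row, shaped like the single-record result shown in the question.
GROUPED_ROW = {
  'questionId' => 1,
  'questionTitle' => 'Question 1',
  'answerIDs' => '1, 2, 3',
  'answers' => 'answer 1, answer 2, answer 3'
}.freeze

# Hypothetical helper: splits the concatenated columns and pairs ids with texts.
# Assumption: ids and answers appear in the same order in both strings.
def nest_answers(row, separator = ',')
  ids = row['answerIDs'].split(separator).map { |id| Integer(id.strip) }
  texts = row['answers'].split(separator).map(&:strip)
  {
    'questionId' => row['questionId'],
    'questionTitle' => row['questionTitle'],
    'questionAnswers' => ids.zip(texts).map { |id, text| { 'answerId' => id, 'answer' => text } }
  }
end

document = nest_answers(GROUPED_ROW)
puts document['questionAnswers'].length   # 3
```

The reliance on a separator that never occurs in the answer text is the main weakness of this approach compared with aggregating the individual rows.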

I was able to get the desired Elasticsearch document structure by using the Logstash aggregate filter plugin (see example 4 in its documentation):

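filter {
  aggregate {
    # The jdbc input lowercases column names by default, hence 'questionid'.
    task_id => "%{questionid}"
    code => "
      map['questionId'] ||= event.get('questionid')
      map['questionTitle'] ||= event.get('questiontitle')

      map['questionAnswers'] ||= []
      map['questionAnswers'] << {'answerId' => event.get('answerid'), 'answer' => event.get('answer')}
      event.cancel()
    "
    # Emit the accumulated map as a new event when the next task_id arrives,
    # so the rows for one question must arrive consecutively.
    push_previous_map_as_event => true
    # Flush the last map after 3 seconds without new events.
    timeout => 3
  }
}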
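To make the behavior of the filter easier to follow, here is a standalone Ruby sketch that mimics it outside Logstash. It is an illustration under stated assumptions, not the plugin's implementation: `ROWS` and `aggregate_rows` are hypothetical names, the rows use lowercased column names, and the rows are assumed to arrive grouped by question id.

```ruby
require 'json'

# Rows as events, with lowercased column names (sample data from the question).
ROWS = [
  { 'questionid' => 1, 'questiontitle' => 'Question 1', 'answerid' => 1, 'answer' => 'answer 1' },
  { 'questionid' => 1, 'questiontitle' => 'Question 1', 'answerid' => 2, 'answer' => 'answer 2' },
  { 'questionid' => 1, 'questiontitle' => 'Question 1', 'answerid' => 3, 'answer' => 'answer 3' },
  { 'questionid' => 2, 'questiontitle' => 'Question 2', 'answerid' => 4, 'answer' => 'answer 4' },
  { 'questionid' => 2, 'questiontitle' => 'Question 2', 'answerid' => 5, 'answer' => 'answer 5' }
].freeze

# Hypothetical helper that imitates push_previous_map_as_event:
# the map for one task id is emitted as soon as a different task id shows up.
def aggregate_rows(rows)
  documents = []
  map = nil
  current_task_id = nil

  rows.each do |event|
    task_id = event['questionid']
    if task_id != current_task_id
      documents << map if map   # task id changed: push the previous map
      map = {}
      current_task_id = task_id
    end

    # Same statements as the filter's code block, with Hash access
    # in place of event.get.
    map['questionId'] ||= event['questionid']
    map['questionTitle'] ||= event['questiontitle']
    map['questionAnswers'] ||= []
    map['questionAnswers'] << { 'answerId' => event['answerid'], 'answer' => event['answer'] }
  end

  documents << map if map       # stands in for the flush that the timeout triggers
  documents
end

puts JSON.pretty_generate(aggregate_rows(ROWS))
```

Because a map is only pushed when the task id changes, interleaved rows would produce several partial documents for the same question. Sorting the view's result by questionId, and running the pipeline with a single worker as the aggregate filter documentation advises, avoids that.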
