I have Question and Answer entities in a SQL Server database, represented as two tables, Questions and Answers (see below). The relationship between them is OneToMany.
Questions table:
Id   Title
-------------------
1    Question 1
2    Question 2
Answers table:
Id   Answer     Question_Id
-------------------------------
1    answer 1   1
2    answer 2   1
3    answer 3   1
4    answer 4   2
5    answer 5   2
After moving the data through a Logstash pipeline, I want to end up with ES documents that have the structure shown below:
{
  "questionId": 1,
  "questionTitle": "Question 1",
  "questionAnswers": [
    {
      "answerId": 1,
      "answer": "answer 1"
    },
    {
      "answerId": 2,
      "answer": "answer 2"
    },
    {
      "answerId": 3,
      "answer": "answer 3"
    }
  ]
}
{
  "questionId": 2,
  "questionTitle": "Question 2",
  "questionAnswers": [
    {
      "answerId": 4,
      "answer": "answer 4"
    },
    {
      "answerId": 5,
      "answer": "answer 5"
    }
  ]
}
The Logstash jdbc input plugin is set up to retrieve the data through the Question_Answers view:
input {
  jdbc {
    type => "Test_1"
    jdbc_connection_string => "jdbc:sqlserver://myinstance:1433"
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_driver_library => "/home/abury/enu/mssql-jdbc-6.2.2.jre8.jar"
    schedule => "*/3 * * * *"
    statement => "SELECT * from Question_Answers"
  }
}
The result set returned by the view looks like this:
questionId   questionTitle   answerId   answer
1            Question 1      1          answer 1
1            Question 1      2          answer 2
1            Question 1      3          answer 3
2            Question 2      4          answer 4
2            Question 2      5          answer 5
The Elasticsearch output plugin is set up as follows:
output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "question"
    document_id => "%{questionId}"
  }
}
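Question: how do I set up Logstash to recognize the records that belong to the same question and build ES documents with the desired structure shown above? Is it possible to add some aggregation logic to the output.conf file to achieve this? Or do I need to rewrite my database view so that it returns a single record per question:
questionId   questionTitle   answerId   answer
---------------------------------------------------------------------
1            Question 1      1, 2, 3    answer 1, answer 2, answer 3
Update: fixed typos in the column names.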
SELECT
  questionId,
  questionTitle,
  GROUP_CONCAT(answereId) answerIDs,
  GROUP_CONCAT(answer) answers
FROM Question_Answers
GROUP BY questionId, questionTitle
By the way, there is a typo in your column name answereId. I think you want answerId.
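One caveat with the grouped query above: GROUP_CONCAT is a MySQL function, and on SQL Server the closest equivalent is STRING_AGG (available from SQL Server 2017). Either way, each question comes back with two comma-separated strings rather than a nested array, so something still has to split them to reach the desired document shape. The following is a minimal Ruby sketch of that step, not part of the original answer. The helper name `nest_answers` is hypothetical, the field names `answerIDs` and `answers` are taken from the aliases in the query, and it assumes that no answer text contains a comma.

```ruby
# Hypothetical helper: converts one grouped row (comma-separated columns)
# into the nested question document.
# Assumption: answer texts never contain a comma, otherwise the split
# would cut them in the wrong place.
def nest_answers(row)
  separator = /\s*,\s*/ # tolerate "1,2,3" as well as "1, 2, 3"
  ids = row['answerIDs'].split(separator).map(&:to_i)
  texts = row['answers'].split(separator)

  {
    'questionId' => row['questionId'],
    'questionTitle' => row['questionTitle'],
    # Pair each id with the answer text at the same position.
    'questionAnswers' => ids.zip(texts).map do |id, text|
      { 'answerId' => id, 'answer' => text }
    end
  }
end

row = {
  'questionId' => 1,
  'questionTitle' => 'Question 1',
  'answerIDs' => '1, 2, 3',
  'answers' => 'answer 1, answer 2, answer 3'
}
doc = nest_answers(row)
puts doc.inspect
```

The pairing depends on both concatenated columns listing the answers in the same order, which the query does not state explicitly, so treat this as fragile compared with aggregating the ungrouped rows.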
I was able to get the desired Elasticsearch document structure by using the Logstash aggregate filter plugin (see Example 4 in its documentation):
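filter {
  aggregate {
    # Assumes the jdbc input's default lowercase_column_names => true,
    # so the fields arrive as questionid, questiontitle and answerid.
    task_id => "%{questionid}"
    code => "
      # Copy the question fields once, then append one entry per answer row.
      map['questionId'] ||= event.get('questionid')
      map['questionTitle'] ||= event.get('questiontitle')
      map['questionAnswers'] ||= []
      map['questionAnswers'] << {'answerId' => event.get('answerid'), 'answer' => event.get('answer')}
      # Drop the per-row event; only the aggregated map is emitted.
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }
}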
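To make it clearer what the filter above does with the view's rows, here is a minimal plain-Ruby sketch that imitates the behaviour outside Logstash. It is an illustration only, not the plugin's implementation: the helper name `aggregate_rows` is hypothetical, and the lowercased field names assume the jdbc input's default column-name handling. The map for one question is flushed as soon as a row with a different question id arrives, which is the role `push_previous_map_as_event` plays, and the final flush stands in for the filter's `timeout`.

```ruby
require 'json'

# Hypothetical helper: groups consecutive rows that share a question id
# into one nested document, flushing whenever the id changes.
def aggregate_rows(rows)
  docs = []
  current_id = nil
  map = nil

  rows.each do |row|
    if row['questionid'] != current_id
      docs << map if map # a new question started: emit the previous map
      current_id = row['questionid']
      map = {}
    end

    # Same statements as in the filter's code block, with row in place of event.
    map['questionId'] ||= row['questionid']
    map['questionTitle'] ||= row['questiontitle']
    map['questionAnswers'] ||= []
    map['questionAnswers'] << { 'answerId' => row['answerid'], 'answer' => row['answer'] }
  end

  docs << map if map # last question: in Logstash the timeout triggers this flush
  docs
end

rows = [
  { 'questionid' => 1, 'questiontitle' => 'Question 1', 'answerid' => 1, 'answer' => 'answer 1' },
  { 'questionid' => 1, 'questiontitle' => 'Question 1', 'answerid' => 2, 'answer' => 'answer 2' },
  { 'questionid' => 1, 'questiontitle' => 'Question 1', 'answerid' => 3, 'answer' => 'answer 3' },
  { 'questionid' => 2, 'questiontitle' => 'Question 2', 'answerid' => 4, 'answer' => 'answer 4' },
  { 'questionid' => 2, 'questiontitle' => 'Question 2', 'answerid' => 5, 'answer' => 'answer 5' }
]

puts JSON.pretty_generate(aggregate_rows(rows))
```

Because a map is only flushed when the id changes, this approach relies on the rows for one question arriving together, for example by adding ORDER BY questionId to the jdbc statement, and on running Logstash with a single pipeline worker (-w 1) so that events are not reordered.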