我想将下面的对象附加到流中的每个对象
{"index":{"_index":"tvseries","_type":"internindex"}}
我的直播如下所示
[
{"showname":"The X Files","episode":"04","content":"Before what?","season":"1"},
{"showname":"The X Files","episode":"04","content":"Before what?","season":"1"},
{"showname":"The X Files","episode":"01","content":"What?","season":"1"}
]
我的流应该是什么样子的!
> -> POST http://localhost:9200/_bulk {"index":{"_index":"tvseries","_type":"internindex"}}
> {"showname":"The X Files","episode":"04","content":"Before
> what?","season":"1"}
> {"index":{"_index":"tvseries","_type":"internindex"}}
> {"showname":"The X
> Files","episode":"04","content":"Great.","season":"1"}
> {"index":{"_index":"tvseries","_type":"internindex"}}
> {"showname":"The X
> Files","episode":"01","content":"What?","season":"1"}
如何在我现有的以下代码库中使用 JSON 流来实现这一点
var stream = new ElasticsearchWritableStream(client, {
highWaterMark: 256,
flushTimeout: 500
});
pg.connect(connectionString,function(err, client, done) {
if(err) throw err;
var query = new QueryStream('SELECT * FROM srt limit 2')
var streams = client.query(query)
//release the client when the stream is finished
streams.on('end', done)
streams.pipe(JSONStream.stringify()).pipe(stream)
})
我目前使用的 npm 包
用于在弹性搜索中批量插入!
弹性搜索可写流
用于将数据从 postgres 获取到流中!
pg-query-stream
缺少的部分是将 Postgres 流转换为弹性可写流! 任何建议,指针,关于如何实现这一目标的建议!
所以基本上,唯一可行的选择,没有太多代码更改,就是构建从 postgres 本身批量插入弹性搜索所需的格式,而不是在 node.js 对象中!
"SELECT 'tvseries' as index,'internindex' as type, json_build_object('showname', showname, 'epsiode', ep,'content',content,'season',season) AS body"
+" FROM srt where shownameid=4"