如何使用Node中的Needle关闭Twitter流?



我正在尝试新的Twitter v2 API,我在一个节点应用程序中使用GitHub的示例。它们使用npm模块needle来处理它们的请求。

// Open a realtime stream of Tweets, filtered according to rules
// https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/quick-start
const needle = require('needle');
// The code below sets the bearer token from your environment variables
// To set environment variables on macOS or Linux, run the export command below from the terminal:
// export BEARER_TOKEN='YOUR-TOKEN'
const token = process.env.BEARER_TOKEN;
const rulesURL = 'https://api.twitter.com/2/tweets/search/stream/rules';
const streamURL = 'https://api.twitter.com/2/tweets/search/stream';
// this sets up two rules - the value is the search terms to match on, and the tag is an identifier that
// will be applied to the Tweets return to show which rule they matched
// with a standard project with Basic Access, you can add up to 25 concurrent rules to your stream, and
// each rule can be up to 512 characters long
// Edit rules as desired below
const rules = [{
'value': 'dog has:images -is:retweet',
'tag': 'dog pictures'
},
{
'value': 'cat has:images -grumpy',
'tag': 'cat pictures'
},
];
async function getAllRules() {
const response = await needle('get', rulesURL, {
headers: {
"authorization": `Bearer ${token}`
}
})
if (response.statusCode !== 200) {
throw new Error(response.body);
}
return (response.body);
}
async function deleteAllRules(rules) {
if (!Array.isArray(rules.data)) {
return null;
}
const ids = rules.data.map(rule => rule.id);
const data = {
"delete": {
"ids": ids
}
}
const response = await needle('post', rulesURL, data, {
headers: {
"content-type": "application/json",
"authorization": `Bearer ${token}`
}
})
if (response.statusCode !== 200) {
throw new Error(response.body);
}
return (response.body);
}
async function setRules() {
const data = {
"add": rules
}
const response = await needle('post', rulesURL, data, {
headers: {
"content-type": "application/json",
"authorization": `Bearer ${token}`
}
})
if (response.statusCode !== 201) {
throw new Error(response.body);
}
return (response.body);
}
function streamConnect() {
const stream = needle.get(streamURL, {
headers: {
"User-Agent": "v2FilterStreamJS",
"Authorization": `Bearer ${token}`
},
timeout: 20000
});
stream.on('data', data => {
try {
const json = JSON.parse(data);
console.log(json);
} catch (e) {
// Keep alive signal received. Do nothing.
}
}).on('error', error => {
if (error.code === 'ETIMEDOUT') {
stream.emit('timeout');
}
});
return stream;
}

(async () => {
let currentRules;
try {
// Gets the complete list of rules currently applied to the stream
currentRules = await getAllRules();
// Delete all rules. Comment the line below if you want to keep your existing rules.
await deleteAllRules(currentRules);
// Add rules to the stream. Comment the line below if you don't want to add new rules.
await setRules();
} catch (e) {
console.error(e);
process.exit(-1);
}
// Listen to the stream.
// This reconnection logic will attempt to reconnect when a disconnection is detected.
// To avoid rate limits, this logic implements exponential backoff, so the wait time
// will increase if the client cannot reconnect to the stream.
const filteredStream = streamConnect();
let timeout = 0;
filteredStream.on('timeout', () => {
// Reconnect on error
console.warn('A connection error occurred. Reconnecting…');
setTimeout(() => {
timeout++;
streamConnect();
}, 2 ** timeout);
streamConnect();
})
})();

我现在的问题是如何关闭这个流。在我的应用程序,我试图手动停止和重新启动流,当我需要它。但是在我的测试中,我运行了几次,达到了打开流的最大数量。

谢谢你的建议!

可以通过中止请求来停止流,

stream.request.abort()

其中stream是在streamConnect()函数

中找到的变量

最新更新