我正在github上扩展Flink Connector,以获取自定义URL的Twitter流,尽管我能够按照示例代码中给出的随机推文,但是当我给出自定义URL时没有被提取(在控制台和文件上都没有打印任何东西)。我写了一个自定义点,如下
给出public class CustomEndPoint implements EndpointInitializer, Serializable{
@Override
public StreamingEndpoint createEndpoint() {
return new RawEndpoint("https://api.twitter.com/1.1/search/tweets.json?q=%23apple", "GET");
}}
以下是我如何将我的自定义终端点列入连接器API中给出的Twittersource类
TwitterSource twitterSource = new TwitterSource(params.getProperties());
twitterSource.setCustomEndpointInitializer(new CustomEndPoint());
streamSource = env.addSource(twitterSource);
直接修改twittersource类的代码(即将我的端点硬编码到字段变量)时,相同的端点也有效,但是我不想这样做,因为这不是使用API的最佳方法,另外,我失去了在没有代码更改的情况下给出不同端点的能力
这是因为您必须设置twittersource主机属性。您的主机不是Flink Twitter源客户端的默认主机。
props.setProperty(TwitterSource.CLIENT_HOSTS,"https://api.twitter.com");
使用的默认主机是" https://stream.twitter.com"请参阅Twittersource类,运行方法:
client = new ClientBuilder()
.name(properties.getProperty(CLIENT_NAME, "flink-twitter-source"))
.hosts(properties.getProperty(CLIENT_HOSTS, Constants.STREAM_HOST))
.endpoint(endpoint)
.authentication(auth)
.processor(new HosebirdMessageProcessor() {
...