Flink Twitter流示例不适用于自定义端点



我正在github上扩展Flink Connector,以获取自定义URL的Twitter流,尽管我能够按照示例代码中给出的随机推文,但是当我给出自定义URL时没有被提取(在控制台和文件上都没有打印任何东西)。我写了一个自定义点,如下

给出
public class CustomEndPoint implements EndpointInitializer, Serializable{
@Override
public StreamingEndpoint createEndpoint() {
    return new RawEndpoint("https://api.twitter.com/1.1/search/tweets.json?q=%23apple", "GET");     
}}

以下是我如何将我的自定义终端点列入连接器API中给出的Twittersource类

TwitterSource twitterSource = new TwitterSource(params.getProperties());
    twitterSource.setCustomEndpointInitializer(new CustomEndPoint());
        streamSource = env.addSource(twitterSource);

直接修改twittersource类的代码(即将我的端点硬编码到字段变量)时,相同的端点也有效,但是我不想这样做,因为这不是使用API的最佳方法,另外,我失去了在没有代码更改的情况下给出不同端点的能力

这是因为您必须设置twittersource主机属性。您的主机不是Flink Twitter源客户端的默认主机。

props.setProperty(TwitterSource.CLIENT_HOSTS,"https://api.twitter.com");

使用的默认主机是" https://stream.twitter.com"请参阅Twittersource类,运行方法:

    client = new ClientBuilder()
        .name(properties.getProperty(CLIENT_NAME, "flink-twitter-source"))
        .hosts(properties.getProperty(CLIENT_HOSTS, Constants.STREAM_HOST))
        .endpoint(endpoint)
        .authentication(auth)
        .processor(new HosebirdMessageProcessor() {

...

相关内容

  • 没有找到相关文章

最新更新