使用 Apache Flink 从 Web 获取 JSON 元素



在阅读了Apache Flink的几个文档页面(官方文档,dataartisans)以及官方存储库中提供的示例后,我不断看到它们用作数据源的示例,用于流式传输已下载的文件,始终连接到本地主机。

我正在尝试使用Apache Flink下载包含动态数据的JSON文件。我的目的是尝试建立我可以访问 JSON 文件的 url 作为 Apache Flink 的输入源,而不是用另一个系统下载它并使用 Apache Flink 处理下载的文件。

是否有可能与 Apache Flink 建立这种网络连接?

您可以将要下载的 URL 定义为输入DataStream,然后从MapFunction中下载文档。以下代码对此进行了演示:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> inputURLs = env.fromElements("http://www.json.org/index.html");
inputURLs.map(new MapFunction<String, String>() {
    @Override
    public String map(String s) throws Exception {
        URL url = new URL(s);
        InputStream is = url.openStream();
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));
        StringBuilder builder = new StringBuilder();
        String line;
        try {
            while ((line = bufferedReader.readLine()) != null) {
                builder.append(line + "n");
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
        try {
            bufferedReader.close();
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
        return builder.toString();
    }
}).print();
env.execute("URL download job");

相关内容

  • 没有找到相关文章

最新更新