Spark:读取inputStream而不是File



我在Java应用程序中使用SparkSQL,使用Databricks进行解析,对CSV文件进行一些处理。

我正在处理的数据来自不同的来源(远程URL、本地文件、谷歌云存储),我习惯于将所有内容都转换为InputStream,这样我就可以在不知道数据来自哪里的情况下解析和处理数据。

我在Spark上看到的所有文档都从一个路径读取文件,例如

SparkConf conf = new SparkConf().setAppName("spark-sandbox").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlc = new SQLContext(sc);
DataFrame df = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("path/to/file.csv");
DataFrame dfGrouped = df.groupBy("varA","varB")
    .avg("varC","varD");
dfGrouped.show();

我想做的是从InputStream中读取,甚至只是从一个已经在内存中的字符串中读取。类似以下内容:

InputStream stream = new URL(
    "http://www.sample-videos.com/csv/Sample-Spreadsheet-100-rows.csv"
    ).openStream();
DataFrame dfRemote = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(stream);
String someString = "imagine,some,csv,data,here";
DataFrame dfFromString = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .read(someString);

这里有什么简单的东西我遗漏了吗?

我读过一些关于Spark Streaming和自定义接收器的文档,但据我所知,这是为了打开一个将持续提供数据的连接。Spark Streaming似乎将数据分成块并对其进行一些处理,期望更多的数据以无休止的流形式出现。

我的最佳猜测是,Spark作为Hadoop的后代,预计会有大量数据驻留在某个文件系统中。但由于Spark无论如何都是在内存中进行处理的,所以SparkSQL能够解析内存中已经存在的数据对我来说是有意义的。

如有任何帮助,我们将不胜感激。

你可以使用至少四种不同的方法来让你的生活更轻松:

  1. 使用输入流,写入本地文件(使用SSD快速),使用Spark读取。

  2. 将Hadoop文件系统连接器用于S3、谷歌云存储,并将所有内容转换为文件操作。(这不会解决从任意URL读取的问题,因为没有HDFS连接器。)

  3. 将不同的输入类型表示为不同的URI,并创建一个实用函数来检查URI并触发适当的读取操作。

  4. 与(3)相同,但使用用例类而不是URI,并且只是基于输入类型进行重载。

相关内容

  • 没有找到相关文章

最新更新