如何使用 apache beam 处理 Spark JavaRDD 数据?



我想处理来自 Spark JavaRDD 对象的数据,我正在使用 Apache beam 从 sparksession.sql(" query"( 检索这些数据。但是我无法将PTransform直接应用于此数据集。 我正在使用Apache Beam 2.14.0(升级的Spark运行器以使用Spark版本2.4.3。(光束-7265((。请为此指导我。

SparkSession session = SparkSession.builder().appName("test 2.0").master("local[*]").getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(session.sparkContext());
final SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
options.setRunner(SparkRunner.class);
options.setUsesProvidedSparkContext(true);
options.setProvidedSparkContext(jsc);
options.setEnableSparkMetricSinks(false);
Pipeline pipeline = Pipeline.create(options);
List<StructField> srcfields  = new ArrayList<StructField>();
srcfields.add(DataTypes.createStructField("dataId", DataTypes.IntegerType, true));
srcfields.add(DataTypes.createStructField("code", DataTypes.StringType, true));
srcfields.add(DataTypes.createStructField("value", DataTypes.StringType, true));
srcfields.add(DataTypes.createStructField("dataFamilyId", DataTypes.IntegerType, true));
StructType dataschema = DataTypes.createStructType(srcfields);
List<Row> dataList = new ArrayList<Row>();
dataList.add(RowFactory.create(1, "AA", "Apple", 1));
dataList.add(RowFactory.create(2, "AB", "Orange", 1));
dataList.add(RowFactory.create(3, "AC", "Banana", 2));
dataList.add(RowFactory.create(4, "AD", "Guava", 3));
Dataset<Row> rawData = new SQLContext(jsc).createDataFrame(dataList, dataschema);//pipeline.getOptions().getRunner().cast();  
JavaRDD<Row> javadata = rawData.toJavaRDD();
System.out.println("***************************************************");
for(Row line:javadata.collect()){
System.out.println(line.getInt(0)+"t"+line.getString(1)+"t"+line.getString(2)+"t"+line.getInt(3));
}
System.out.println("***************************************************");
pipeline.apply(Create.of(javadata))
.apply(ParDo.of(new DoFn<JavaRDD<Row>,String> ()
{
@ProcessElement
public void processElement(ProcessContext c) {
JavaRDD<Row> row = c.element();
c.output("------------------------------");
System.out.println(".............................");
}
}
))
.apply("WriteCounts", TextIO.write().to("E:\output\out"));
final PipelineResult result = pipeline.run();
System.out.println();
System.out.println("***********************************end");

我不相信这是不可能的,因为Beam应该对Spark RDD一无所知,而Beam Spark Runner将所有与Spark相关的东西都隐藏在引擎盖下。潜在地,您可以创建自定义 Spark 特定PTransform,它将从 RDD 读取,并将其用作特定情况下管道的输入,但我不确定这是一个好主意,也许,它可以通过其他方式解决。您能否分享有关数据处理管道的更多详细信息?

无法直接将 Spark 数据集或 RDD 使用 Beam,但您应该能够将数据从 Hive 摄取到 Beam PCollection 中。请参阅 Beam 的 HCatalog IO 连接器的文档:https://beam.apache.org/documentation/io/built-in/hcatalog/

最新更新