使用Python的Spark流:相对于特定属性,加入两个流



我分别使用schemas s1和s2接收两个插座S1和S2。

我想使用火花流属性" A"加入S1和S2。以下是我的代码:

    sc = SparkContext("local[3]", "StreamJoin")
    ssc = StreamingContext(sc, 1) 
    S1 = ssc.socketTextStream("localhost", 9999)
    S2 = ssc.socketTextStream("localhost", 8085)
    # Create windowed stream
    wS1 = S1.window(10)
    wS2 = S2.window(1)
    wS1.flatMap(lambda line: line.split(",")).pprint()
    wS2.flatMap(lambda line: line.split(",")).pprint()
    # Perform join
    joinedStream = wS1.join(wS2)
    joinedStream.foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x)))
    ssc.start()             
    ssc.awaitTermination()

S1和S2都是逗号分开的。

尽管上述代码执行连接,但是对于完整的行。

我有兴趣就特定属性而加入这两个流,在这种情况下,属性" a"。我如何实现这一目标?

非常感谢!

在Spark中加入的方式是基于键加入RDD行,键是ROW [0]的值。因此,您可以做:

wS1.flatMap(lambda line: line.split(",")).map(lambda x: (x[0], x)).pprint()
wS2.flatMap(lambda line: line.split(",")).map(lambda x: (x[0], x)).pprint()

,然后将基于拆分列表的第一个元素完成加入。

文档参考:

https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight= join#pyspark.rdd.join

最新更新