Passing query parameters from Spark Streaming to Neo4j in Scala

I am trying to execute a Cypher query through the Spark-Neo4j connector, and I want to pass parameters into that query from a data stream produced by Kafka. The result of the Cypher query should end up as DataFrame fields. The connection to Neo4j is established successfully, and the query works fine in a plain Spark context. However, the same code does not work in a streaming context. Is there any difference in how the Neo4j connection is configured when using Spark Streaming?

Below is the code for the streaming context. Kafka is not used as the producer here; the parameter values are defined in the data array, just to test the connection and the query itself:

import java.io.File
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.neo4j.spark.Neo4j

val sparkSession = SparkSession
  .builder()
  .appName("KafkaSparkStreaming")
  .master("local[*]")
  .getOrCreate()

// Read the Neo4j connection settings from a local config file
val neo4jLocalConfig = ConfigFactory.parseFile(new File("configs/local_neo4j.conf"))
sparkSession.conf.set("spark.neo4j.bolt.url", neo4jLocalConfig.getString("neo4j.url"))
sparkSession.conf.set("spark.neo4j.bolt.user", neo4jLocalConfig.getString("neo4j.user"))
sparkSession.conf.set("spark.neo4j.bolt.password", neo4jLocalConfig.getString("neo4j.password"))

val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(3))
val neo = Neo4j(streamingContext.sparkContext)

// Test values standing in for the Kafka stream: member id, latitude, longitude
val data = Array("18731", "41.84000015258789", "-87.62999725341797")
val query =
  """MATCH (m:Member)-[mtg_r:MT_TO_MEMBER]->(mt:MemberTopics)-[mtt_r:MT_TO_TOPIC]->(t:Topic),
    |      (t1:Topic)-[tt_r:GT_TO_TOPIC]->(gt:GroupTopics)-[tg_r:GT_TO_GROUP]->(g:Group)-[h_r:HAS]->(e:Event)-[a_r:AT]->(v:Venue)
    |WHERE mt.topic_id = gt.topic_id
    |  AND distance(point({ longitude: {lon}, latitude: {lat} }), point({ longitude: v.lon, latitude: v.lat })) < 4000
    |  AND mt.member_id = {id}
    |RETURN DISTINCT g.group_name AS group_name, e.event_name AS event_name, v.venue_name AS venue_name""".stripMargin

val paramsMap = Map("lat" -> data(1).toDouble, "lon" -> data(2).toDouble, "id" -> data(0).toInt)
val df = neo.cypher(query, paramsMap)
  .loadDataFrame("group_name" -> "string", "event_name" -> "string", "venue_name" -> "string")
df.show()
streamingContext.start()
streamingContext.awaitTermination()
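
For what it's worth, once the connection works, the per-record parameters from Kafka can be fed into the same query inside foreachRDD, which runs on the driver, where loadDataFrame can be called. This is only a minimal sketch, not code from the original post: it assumes a hypothetical kafkaStream of type DStream[String] carrying comma-separated "id,lat,lon" records, and reuses the neo and query values defined above:

// Sketch (assumption): kafkaStream is a DStream[String] of "id,lat,lon" records,
// e.g. the mapped values of a KafkaUtils direct stream.
kafkaStream.foreachRDD { rdd =>
  // collect() is acceptable here only because each micro-batch is assumed small;
  // loadDataFrame must be invoked on the driver, not inside an executor closure.
  rdd.collect().foreach { record =>
    val Array(id, lat, lon) = record.split(",")
    val paramsMap = Map("id" -> id.toInt, "lat" -> lat.toDouble, "lon" -> lon.toDouble)
    val df = neo.cypher(query, paramsMap)
      .loadDataFrame("group_name" -> "string", "event_name" -> "string", "venue_name" -> "string")
    df.show()
  }
}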

I solved the problem by providing the parameters Neo4j needs to the SparkSession at construction time, via the SparkConf. Here is the code:

import java.io.File
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val neo4jLocalConfig = ConfigFactory.parseFile(new File("configs/local_neo4j.conf"))

// Set the Neo4j properties on the SparkConf *before* the SparkSession is created
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaSparkStreaming")
sparkConf.set("spark.neo4j.bolt.url", neo4jLocalConfig.getString("neo4j.url"))
sparkConf.set("spark.neo4j.bolt.user", neo4jLocalConfig.getString("neo4j.user"))
sparkConf.set("spark.neo4j.bolt.password", neo4jLocalConfig.getString("neo4j.password"))

val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
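
The difference seems to be that calling sparkSession.conf.set(...) after the session exists only updates the runtime SQL configuration, while Neo4j(sc) reads the SparkConf the SparkContext was created with, so the bolt settings have to be in place before the session is built. With that done, the rest of the pipeline is wired up as before; a minimal sketch reusing the imports and query from the first snippet:

// Neo4j(sc) picks up the bolt settings from the SparkContext's SparkConf,
// which now contains them because they were set before session creation.
val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(3))
val neo = Neo4j(streamingContext.sparkContext)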
