neo4j with Flink and Scala



I am processing data with Scala 2.11.7 and Flink 1.3.2. Now I would like to store the resulting org.apache.flink.api.scala.DataSet in a Neo4j graph database.

There are several GitHub projects that provide compatibility:

  • Flink with Neo4j: https://github.com/s1ck/flink-neo4j
  • Scala with Neo4j: https://github.com/fakod/neo4j-scala
  • Flink's graph library "Gelly" with Neo4j: https://github.com/albertodelazzari/gelly-neo4j

Which approach is the most promising? Or would it be better to use Neo4j's REST API directly?
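For the REST route, a minimal sketch of the request body might look as follows. It assumes Neo4j's transactional Cypher HTTP endpoint (`transaction/commit` below the REST URI, as in Neo4j 2.x/3.x) and only builds the JSON payload per batch; `User`, `quote`, `toJson`, and `payload` are hypothetical names, and the actual HTTP POST is left out.

```scala
object RestPayloadSketch {
  // Hypothetical record type, for illustration only.
  final case class User(name: String, born: Int)

  // Minimal JSON string escaping (backslash and double quote only);
  // a real JSON library is preferable in production code.
  def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // One JSON object per record, matching i.name and i.born in the Cypher query.
  def toJson(u: User): String = s"""{"name":${quote(u.name)},"born":${u.born}}"""

  // Request body for one batch: a single statement with an "inserts" list parameter.
  def payload(cypher: String, batch: Seq[User]): String = {
    val inserts = batch.map(toJson).mkString("[", ",", "]")
    s"""{"statements":[{"statement":${quote(cypher)},"parameters":{"inserts":$inserts}}]}"""
  }

  def main(args: Array[String]): Unit = {
    val cypher = "UNWIND {inserts} AS i CREATE (a:User {name:i.name, born:i.born})"
    val users = Seq(User("abc", 1), User("def", 2), User("ghi", 3))
    // Split into batches of 2 and print the body that would be POSTed
    // (assumed target: http://localhost:7474/db/data/transaction/commit).
    users.grouped(2).map(payload(cypher, _)).foreach(println)
  }
}
```

Sending one `UNWIND` statement per batch keeps the number of HTTP round trips low, which is the same idea as the batch size setting in flink-neo4j.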

(By the way: why does Stack Overflow limit the number of links that can be posted ...?)

I tried flink-neo4j, but there seem to be some problems with mixing Java and Scala classes:

package dummy.neo4j
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.java.io.neo4j.Neo4jOutputFormat
import org.apache.flink.api.java.tuple.{Tuple, Tuple2}
import org.apache.flink.api.scala._
object Neo4jDummyWriter {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val outputFormat: OutputFormat[_ <: Tuple] = Neo4jOutputFormat.buildNeo4jOutputFormat
      .setRestURI("http://localhost:7474/db/data/")
      .setConnectTimeout(1000).setReadTimeout(1000)
      .setCypherQuery("UNWIND {inserts} AS i CREATE (a:User {name:i.name, born:i.born})")
      .addParameterKey(0, "name").addParameterKey(1, "born")
      .setTaskBatchSize(1000).finish
    val tuple1: Tuple = new Tuple2("abc", 1)
    val tuple2: Tuple = new Tuple2("def", 2)
    val test = env.fromElements[Tuple](tuple1, tuple2)
    println("test: " + test.getClass)
    test.output(outputFormat)
  }
}

Exception in thread "main" java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.flink.api.common.typeinfo.TypeInformation;
    at dummy.neo4j.Neo4jDummyWriter$.
    at dummy.neo4j.Neo4jDummyWriter.main(Neo4jDummyWriter.scala)

Type mismatch, expected: OutputFormat[Tuple], actual: OutputFormat[_ <: Tuple]

The solution is not to widen the Tuple2 objects to Tuple, but to keep their concrete type:

package dummy.neo4j

import org.apache.flink.api.common.io._
import org.apache.flink.api.java.io.neo4j.Neo4jOutputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.api.scala._

object Neo4jDummyWriter {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tuple1 = ("user9", 1978)
    val tuple2 = ("user10", 1996)
    val datasetWithScalaTuples = env.fromElements(tuple1, tuple2)
    // Convert the Scala tuples to Flink's Java Tuple2, keeping the concrete element type.
    val dataset: DataSet[Tuple2[String, Int]] = datasetWithScalaTuples.map(tuple => new Tuple2(tuple._1, tuple._2))
    // Build the output format and cast it to the data set's element type so that output() accepts it.
    val outputFormat = Neo4jOutputFormat.buildNeo4jOutputFormat
      .setRestURI("http://localhost:7474/db/data/")
      .setUsername("neo4j").setPassword("...")
      .setConnectTimeout(1000).setReadTimeout(1000)
      .setCypherQuery("UNWIND {inserts} AS i CREATE (a:User {name:i.name, born:i.born})")
      .addParameterKey(0, "name").addParameterKey(1, "born")
      .setTaskBatchSize(1000)
      .finish
      .asInstanceOf[OutputFormat[Tuple2[String, Int]]]
    dataset.output(outputFormat)
    // Nothing is written until the job is executed.
    env.execute()
  }
}
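
Why the `asInstanceOf` cast gets past the type mismatch can be illustrated without Flink. The following is a minimal sketch with hypothetical stand-in types (`Base`, `Pair`, `Format`, `writeAll` are not Flink classes); it assumes, as the error message suggests, that the builder's result is only known to be a format for some subtype of the tuple base class. Generic type arguments are erased on the JVM, so the cast only changes the static type seen by the compiler.

```scala
object ErasedCastSketch {
  // Stand-ins for illustration only; none of these are Flink classes.
  abstract class Base                                    // plays the role of Tuple
  final case class Pair(name: String, born: Int) extends Base  // plays the role of Tuple2

  // An invariant generic type, like a Java interface seen from Scala.
  trait Format[T] { def write(record: T): String }

  // The builder only promises "a format for some subtype of Base".
  def buildFormat(): Format[_ <: Base] = new Format[Base] {
    def write(record: Base): String = record.toString
  }

  // Like output() on a data set: the format must match the element type T exactly.
  def writeAll[T](records: Seq[T], format: Format[T]): Seq[String] =
    records.map(format.write)

  def demo(): Seq[String] = {
    val records = Seq(Pair("user9", 1978), Pair("user10", 1996))
    // writeAll(records, buildFormat())  // does not compile: Format[_ <: Base] is not a Format[Pair]
    // The cast is unchecked: it compiles and succeeds at runtime because type arguments are erased.
    val format = buildFormat().asInstanceOf[Format[Pair]]
    writeAll(records, format)
  }

  def main(args: Array[String]): Unit = demo().foreach(println)
}
```

Because the cast is unchecked, the compiler can no longer verify that the format really handles the element type; it is only safe when the format works for every record the data set contains.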
