Reading and writing data to Cassandra using the Apache Flink Java API



I plan to use Apache Flink to read data from and write data to Cassandra. I wanted to use flink-connector-cassandra, but I can't find good documentation or examples for the connector.

Can you point me in the right direction for reading and writing data from Cassandra with Apache Flink? I have only seen sink examples, which are purely for writing. Can Apache Flink also read data from Cassandra, similar to Apache Spark?

I had the same question, and this is what I came up with. I don't know if it is over-simplified for what you need, but I figured I should show it anyway.

ClusterBuilder cb = new ClusterBuilder() {
    @Override
    public Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("urlToUse.com").withPort(9042).build();
    }
};
CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat =
        new CassandraInputFormat<>("SELECT * FROM example.cassandraconnectorexample", cb);
cassandraInputFormat.configure(null);
cassandraInputFormat.open(null);
Tuple2<String, String> testOutputTuple = new Tuple2<>();
cassandraInputFormat.nextRecord(testOutputTuple);
System.out.println("column1: " + testOutputTuple.f0);
System.out.println("column2: " + testOutputTuple.f1);
cassandraInputFormat.close();
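The snippet above only pulls the first record. Here is a sketch of a complete read loop under the same assumptions (the contact point `urlToUse.com` and the `example.cassandraconnectorexample` table are placeholders for your own cluster and table); it iterates until `reachedEnd()` reports the result set is exhausted, then closes the format:

```java
import com.datastax.driver.core.Cluster;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

public class CassandraReadAll {
    public static void main(String[] args) throws Exception {
        ClusterBuilder cb = new ClusterBuilder() {
            @Override
            public Cluster buildCluster(Cluster.Builder builder) {
                return builder.addContactPoint("urlToUse.com").withPort(9042).build();
            }
        };
        CassandraInputFormat<Tuple2<String, String>> format =
                new CassandraInputFormat<>("SELECT * FROM example.cassandraconnectorexample", cb);
        format.configure(null);
        format.open(null);
        Tuple2<String, String> reuse = new Tuple2<>();
        // reachedEnd() reports whether the Cassandra result set is exhausted;
        // nextRecord() fills the reusable tuple with the next row
        while (!format.reachedEnd()) {
            Tuple2<String, String> row = format.nextRecord(reuse);
            System.out.println(row.f0 + " | " + row.f1);
        }
        format.close();
    }
}
```

This driver-style loop runs on the client, outside of a Flink job; for a parallel read inside a job, pass the input format to `env.createInput(...)` as in the next snippet.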

I figured this out by finding the code for the CassandraInputFormat class and seeing how it works (http://www.javatips.net/api/flink-master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java). Honestly, based on the name I expected it to be just a format, not the full mechanism for reading from Cassandra, and I have a feeling others might think the same thing.

    ClusterBuilder cb = new ClusterBuilder() {
        @Override
        public Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("localhost").withPort(9042).build();
        }
    };
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    CassandraInputFormat<Tuple3<Integer, Integer, Integer>> inputFormat =
            new CassandraInputFormat<>("SELECT * FROM test.example;", cb);
    DataStreamSource<Tuple3<Integer, Integer, Integer>> t =
            env.createInput(inputFormat, TupleTypeInfo.of(new TypeHint<Tuple3<Integer, Integer, Integer>>() {}));
    tableEnv.registerDataStream("t1", t);
    Table t2 = tableEnv.sql("select * from t1");
    t2.printSchema();

You can extend the RichFlatMapFunction class:

class MongoMapper extends RichFlatMapFunction[JsonNode, JsonNode] {
  var userCollection: MongoCollection[Document] = _

  override def open(parameters: Configuration): Unit = {
    // Do something here, like opening a connection
    val client: MongoClient = MongoClient("mongodb://localhost:10000")
    userCollection = client.getDatabase("gp_stage").getCollection("users").withReadPreference(ReadPreference.secondaryPreferred())
    super.open(parameters)
  }

  override def flatMap(event: JsonNode, out: Collector[JsonNode]): Unit = {
    // Do something here per record; this function can use objects initialized via open
    userCollection.find(Filters.eq("_id", somevalue)).limit(1).first().subscribe(
      (result: Document) => {
        // println(result)
      },
      (t: Throwable) => {
        println(t)
      },
      () => {
        out.collect(event)
      }
    )
  }
}

Basically, the open function is executed once per worker, and flatMap is executed once per record. The example is for Mongo, but it can be used similarly for Cassandra.
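As a rough sketch, the same pattern in Java against Cassandra might look like the following (the contact point, keyspace, table, and column names are placeholders, and the lookup query is just an illustration; the driver calls follow the standard DataStax 3.x Session API):

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CassandraEnrichMapper extends RichFlatMapFunction<String, String> {
    private transient Cluster cluster;
    private transient Session session;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Runs once per parallel task: open the connection here, not per record
        cluster = Cluster.builder().addContactPoint("localhost").withPort(9042).build();
        session = cluster.connect();
    }

    @Override
    public void flatMap(String userId, Collector<String> out) {
        // Runs once per record and can use the session initialized in open()
        Row row = session.execute(
                "SELECT name FROM example.users WHERE id = ?", userId).one();
        if (row != null) {
            out.collect(row.getString("name"));
        }
    }

    @Override
    public void close() throws Exception {
        if (session != null) session.close();
        if (cluster != null) cluster.close();
    }
}
```

Keeping the Cluster/Session in fields set up in open() avoids re-establishing a connection for every record, which is the whole point of using the Rich variant of the function.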

In your case, as I understand it, the first step of the pipeline is reading data from Cassandra, so rather than writing a RichFlatMapFunction, you should write your own RichSourceFunction.

For reference, you can look at the simple implementation of WikipediaEditsSource.
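To give a rough idea of what such a RichSourceFunction might look like for Cassandra (the contact point, keyspace, table, and column names are placeholders, and this sketch reads the table once rather than streaming continuously):

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class CassandraRowSource extends RichSourceFunction<Tuple2<String, String>> {
    private transient Cluster cluster;
    private transient Session session;
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Opened once per parallel source instance, like open() in a RichFlatMapFunction
        cluster = Cluster.builder().addContactPoint("localhost").withPort(9042).build();
        session = cluster.connect();
    }

    @Override
    public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
        // Emit each row of the (placeholder) table downstream, stopping on cancel()
        for (Row row : session.execute("SELECT column1, column2 FROM example.cassandraconnectorexample")) {
            if (!running) {
                break;
            }
            ctx.collect(Tuple2.of(row.getString("column1"), row.getString("column2")));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        if (session != null) session.close();
        if (cluster != null) cluster.close();
    }
}
```

You would then use it as `env.addSource(new CassandraRowSource())`; the run/cancel structure mirrors what WikipediaEditsSource does.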
