将Cassandra添加为flink错误中的水槽:所有用于查询的主机都失败了



我在https://ci.apache.org/projects/flink/flink/flink/flink-docs-rease-1.4/dev/dev/connectors/cassandra.html上进行了示例卡桑德拉(Cassandra

我的代码如下所示

public class writeToCassandra {
    private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE test WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
    private static final String createTable = "CREATE TABLE test.cassandraData(id varchar, heart_rate varchar, PRIMARY KEY(id));" ;

    private final static Collection<String> collection = new ArrayList<>(50);
    static {
        for (int i = 1; i <= 50; ++i) {
            collection.add("element " + i);
        }
    }
    public static void main(String[] args) throws Exception {

        //setting the env variable to local
        StreamExecutionEnvironment envrionment = StreamExecutionEnvironment.createLocalEnvironment(1);

        DataStream<Tuple2<String, String>> dataStream = envrionment
                .fromCollection(collection)
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    final String mapped = " mapped ";
                    String[] splitted;
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        splitted = s.split("\s+");
                        return Tuple2.of(
                                UUID.randomUUID().toString(),
                                splitted[0] + mapped + splitted[1]
                        );
                    }
                });

        CassandraSink.addSink(dataStream)
                .setQuery("INSERT INTO test.cassandraData(id,heart_rate) values (?,?);")
                .setHost("127.0.0.1")
                .build();

        envrionment.execute();
    } //main


} //writeToCassandra

我收到以下错误

Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)

不确定这是否总是需要的,但是我设置cassandrasink的方式就是这样:

CassandraSink
    .addSink(dataStream)
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return Cluster.builder()
                .addContactPoints(myListOfCassandraUrlsString.split(","))
                .withPort(portNumber)
                .build();
        }
    })
    .build();

我有注释的pojos,这些pojos由dataStream返回,因此我不需要查询,但是您只需在" .addsink(...)"行之后包含" .setquery(...)"行。/p>

异常简单地表明示例程序无法到达C*数据库。

  1. Flink-Cassandra-Connector提供的流API可以连接到指定的C*数据库。因此,您需要运行C*实例。
  2. 将每个流动作业都推送到任务管理器运行的节点。在您的示例中,您假设C*在与TM节点相同的节点上运行。另一种选择是将C*地址从127.0.0.1更改为公共地址。

相关内容

  • 没有找到相关文章

最新更新