PoolingOptions and Cassandra - java



我正在使用datastax驱动程序使用Cassandra作为Apache Flink的一些数据流的接收器:我在执行应用程序时遇到问题,在运行时引发有关队列的错误,该队列在几秒钟后已满。我发现默认值是 256,对于我的负载来说可能太低了,所以我使用 poolingOptions 设置 maxRequestsPerConnection 提出了它,如下所示:http://docs.datastax.com/en/developer/java-driver/3.1/manual/pooling/。

不幸的是,使用以下代码,我在启动它时收到以下错误:

The implementation of the ClusterBuilder is not serializable. 
The object probably contains or references non serializable fields.

我的代码:

PoolingOptions poolingOptions = new PoolingOptions();
    poolingOptions
      .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
      .setMaxRequestsPerConnection(HostDistance.REMOTE, 10000);

ClusterBuilder cassandraBuilder = new ClusterBuilder() {
    @Override
    public Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint(CASSANDRA_ADDRESS)
                      .withPort(CASSANDRA_PORT)
                      .withPoolingOptions(poolingOptions)
                      .build();
    }
};
sinkBuilderNormalStream
    .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
        + " (user, sensor, timestamp, rdf_stream, observed_value, value)"
        + " VALUES (?, ?, ?, ?, ?, ?);")
    .setClusterBuilder(cassandraBuilder)
    .build();

我该如何处理?

您必须在 ClusterBuilder#buildCluster 中定义 PoolingOptions。

相关内容

  • 没有找到相关文章

最新更新