I ran the Cassandra example from https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/cassandra.html
My code is shown below:
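import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

public class writeToCassandra {
    // Schema statements: declared but never executed in this program,
    // so the keyspace and table have to be created separately (e.g. in cqlsh).
    private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE test WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};";
    private static final String createTable = "CREATE TABLE test.cassandraData(id varchar, heart_rate varchar, PRIMARY KEY(id));";

    // Test input: "element 1" ... "element 50"
    private final static Collection<String> collection = new ArrayList<>(50);
    static {
        for (int i = 1; i <= 50; ++i) {
            collection.add("element " + i);
        }
    }

    public static void main(String[] args) throws Exception {
        // Run Flink locally with parallelism 1
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment(1);

        DataStream<Tuple2<String, String>> dataStream = environment
            .fromCollection(collection)
            .map(new MapFunction<String, Tuple2<String, String>>() {
                final String mapped = " mapped ";

                @Override
                public Tuple2<String, String> map(String s) throws Exception {
                    // "\\s+" is the Java string literal for the regex \s+ (one or more whitespace chars)
                    String[] splitted = s.split("\\s+");
                    // (random UUID as id, "element mapped N" as heart_rate)
                    return Tuple2.of(
                        UUID.randomUUID().toString(),
                        splitted[0] + mapped + splitted[1]
                    );
                }
            });

        // Tuple fields are bound to the ? placeholders in order
        CassandraSink.addSink(dataStream)
            .setQuery("INSERT INTO test.cassandraData(id,heart_rate) values (?,?);")
            .setHost("127.0.0.1")   // no port given, so the default 9042 is used
            .build();

        environment.execute();
    } // main
} // writeToCassandra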
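The map step can be checked on its own, without Flink or Cassandra, which helps confirm that the data side of the job is not what fails. Below is a plain-Java sketch that mirrors the MapFunction above; the class MapLogicCheck and its methods heartRateValue and rows are hypothetical names introduced only for this check. Note that the regex must be written "\\s+" in Java source: with Java 8 (the usual JDK for Flink 1.4) "\s+" is rejected as an illegal escape, and from Java 15 on "\s" compiles but means a single literal space, not the whitespace class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Plain-Java sketch of the question's MapFunction (no Flink needed).
// MapLogicCheck, heartRateValue and rows are hypothetical names.
public class MapLogicCheck {

    // Mirrors map(): split on whitespace, then join the parts with " mapped ".
    public static String heartRateValue(String element) {
        String[] parts = element.split("\\s+"); // regex \s+ : one or more whitespace chars
        return parts[0] + " mapped " + parts[1];
    }

    // Builds the (id, heart_rate) rows the sink would insert; id is a random UUID.
    public static List<String[]> rows(List<String> elements) {
        List<String[]> out = new ArrayList<>();
        for (String e : elements) {
            out.add(new String[] {UUID.randomUUID().toString(), heartRateValue(e)});
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> elements = new ArrayList<>();
        for (int i = 1; i <= 3; ++i) {
            elements.add("element " + i);
        }
        for (String[] row : rows(elements)) {
            System.out.println(row[0] + " -> " + row[1]);
        }
    }
}
```

If this prints the expected "element mapped N" values, the remaining problem lies in reaching Cassandra rather than in the pipeline logic.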
I get the following error:
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
Not sure whether this is always required, but this is how I set up my CassandraSink:
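import com.datastax.driver.core.Cluster;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

// myListOfCassandraUrlsString (e.g. "host1,host2") and portNumber are my own config values;
// the ClusterBuilder is serialized and shipped with the job, so they must be serializable.
CassandraSink
    .addSink(dataStream)
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            // Configure the builder that Flink passes in
            return builder
                .addContactPoints(myListOfCassandraUrlsString.split(","))
                .withPort(portNumber)
                .build();
        }
    })
    .build();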
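Since the contact points come from a single comma-separated string, a raw split(",") keeps any spaces around the commas and any empty entries. A small helper can clean the list before it is handed to addContactPoints(...). This is a sketch only; ContactPoints and parse are hypothetical names, not part of Flink or the DataStax driver.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns "10.0.0.1, 10.0.0.2," into clean contact points.
public class ContactPoints {

    public static String[] parse(String csv) {
        List<String> hosts = new ArrayList<>();
        for (String part : csv.split(",")) {
            String host = part.trim();   // drop spaces left around the commas
            if (!host.isEmpty()) {       // skip empty entries such as "a,,b"
                hosts.add(host);
            }
        }
        if (hosts.isEmpty()) {
            // Fail early instead of building a cluster with no contact points
            throw new IllegalArgumentException("no Cassandra contact points in: '" + csv + "'");
        }
        return hosts.toArray(new String[0]);
    }

    public static void main(String[] args) {
        for (String host : parse("10.0.0.1, 10.0.0.2,")) {
            System.out.println(host);
        }
    }
}
```

Inside buildCluster, the result of ContactPoints.parse(myListOfCassandraUrlsString) would take the place of the raw split(",") call.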
My dataStream returns annotated POJOs, so I don't need a query; in your case you would just include the ".setQuery(...)" line after the ".addSink(...)" line.
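The exception simply means that the example program cannot reach the C* (Cassandra) database.
- The streaming API provided by the flink-cassandra-connector only connects to the C* database you specify; it does not start one. So you need a running C* instance.
- Every streaming job is shipped to, and runs on, the nodes where the TaskManagers run. Your example assumes that C* runs on the same node as the TaskManager (127.0.0.1). If it does not, change the C* address from 127.0.0.1 to an address that the TaskManager nodes can reach.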
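A quick way to test the two points above is to check, from the machine that runs the TaskManager, whether a plain TCP connection to the Cassandra native-protocol port succeeds (9042 is the default, as in the stack trace). The sketch below uses only the JDK; CassandraPortCheck and isReachable are hypothetical names, and a successful TCP connect only shows that something is listening on the port, not that it is a healthy Cassandra node.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Pre-flight check: can this machine open a TCP connection to host:port?
// CassandraPortCheck / isReachable are hypothetical names for this sketch.
public class CassandraPortCheck {

    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host could not be resolved
        }
    }

    public static void main(String[] args) {
        // Usage: java CassandraPortCheck [host] [port]
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9042;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
    }
}
```

If this reports false for 127.0.0.1:9042 on the TaskManager node, the NoHostAvailableException above is expected: either start Cassandra there or point the sink at an address where it is listening.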