如何使用 Apache Flink 从 Cassandra 阅读?



我的 flink 程序应该对每个输入记录进行 Cassandra 查找,并根据结果进行一些进一步的处理。

但我目前只能从Cassandra读取数据。这是我到目前为止想出的代码片段。

ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
.withPort(props.getCassandraPort())
.withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
.withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
.build();
}
};
for (int i=1; i<5; i++) {
CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat =
new CassandraInputFormat<>("select * from test where id=hello" + i, secureCassandraSinkClusterBuilder);
cassandraInputFormat.configure(null);
cassandraInputFormat.open(null);
Tuple2<String, String> out = new Tuple8<>();
cassandraInputFormat.nextRecord(out);
System.out.println(out);
}

但问题是,每次查找需要近 10 秒,换句话说,这个for循环需要 50 秒才能执行。

如何加快此操作速度?或者,有没有其他方法可以在 Flink 中查找 Cassandra?

我想出了一个解决方案,它使用流数据查询Cassandra的速度相当快。对有相同问题的人有用。

首先,Cassandra可以用最少的代码进行查询:

Session session = secureCassandraSinkClusterBuilder.getCluster().connect();
ResultSet resultSet = session.execute("SELECT * FROM TABLE");

但问题是,创建Session是一项非常耗时的操作,并且应该在每个键空间执行一次。您只需创建一次Session,即可将其重用于所有读取查询。

现在,由于Session不是 Java 可序列化的,因此不能将其作为参数传递给 Flink 运算符,如MapProcessFunction。有几种方法可以解决这个问题,您可以使用 RichFunction 并在其Open方法中对其进行初始化,或者使用单例。我将使用第二种解决方案。

创建一个单例类,如下所示,我们在其中创建Session

public class CassandraSessionSingleton {
private static CassandraSessionSingleton cassandraSessionSingleton = null;
public Session session;
private CassandraSessionSingleton(ClusterBuilder clusterBuilder) {
Cluster cluster = clusterBuilder.getCluster();
session = cluster.connect();
}
public static CassandraSessionSingleton getInstance(ClusterBuilder clusterBuilder) {
if (cassandraSessionSingleton == null)
cassandraSessionSingleton = new CassandraSessionSingleton(clusterBuilder);
return cassandraSessionSingleton;
}
}

然后,您可以将此会话用于将来的所有查询。在这里,我使用ProcessFunction作为查询示例。

public class SomeProcessFunction implements ProcessFunction <Object, ResultSet> {
ClusterBuilder secureCassandraSinkClusterBuilder;
// Constructor
public SomeProcessFunction (ClusterBuilder secureCassandraSinkClusterBuilder) {
this.secureCassandraSinkClusterBuilder = secureCassandraSinkClusterBuilder;
}
@Override
public void  ProcessElement (Object obj) throws Exception {
ResultSet resultSet = CassandraLookUp.cassandraLookUp("SELECT * FROM TEST", secureCassandraSinkClusterBuilder);
return resultSet;
}
}

请注意,您可以将ClusterBuilder传递给ProcessFunction因为它是可序列化的。现在是我们执行查询的cassandraLookUp方法。

public class CassandraLookUp {
public static ResultSet cassandraLookUp(String query, ClusterBuilder clusterBuilder) {
CassandraSessionSingleton cassandraSessionSingleton = CassandraSessionSingleton.getInstance(clusterBuilder);
Session session = cassandraSessionSingleton.session;
ResultSet resultSet = session.execute(query);
return resultSet;
}
}

仅在第一次运行查询时创建单一实例对象,之后将重用同一对象,因此查找没有延迟。

相关内容

  • 没有找到相关文章

最新更新