I have built a Spring reactive application that tries to connect to Cassandra. My code snippets are below:
Solution 1: Reactive Cassandra [Not Working]
Maven:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-cassandra-reactive</artifactId>
<version>3.0</version>
</dependency>
application.yml file:
spring.data.cassandra.contact-points=<my connection points>
spring.data.cassandra.username=abc
spring.data.cassandra.password=xyz
spring.data.cassandra.local-datacenter=datacenter1
spring.data.cassandra.keyspace-name=mykeyspace
spring.data.cassandra.port=9042
Configuration:
@Configuration
@EnableReactiveCassandraRepositories
public class LocalBeanConfig extends AbstractReactiveCassandraConfiguration {
@Override
protected String getKeyspaceName() {
return "mykeyspace";
}
}
Table class:
@AllArgsConstructor
@Table("test_table")
public class TableClass {
@Getter
@PrimaryKey
@Column("id")
private String id;
@Column("value")
@Getter
private String value;
}
Repository class:
@Repository
public interface TestAppRepository extends ReactiveCassandraRepository<TableClass, String> {
}
Finally, I access Cassandra as follows:
public class MyService {
private final TestAppRepository testRepository;
public void get(CrawlTask crawlTask) {
testRepository.findById("id_1").map(testAppConfig -> test1(testAppConfig)).switchIfEmpty(Mono.just(test2()));
}
}
However, I cannot connect with reactive Cassandra, and I get the following exception at startup:
Caused by: com.datastax.oss.driver.api.core.AllNodesFailedException
In contrast, if I use Spring Cassandra instead of Reactive Cassandra, I can connect. The code snippets for my working solution are below:
Solution 2: Non-Reactive Cassandra [Working]
Maven:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-cassandra</artifactId>
</dependency>
application.yml
spring.cassandra.contact-points=<my connection points>
spring.cassandra.username=abc
spring.cassandra.password=xyz
spring.cassandra.local-datacenter=datacenter1
spring.cassandra.keyspace-name=mykeyspace
spring.cassandra.port=9042
Table class:
@AllArgsConstructor
@Table("test_table")
public class TableClass {
@Getter
@PrimaryKey
@Column("id")
private String id;
@Column("value")
@Getter
private String value;
}
Repository:
public interface TestAppRepository extends CassandraRepository<TableClass, String> {
}
Finally, I access Cassandra as follows:
public class MyService {
private final TestAppRepository testRepository;
public void get(CrawlTask crawlTask) {
Mono.just(testRepository.findById("id_1").map(testAppConfig -> test1(testAppConfig)).orElse(test2()));
}
}
This way I can access the Cassandra table without any problems.
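A side note on the `orElse(test2())` call above: `Optional.orElse` evaluates its argument before the call, so `test2()` runs even when the row is found. The sketch below uses only `java.util.Optional`; `fallback()` is a hypothetical stand-in for `test2()`, and the counter exists only to make the difference visible.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class OrElseDemo {
    // Counts how many times the fallback actually runs.
    static final AtomicInteger fallbackCalls = new AtomicInteger();

    // Hypothetical stand-in for test2() from the snippet above.
    static String fallback() {
        fallbackCalls.incrementAndGet();
        return "fallback";
    }

    // orElse: fallback() is evaluated before orElse is called, even if a value is present.
    static String eager(Optional<String> row) {
        return row.map(String::toUpperCase).orElse(fallback());
    }

    // orElseGet: the supplier is invoked only when the Optional is empty.
    static String lazy(Optional<String> row) {
        return row.map(String::toUpperCase).orElseGet(OrElseDemo::fallback);
    }

    public static void main(String[] args) {
        Optional<String> found = Optional.of("value");
        eager(found);
        System.out.println("fallback calls after orElse: " + fallbackCalls.get());
        lazy(found);
        System.out.println("fallback calls after orElseGet: " + fallbackCalls.get());
    }
}
```

The reactive snippet in Solution 1 has the same shape: `switchIfEmpty(Mono.just(test2()))` calls `test2()` while the pipeline is being assembled. If `test2()` is expensive or has side effects, wrapping it in `Mono.defer` or `Mono.fromSupplier` is the usual way to postpone it.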
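The main difference I see between Solution 1 [Not Working] and Solution 2 [Working] is how the Cassandra configuration is defined. In the non-reactive case we use spring.cassandra, and in the reactive case we use spring.data.cassandra.
I also tried writing the configuration with spring.cassandra in the reactive case. The service started, but when fetching data from the table (inside MyService) it did not get any data.
I am not sure why the non-reactive solution works and the reactive one does not. What am I missing? Can someone help me?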
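One thing that may be worth checking for the "no data" symptom: the reactive `MyService.get` builds a `Mono` and discards it, and in Reactor nothing runs until something subscribes. Whether that is the cause here is an assumption. The sketch below does not use Reactor; `java.util.stream.Stream` stands in as a loose analogy, since a stream is also lazy until a terminal operation. `lookup` is a hypothetical stand-in for the repository call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyPipelineDemo {
    // Records every lookup that actually executes.
    static final List<String> executed = new ArrayList<>();

    // Hypothetical stand-in for testRepository.findById(...).
    static String lookup(String id) {
        executed.add(id);
        return "row-for-" + id;
    }

    // Builds the pipeline and throws it away: no terminal operation, so lookup never runs.
    static void buildOnly(String id) {
        Stream.of(id).map(LazyPipelineDemo::lookup);
    }

    // Builds the pipeline and consumes it: the terminal collect() triggers lookup.
    static List<String> buildAndConsume(String id) {
        return Stream.of(id).map(LazyPipelineDemo::lookup).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        buildOnly("id_1");
        System.out.println("lookups after buildOnly: " + executed.size());
        buildAndConsume("id_1");
        System.out.println("lookups after buildAndConsume: " + executed.size());
    }
}
```

In the reactive service, the equivalent of the terminal operation would be returning the `Mono` to a caller that subscribes (for example a WebFlux controller) or calling `subscribe()` explicitly.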
Edit 1:
As requested, sharing the full Cassandra stack trace and error log:
s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Reactive Cassandra repositories in DEFAULT mode.
.s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 65 ms. Found 1 Reactive Cassandra repository interfaces.
.s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Cassandra repositories in DEFAULT mode.
.s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 4 ms. Found 0 Cassandra repository interfaces.
c.d.o.d.i.core.DefaultMavenCoordinates : DataStax Java driver for Apache Cassandra(R) (com.datastax.oss:java-driver-core) version 4.15.0
c.d.oss.driver.internal.core.time.Clock : Using native clock for microsecond precision
c.d.o.d.i.c.control.ControlConnection : [s0] Error connecting to Node(endPoint=/127.0.0.1:9042, hostId=null, hashCode=3f2701f1), trying next node (ConnectionInitException: [s0|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request
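I managed to resolve this issue. The solution is as follows:
Since I am using version 3.0 of the reactive starter, the Cassandra settings must be specified as follows: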
spring.cassandra.contact-points=<my connection points>
spring.cassandra.username=abc
spring.cassandra.password=xyz
spring.cassandra.local-datacenter=datacenter1
spring.cassandra.keyspace-name=mykeyspace
spring.cassandra.port=9042
spring.cassandra.schema-action=NONE
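For context, Spring Boot 3.0 moved the Cassandra keys from `spring.data.cassandra.*` to `spring.cassandra.*`. The startup log earlier in the post shows the driver trying 127.0.0.1:9042, which is consistent with the configured contact points never being read. The sketch below is not Spring's binding code. It uses plain `java.util.Properties` to show how a key under the wrong prefix silently falls back to a default. The helper name, the default constant, and the host name are assumptions made for illustration.

```java
import java.util.Properties;

public class PrefixFallbackDemo {
    // Assumed localhost default, matching the address seen in the startup log.
    static final String DEFAULT_CONTACT_POINT = "127.0.0.1:9042";

    // Hypothetical helper: reads "<prefix>.contact-points", falling back to the default.
    static String contactPoints(Properties props, String prefix) {
        return props.getProperty(prefix + ".contact-points", DEFAULT_CONTACT_POINT);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // The value is defined only under the old prefix (placeholder host name).
        props.setProperty("spring.data.cassandra.contact-points", "cassandra.example.internal:9042");

        // Lookup under the new prefix misses the key and yields the default.
        System.out.println(contactPoints(props, "spring.cassandra"));
        // Lookup under the old prefix finds the configured value.
        System.out.println(contactPoints(props, "spring.data.cassandra"));
    }
}
```

The same kind of silent fallback can also happen when a custom configuration class takes over session creation but does not override the contact points, which is why the class below passes every value through explicitly.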
More importantly, we need a @Configuration class defined as follows:
@Configuration
@EnableReactiveCassandraRepositories
public class CassandraConfig extends AbstractReactiveCassandraConfiguration {
@Value("${spring.cassandra.contact-points}")
private String contactPoints;
@Value("${spring.cassandra.local-datacenter}")
private String localDatacenter;
@Value("${spring.cassandra.port}")
private int port;
@Value("${spring.cassandra.keyspace-name}")
private String keySpace;
@Value("${spring.cassandra.username}")
private String username;
@Value("${spring.cassandra.password}")
private String password;
@Value("${spring.cassandra.schema-action}")
private SchemaAction schemaAction;
@Override
protected String getContactPoints() {
return contactPoints;
}
@Override
protected int getPort() {
return port;
}
@Override
public SchemaAction getSchemaAction() {
return schemaAction;
}
@Override
protected String getKeyspaceName() {
return keySpace;
}
@Bean
@Override
public CqlSessionFactoryBean cassandraSession() {
CqlSessionFactoryBean cqlSessionFactoryBean = new CqlSessionFactoryBean();
cqlSessionFactoryBean.setUsername(username);
cqlSessionFactoryBean.setPassword(password);
cqlSessionFactoryBean.setKeyspaceName(keySpace);
cqlSessionFactoryBean.setLocalDatacenter(localDatacenter);
cqlSessionFactoryBean.setPort(port);
cqlSessionFactoryBean.setContactPoints(contactPoints);
return cqlSessionFactoryBean;
}
}