我试图用Spring Integration实现以下工作流程:
- 1) 轮询REST API
- 2) 将POJO存储在Cassandra集群中
这是我第一次尝试使用Spring Integration,所以我仍然对参考资料中的大量信息感到有点不知所措。经过一番研究,我可以完成以下工作。
- 1) 轮询REST API
- 2) 将映射的POJO JSON结果转换为字符串
- 3) 将字符串保存到文件
这是代码:
@Configuration
public class ConsulIntegrationConfig {
@InboundChannelAdapter(value = "consulHttp", poller = @Poller(maxMessagesPerPoll = "1", fixedDelay = "1000"))
public String consulAgentPoller() {
return "";
}
@Bean
public MessageChannel consulHttp() {
return MessageChannels.direct("consulHttp").get();
}
@Bean
@ServiceActivator(inputChannel = "consulHttp")
MessageHandler consulAgentHandler() {
final HttpRequestExecutingMessageHandler handler =
new HttpRequestExecutingMessageHandler("http://localhost:8500/v1/agent/self");
handler.setExpectedResponseType(AgentSelfResult.class);
handler.setOutputChannelName("consulAgentSelfChannel");
LOG.info("Created bean'consulAgentHandler'");
return handler;
}
@Bean
public MessageChannel consulAgentSelfChannel() {
return MessageChannels.direct("consulAgentSelfChannel").get();
}
@Bean
public MessageChannel consulAgentSelfFileChannel() {
return MessageChannels.direct("consulAgentSelfFileChannel").get();
}
@Bean
@ServiceActivator(inputChannel = "consulAgentSelfFileChannel")
MessageHandler consulAgentFileHandler() {
final Expression directoryExpression = new SpelExpressionParser().parseExpression("'./'");
final FileWritingMessageHandler handler = new FileWritingMessageHandler(directoryExpression);
handler.setFileNameGenerator(message -> "../../agent_self.txt");
handler.setFileExistsMode(FileExistsMode.APPEND);
handler.setCharset("UTF-8");
handler.setExpectReply(false);
return handler;
}
}
@Component
public final class ConsulAgentTransformer {
@Transformer(inputChannel = "consulAgentSelfChannel", outputChannel = "consulAgentSelfFileChannel")
public String transform(final AgentSelfResult json) throws IOException {
final String result = new StringBuilder(json.toString()).append("n").toString();
return result;
}
这很好用!
但现在,我不想将对象写入文件,而是想将其存储在带有spring数据Cassandra的Cassandra集群中。为此,我注释掉了配置文件中的文件处理程序,在transformer中返回POJO,并创建了以下内容:
@MessagingGateway(name = "consulCassandraGateway", defaultRequestChannel = "consulAgentSelfFileChannel")
public interface CassandraStorageService {
@Gateway(requestChannel="consulAgentSelfFileChannel")
void store(AgentSelfResult agentSelfResult);
}
@Component
public final class CassandraStorageServiceImpl implements CassandraStorageService {
@Override
public void store(AgentSelfResult agentSelfResult) {
//use spring-data-cassandra repository to store
LOG.info("Received 'AgentSelfResult': {} in Cassandra cluster...");
LOG.info("Trying to store 'AgentSelfResult' in Cassandra cluster...");
}
}
但这似乎是一种错误的方法,服务方法从未被触发。
所以我的问题是,对于我的用例,什么是正确的方法?我是否必须在服务组件中实现MessageHandler接口,并在配置中使用@ServiceActivator。还是我目前的"网关方法"中缺少了什么??或者可能还有另一种解决方案,我看不到。。
就像前面提到的,我是SI的新手,所以这可能是一个愚蠢的问题。。。
尽管如此,还是要提前感谢!
目前尚不清楚如何在CassandraStorageService
bean中布线。
Spring Integration Cassandra扩展项目有一个消息处理程序实现。
春季云流模块中的Cassandra Sink将其与Java配置一起使用,因此您可以将其用作示例。
所以我终于成功了。我所需要做的就是
@Component
public final class CassandraStorageServiceImpl implements CassandraStorageService {
@ServiceActivator(inputChannel="consulAgentSelfFileChannel")
@Override
public void store(AgentSelfResult agentSelfResult) {
//use spring-data-cassandra repository to store
LOG.info("Received 'AgentSelfResult': {}...");
LOG.info("Trying to store 'AgentSelfResult' in Cassandra cluster...");
}
}
CassandraMessageHandler和spring云流似乎是我用例的一大开销,我还没有真正理解。。。通过这个解决方案,我可以控制弹簧组件中发生的事情。