我们可以使用SpringBoot中的Logstash来同步RDBMS中的数据吗



我们可以使用Logstash Jdbc插件将数据从Oracle/任何数据库同步到elastic。但是,我找不到任何方法来操作这个jdbc插件中来自DB的数据。我想在我的春季启动应用程序中使用Logstash/any插件来做同样的事情,我想通过它来操作数据&保存到弹性中之前的列名。

有很多Logstash输入插件,你可以在Logstash内部使用grok过滤器进行基本的流处理,我提供了如何使用Kafka输入插件来进行流处理,并将数据发送到Logstash。

使用Kafka broker创建消费者,并在spring项目中使用publisher类发布文档,然后使用Logstash配置输入将数据吸收到索引中。在这个路线图中,借助Apache Kafka,您拥有强大的消费者-发布者管道。

找到如下示例,

<!--Kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.8.RELEASE</version>
</dependency>

在操作您的文档和数据后,创建一个发布者类来发布此文档

public class DriverProducer {
@Autowired
KafkaTemplate<Integer, String > kafkaTemplate;
@Autowired
ObjectMapper objectMapper;

public void messenger(Object convey) throws JsonProcessingException {
String message=objectMapper.writeValueAsString(convey);
ListenableFuture<SendResult<Integer,String>> listenableFuture=kafkaTemplate.sendDefault(null, message);
listenableFuture.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onFailure(Throwable throwable) {
failHandler(null, message, throwable);
}
@Override
public void onSuccess(SendResult<Integer, String> result) {
successHandler(result);
}
});
}
private void failHandler(Integer key, String message, Throwable throwable){
//log.error("Unable to send the message for following Error :"+throwable.getMessage());
try {
throw throwable;
}
catch (Throwable anotherThrowable){
//log.error("**Supreme Error on throwing the throwable**"+anotherThrowable.getMessage());
}
}
private void successHandler (SendResult<Integer,String> result){
//log.info("Message sent successfully :"+ result);
}
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
public static class Convey{
private SagaSequence sequence;
private Integer key;
private Date date;
}

你的Logstash配置文件可能看起来像这个

input {
kafka{
group_id => "35834"
topics => ["Second-Topic"]
bootstrap_servers => "localhost:9092"
codec => json
}
}
filter {
}
output {
file {
path => "/SOMEPATH"
}
elasticsearch {
hosts => ["localhost:9200"]
document_type => "_doc"
index => "logger"
}
stdout { codec => rubydebug
}
}

最新更新