有没有一种方法可以通过spring-boot访问通过KSQL(kafka)创建的表



我是卡夫卡宇宙的新手,我真的被困在这里了。因此,我们将非常感谢您的帮助。

我已经用kafka流创建了一个表,使用下面的KSQL语句:

CREATE TABLE calc AS 
SELECT id, datetime, count(*) 
FROM streamA 
GROUP BY id, datetime 
HAVING count(*) = total;

其中";streamA">是由"strong>"创建的流;topicA">

我目前正在使用:

  • Java 8
  • Spring Boot v2.2.9

我的pom.xml看起来像:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Packaging -->
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<properties>
<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
</properties>
<!-- Versioning -->
<groupId>some.name</groupId>
<artifactId>kafka.project</artifactId>
<version>2020.2.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.9.RELEASE</version>
<relativePath />
</parent>
<!-- Meta-data -->
<name>[${project.artifactId}]</name>
<description>Kafka Project</description>
<!-- Dependencies -->
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- Build settings -->
<build>
<!-- Plugins -->
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

所以,有两个问题:

  1. 是否有任何方法可以通过Kafka Streams API访问该表
  2. 我可以通过我的应用程序而不是KSQL来做类似的事情吗(例如创建那个表(

提前感谢

更新感谢您的建议Shrey Jakhmola(从春季启动应用程序运行KSQL的方法是什么(,但我有一个大数据集,需要定期访问。我认为这种解决方案并不理想。

@Joshua Oliphant,是的,这个表是由一个主题创建的流生成的。

  1. 是否有任何方法可以通过Kafka Streams API访问该表

calc将由名为CALC的变更日志主题支持。如果需要,您可以在应用程序中自由使用此主题。使用标准使用者或Kafka Streams。

但是,如果您只想查询表的当前状态,那么您可以使用ksqlDB的pull查询来完成此操作。这些允许您从ksqlDB正在构建的表中收回行。该功能是基本的,因为它不是ksqlDB提供的核心streamingSQL的一部分,但符合一些用例。

如果你需要除此之外的东西,那么还有其他选择:

  1. 您可以将结果输入到您选择的更传统的sql系统中,例如postgres,然后查询它。(您可以使用ksql的CREATE SINK CONNECTOR将数据导出到postgres(
  2. 您可以使用标准的Kafka客户端在自己的应用程序中使用数据。(尽管只有当你的应用程序的每个实例都能保存表中的所有数据时,这才有效(
  3. 您可以在应用程序中使用Kafka Streams来消费该表。这样做的好处是,应用程序的多个实例可以聚集在一起,这样每个实例只消耗表的一部分数据。然后,您可能需要使用Kafka Streams交互查询来访问表的当前状态。操作将加载
  • 我可以通过我的应用程序而不是KSQL做类似的事情吗(例如创建那个表(
  • 如果你想把ksqlDB从等式中去掉,那么是的,ksqlDB在内部使用KAfka流,所以你可以用ksqlDB做任何事情,你也可以直接用KAfka流做。

    类似SQL:

    CREATE TABLE calc AS 
    SELECT id, datetime, count(*) 
    FROM streamA 
    GROUP BY id, datetime 
    HAVING count(*) = total;
    

    会映射到类似(粗略代码(的东西:

    StreamsBuilder builder = new StreamsBuilder();
    builder
    .stream("streamA", Consumed.with(<appropriate serde>))
    .groupBy(<a mapper that returns id and datetime as new key>)
    .count()
    .filter(<filter>);
    .toStream()
    .to("CALC");
    new KafkaStreams(builder.build(), props, clients).start();