正确的方法如何将上下文从外部源到Kafka流中的记录



我有使用Kafka流(使用处理器API(处理的记录。假设记录中有city_id和其他一些字段。

在Kafka Streams应用程序中,我想将目标城市中的当前温度添加到记录中。
Temperature<->City对存储在Eg中。Postgres。

在Java应用程序中,我可以使用JDBC连接到Postgres并构建new HashMap<CityId, Temperature>,因此我可以根据city_id查找温度。类似tempHM.get(record.city_id)

有几个问题如何最好地处理:

在哪里启动上下文数据?

最初,我一直在AbstractProcessor::init()中进行此操作,但这似乎是错误的,因为它是针对每个线程初始化的,并且还重新定位了重新平衡。

因此,我将其移动到流式拓扑构建器和处理器随之构建之前。数据仅在所有处理器实例上独立获取一次。

是正确且有效的方法。它有效,但是...

HashMap<CityId, Temperature> tempHM = new HashMap<CityId, Temperature>;
// Connect to DB and initialize tempHM here
Topology topology = new Topology();
topology
    .addSource(SOURCE, stringDerializer, protoDeserializer, "topic-in")
    .addProcessor(TemperatureAppender.NAME, () -> new TemperatureAppender(tempHm), SOURCE)
    .addSink(SINK, "topic-out", stringSerializer, protoSerializer, TemperatureAppender.NAME)
;

如何刷新上下文数据?

例如,我想每15分钟刷新温度数据。我正在考虑使用hashmap容器而不是hashmap,这可以处理:

abstract class ContextContainer<T> {
    T context;
    Date lastRefreshAt;
    ContextContainer(Date now) {
        refresh(now);
    }
    abstract void refresh(Date now);
    abstract Duration getRefreshInterval();
    T get() {
        return context;
    }
    boolean isDueToRefresh(Date now) {
        return lastRefreshAt == null
            || lastRefreshAt.getTime() + getRefreshInterval().toMillis() < now.getTime();
    }
}
final class CityTemperatureContextContainer extends ContextContainer<HashMap> {
    CityTemperatureContextContainer(Date now) {
        super(now);
    }
    void refresh(Date now) {
        if (!isDueToRefresh(now)) {
            return;
        }
        HashMap context = new HashMap();
        // Connect to DB and get data and fill hashmap
        lastRefreshAt = now;
        this.context = context;
    }
    Duration getRefreshInterval() {
        return Duration.ofMinutes(15);
    }
}

这是一个简短的概念,用so textarea编写,可能包含一些语法错误,但重点很明显,我希望

然后将其像.addProcessor(TemperatureAppender.NAME, () -> new TemperatureAppender(cityTemperatureContextContainer), SOURCE)

一样传递到处理器

和处理器do

    public void init(final ProcessorContext context) {
        context.schedule(
            Duration.ofMinutes(1),
            PunctuationType.STREAM_TIME,
            (timestamp) -> { 
                cityTemperatureContextContainer.refresh(new Date(timestamp));
                tempHm = cityTemperatureContextContainer.get();
            }    
        );
        super.init(context);
    }

有更好的方法吗?主要的问题是找到适当的概念,我可以实施它。不过,关于该主题的资源不多。

在Kafka Streams应用程序中,我想将目标城市中的当前温度添加到记录中。Temperature<->City对存储在Eg中。Postgres。

在Java应用程序中,我可以使用JDBC连接到Postgres并构建新的HashMap<CityId, Temperature>,因此我可以根据city_id查找温度。像tempHM.get(record.city_id)一样。

更好的选择是使用Kafka Connect将Postgres的数据摄入Kafka主题,将此主题阅读到使用Kafka流中的应用程序中的KTable中,然后与您的其他流(Records的Records流一起加入此KTable("使用city_id和其他一些字段"(。也就是说,您将执行KStream -TO- KTable加入。

思考:

### Architecture view
DB (here: Postgres) --Kafka Connect--> Kafka --> Kafka Streams Application

### Data view
Postgres Table ----------------------> Topic --> KTable

用例的示例连接器是https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc and https://wwwww.confluent.io/hhub/hub/debezium/debezium/debezium/debezium-connector-postgrostgresgresqul-.-

上面基于Kafka Connect设置的优点之一是,您不再需要直接从Java应用程序(使用Kafka流(与Postgres DB进行交谈。

另一个优点是,您不需要从DB中进行上下文数据的"批次刷新"(每15分钟(到您的Java应用程序中,因为该应用程序将在实时实时进行最新的DB更改自动通过db-> kconnect-> kafka-> kStreams-app流。

相关内容

  • 没有找到相关文章

最新更新