我正在 flink 上做一些 poc,但我找不到有关如何在 kafka 流中实现类似于 KGroupTable 的用例的文档,如下所示
KTable<byte[], Long> aggregatedStream = groupedTable.aggregate(() -> 0L,
(aggKey, newValue, aggValue) -> aggValue + newValue.length(),
(aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), Serdes.Long(), "aggregation-table-store");
用例我想从收到的交易中汇总账户余额。如果我得到现有交易 ID 的更新,我想删除旧值并添加新值。假设如果交易被取消,我想从帐户余额中删除旧值。
例如
TransactionId AccountId Balance
1 account1 1000 // account1 - 1000
2 account1 2000 // account1 - 3000
3 account2 2000 // account1 - 3000, account2 - 2000
1 account1 500 // account1 - 2500, account2 - 2000
在上面的示例中,第 4 次更新是,我得到了现有交易 #1 的更新,因此它将删除旧余额 (1000( 并添加新余额 (500(
谢谢
这是你如何处理这个问题的草图。我使用元组是因为我很懒;最好使用 POJO。
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransactionsWithRetractions {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple3<Integer, String, Float>> rawInput = env.fromElements(
new Tuple3<>(1, "account1", 1000.0F ),
new Tuple3<>(2, "account1", 2000.0F),
new Tuple3<>(3, "account2", 2000.0F),
new Tuple3<>(1, "account1", 500.0F)
);
rawInput
.keyBy(t -> t.f1)
.map(new ManageAccounts())
.print();
env.execute();
}
public static class ManageAccounts extends RichMapFunction<Tuple3<Integer, String, Float>, Tuple2<String, Float>>{
MapStateDescriptor<Integer, Float> transactionsDesc;
ReducingStateDescriptor<Float> balanceDesc;
@Override
public void open(Configuration parameters) throws Exception {
transactionsDesc = new MapStateDescriptor<Integer, Float>("transactions", Integer.class, Float.class);
balanceDesc = new ReducingStateDescriptor<>("balance", (f, g) -> f + g, Float.class);
}
@Override
public Tuple2<String, Float> map(Tuple3<Integer, String, Float> event) throws Exception {
MapState<Integer, Float> transactions = getRuntimeContext().getMapState(transactionsDesc);
ReducingState<Float> balance = getRuntimeContext().getReducingState(balanceDesc);
Float currentValue = transactions.get(event.f0);
if (currentValue == null) {
currentValue = 0F;
}
transactions.put(event.f0, event.f2);
balance.add(event.f2 - currentValue);
return new Tuple2<>(event.f1, balance.get());
}
}
}
运行时,这将产生:
1> (account1,1000.0)
8> (account2,2000.0)
1> (account1,3000.0)
1> (account1,2500.0)
请注意,此实现使所有事务永远处于状态,这在实际应用程序中可能会出现问题,尽管您可以将 Flink 状态扩展到非常大。