我正在处理简单的案例,我们将流 1(仓位(与流 2(价格(连接起来,并将最新的仓位数据与价格数据链接起来。为此,我扩展了一个RichCoFlatMapFunction,它构建了一个包装器对象,该对象从任一流中收集数据。在此过程中,它还将数据存储在其 MapState 中。
在一天结束时,根据另一个流数据(例如日期更改流(,我需要清除状态。我该怎么做?基本上我需要清除价格状态和位置状态。我不确定我们是否可以获得广播流来做到这一点?
加入 2 个流的示例代码如下
static final class PositionPriceWrapperBuilder extends RichCoFlatMapFunction<Position, Price, PositionPriceWrapper> {
private transient MapState<String, Price> priceState;
private transient MapState<String, Position> positionState;
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<String, Price> descPrice = new MapStateDescriptor<String, Price>(
"priceState",
String.class,
Price.class);
priceState = getRuntimeContext().getMapState(descPrice);
System.out.println("descPrice:: " + descPrice);
//Same thing needs to be done for Price?
MapStateDescriptor<String, Position> descPos = new MapStateDescriptor<String, Position>(
"positionState",
String.class,
Position.class);
positionState = getRuntimeContext().getMapState(descPos);
System.out.println("positionState:: " + positionState);
}
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void flatMap1(Position position, Collector<PositionPriceWrapper> out) throws Exception {
try {
//= pnlState.get(position.getId());
Price price = priceState.get(position.getId());
PositionPriceWrapper ppw = new PositionPriceWrapper();
ppw.setPrice(price);
ppw.setPosition(position);
ppw.setAccount(position.getAccount());
ppw.setCusip(position.getCusip());
System.out.println("Built ppw -->" + ppw);
positionState.put(position.getId(), position);
out.collect(ppw);
}
catch ( Exception e) {
e.printStackTrace();
}
}
@Override
public void flatMap2(Price price, Collector<PositionPriceWrapper> out) throws Exception {
try {
Position position = positionState.get(price.getId());
PositionPriceWrapper ppw = new PositionPriceWrapper();
ppw.setPrice(price);
ppw.setPosition(position);
ppw.setAccount(price.getAccount());
ppw.setCusip(price.getCusip());
priceState.put(price.getId(), price);
out.collect(ppw);
}
catch ( Exception e) {
e.printStackTrace();
}
}
}
如果 Flink 提供了一个三输入运算符,你想做什么会很简单,但它没有。 Flink 只支持一个或两个输入的运算符。
一种选择可能是将RichCoFlatMap转换为CoProcessFunction,并使用计时器触发状态清除。或者依靠 StateTTL 机制来清除状态。
如果您确实需要显式触发状态清算,您可以做的是使用 union(( 将价格和头寸流合并到一个DataStream<Either<Price, Position>>
中(或首先将两个流映射到某个统一类型(,然后将该流连接到具有启动状态清算信号的广播流。或者,您可以将所有三个流合并在一起,如果它们都以相同的方式进行键控。