Flink预洗牌聚合(pre-shuffle aggregation)不起作用



我正在尝试在flink中进行预洗牌聚合。下面是MapBundle的实现:

/**
 * Bundle function that pre-aggregates {@code TaxiFare} records per key by summing their
 * {@code tip} field before the records are emitted downstream.
 */
public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {

    /**
     * Merges one incoming fare into the value currently buffered for its key.
     *
     * @param value the value buffered so far for this key, or {@code null} if there is none yet
     * @param input the fare that just arrived
     * @return {@code input} itself when nothing was buffered; otherwise {@code value} with
     *     {@code input.tip} added to its {@code tip} (the buffered object is updated in place)
     */
    @Override
    public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
        if (value != null) {
            value.tip = value.tip + input.tip;
            return value;
        }
        // First element seen for this key: it becomes the buffered value.
        return input;
    }

    /**
     * Emits every buffered per-key value to the collector.
     *
     * @param buffer the buffered values, indexed by key
     * @param out the collector that receives one record per buffered entry
     */
    @Override
    public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
        for (TaxiFare aggregated : buffer.values()) {
            out.collect(aggregated);
        }
    }
}

我正在使用"CountBundleTrigger.java"。但是预洗牌聚合不起作用,因为计数变量count始终为0。如果我遗漏了什么,请告诉我。

/**
 * Counts the element and, once the count reaches {@code maxCount}, asks the registered
 * callback to finish the bundle and then resets this trigger.
 *
 * @param element the element that was just added to the bundle (not inspected here)
 */
@Override
public void onElement(T element) throws Exception {
    count += 1;
    if (count < maxCount) {
        // Bundle is not full yet; keep buffering.
        return;
    }
    callback.finishBundle();
    reset();
}

下面是主代码:

// Function that merges fares sharing a key while they are buffered in the bundle.
MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction = new TaxiFareMapBundleFunction();
// Flush the bundle after every 10 elements.
BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
// Bundle key = driver id. Written as a lambda on purpose: an anonymous inner class created
// inside an instance method keeps a hidden reference to the enclosing object, so serializing
// the operator would also try to serialize the (typically non-serializable) job class.
KeySelector<TaxiFare, Long> taxiFareLongKeySelector = fare -> fare.driverId;
DataStream<Tuple3<Long, Long, Float>> hourlyTips =
//                            fares.keyBy((TaxiFare fare) -> fare.driverId)
//                                             
.window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());;
fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
new TaxiFareStream(mapBundleFunction, bundleTrigger, 
taxiFareLongKeySelector
))
.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
@Override
public long extractTimestamp(TaxiFare element) {
return element.startTime.getEpochSecond();
}
})
.keyBy((TaxiFare fare) -> fare.driverId)
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.process(new AddTips());
DataStream<Tuple3<Long, Long, Float>> hourlyMax =
hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);

下面是TaxiFareStream.java的代码。

/**
 * Bundle operator for {@code TaxiFare} records that derives the bundle key from a
 * user-supplied {@link KeySelector}.
 */
public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {

    private static final long serialVersionUID = 1L;

    /** Extracts the bundle key from an incoming fare. */
    private final KeySelector<TaxiFare, Long> keySelector;

    /**
     * @param userFunction function that merges inputs into the bundle and emits it on flush
     * @param bundleTrigger trigger that decides when the bundle is flushed
     * @param keySelector extracts the bundle key from each input record
     */
    public TaxiFareStream(
            MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
            BundleTrigger<TaxiFare> bundleTrigger,
            KeySelector<TaxiFare, Long> keySelector) {
        // NOTE(review): the key selector is also handed to the superclass; confirm whether the
        // local copy and the getKey override below are still needed.
        super(userFunction, bundleTrigger, keySelector);
        this.keySelector = keySelector;
    }

    /**
     * Returns the bundle key for the given fare by delegating to the key selector.
     *
     * @param input the incoming fare
     * @return the key produced by the key selector
     */
    @Override
    protected Long getKey(TaxiFare input) throws Exception {
        return keySelector.getKey(input);
    }
}

更新:我已经创建了下面的类,但出现了一个错误。我认为是无法序列化MapStreamBundleOperator.java这个类。

/**
 * Concrete bundle operator whose bundle key is computed by a {@link KeySelector}.
 *
 * @param <K> type of the bundle key
 * @param <V> type of the value buffered per key
 * @param <IN> type of the input elements
 * @param <OUT> type of the emitted elements
 */
public class MapStreamBundleOperator<K, V, IN, OUT>
        extends AbstractMapStreamBundleOperator<K, V, IN, OUT> {

    private static final long serialVersionUID = 6556268125924098320L;

    /** Maps each input element to the key under which it is buffered in the bundle. */
    private final KeySelector<IN, K> keySelector;

    /**
     * @param function function that merges inputs into the bundle and emits it on flush
     * @param bundleTrigger trigger that decides when the bundle is flushed
     * @param keySelector extracts the bundle key from each input element
     */
    public MapStreamBundleOperator(
            MapBundleFunction<K, V, IN, OUT> function,
            BundleTrigger<IN> bundleTrigger,
            KeySelector<IN, K> keySelector) {
        super(function, bundleTrigger);
        this.keySelector = keySelector;
    }

    /** Delegates key extraction for {@code input} to the configured key selector. */
    @Override
    protected K getKey(IN input) throws Exception {
        final K bundleKey = keySelector.getKey(input);
        return bundleKey;
    }
}

2021-08-27 05:06:04,814 ERROR FlinkDefaults.class                                           - Stream execution failed
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot serialize operator object class org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory.
at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:247)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:497)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:318)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:297)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:297)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:264)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:173)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:113)
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:62)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1810)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
at com.pinterest.xenon.flink.FlinkDefaults$.run(FlinkDefaults.scala:46)
at com.pinterest.xenon.flink.FlinkWorkflow.run(FlinkWorkflow.scala:74)
at com.pinterest.xenon.flink.WorkflowLauncher$.executeWorkflow(WorkflowLauncher.scala:43)
at com.pinterest.xenon.flink.WorkflowLauncher$.delayedEndpoint$com$pinterest$xenon$flink$WorkflowLauncher$1(WorkflowLauncher.scala:25)
at com.pinterest.xenon.flink.WorkflowLauncher$delayedInit$body.apply(WorkflowLauncher.scala:9)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.pinterest.xenon.flink.WorkflowLauncher$.main(WorkflowLauncher.scala:9)
at com.pinterest.xenon.flink.WorkflowLauncher.main(WorkflowLauncher.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:168)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.io.NotSerializableException: visibility.mabs.src.main.java.com.pinterest.mabs.MabsFlinkJob
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

我不会依赖官方的MapBundleOperator,因为David已经说过它的文档不够完善。我会根据我自己的AbstractMapStreamBundleOperator来回答这个问题。我认为你遗漏了processElement()方法中的计数器numOfElements++;。另外,使用泛型也更好。请使用以下代码:

/**
 * Operator that buffers incoming elements in an in-memory map (the "bundle"), merging
 * elements that share a key through the user's {@link MapBundleFunction}, and emits the
 * merged values whenever the {@link BundleTrigger} fires.
 *
 * <p>The bundle is a plain heap map, not Flink state, so it is flushed right before every
 * checkpoint barrier to avoid losing buffered elements on recovery.
 *
 * <p>NOTE(review): buffered elements are not flushed when a watermark passes or when the
 * input ends; confirm whether {@code processWatermark} and an end-of-input flush are needed
 * for your Flink version.
 *
 * @param <K> type of the bundle key
 * @param <V> type of the value buffered per key
 * @param <IN> type of the input elements
 * @param <OUT> type of the emitted elements
 */
public abstract class AbstractMapStreamBundleOperator<K, V, IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapBundleFunction<K, V, IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, BundleTriggerCallback {

    private static final long serialVersionUID = 1L;

    /** Per-key values accumulated since the last flush. */
    private final Map<K, V> bundle;

    /** Decides when the bundle is flushed; calls back into {@link #finishBundle()}. */
    private final BundleTrigger<IN> bundleTrigger;

    /** Collector handed to the user function on flush; created in {@link #open()}. */
    private transient TimestampedCollector<OUT> collector;

    /** Elements received since the last flush; only written in this class, never read. */
    private transient int numOfElements = 0;

    /**
     * @param function function that merges inputs into the bundle and emits it on flush
     * @param bundleTrigger trigger that decides when the bundle is flushed; must not be null
     */
    public AbstractMapStreamBundleOperator(MapBundleFunction<K, V, IN, OUT> function, BundleTrigger<IN> bundleTrigger) {
        super(function);
        chainingStrategy = ChainingStrategy.ALWAYS;
        this.bundle = new HashMap<>();
        this.bundleTrigger = checkNotNull(bundleTrigger, "bundleTrigger is null");
    }

    /** Creates the output collector, registers this operator with the trigger and resets it. */
    @Override
    public void open() throws Exception {
        super.open();
        numOfElements = 0;
        collector = new TimestampedCollector<>(output);
        bundleTrigger.registerCallback(this);
        // reset trigger
        bundleTrigger.reset();
    }

    /**
     * Merges the element into the value buffered for its key and notifies the trigger, which
     * may flush the bundle.
     */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // get the key and value for the map bundle
        final IN input = element.getValue();
        final K bundleKey = getKey(input);
        final V bundleValue = this.bundle.get(bundleKey);
        // get a new value after adding this element to bundle
        final V newBundleValue = userFunction.addInput(bundleValue, input);
        // update to map bundle
        bundle.put(bundleKey, newBundleValue);
        numOfElements++;
        bundleTrigger.onElement(input);
    }

    /**
     * Returns the key under which {@code input} is buffered in the bundle.
     *
     * @param input the incoming element
     * @return the bundle key for the element
     */
    protected abstract K getKey(final IN input) throws Exception;

    /**
     * Hands the buffered values to the user function for emission, clears the bundle and
     * resets the trigger. Does not call the user function when the bundle is empty.
     */
    @Override
    public void finishBundle() throws Exception {
        if (!bundle.isEmpty()) {
            numOfElements = 0;
            userFunction.finishBundle(bundle, collector);
            bundle.clear();
        }
        bundleTrigger.reset();
    }

    /**
     * Flushes the bundle before the checkpoint barrier is forwarded, so that no buffered
     * element exists only in this operator's heap memory when the checkpoint is taken.
     *
     * @param checkpointId id of the checkpoint about to be taken (unused)
     */
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        finishBundle();
    }
}

然后创建MapStreamBundleOperator,就像你已有的那样。使用以下代码:

/**
 * Concrete bundle operator whose bundle key is computed by a {@link KeySelector}.
 *
 * @param <K> type of the bundle key
 * @param <V> type of the value buffered per key
 * @param <IN> type of the input elements
 * @param <OUT> type of the emitted elements
 */
public class MapStreamBundleOperator<K, V, IN, OUT> extends AbstractMapStreamBundleOperator<K, V, IN, OUT> {

    // The operator is serialized when the job is submitted, so pin its serial version explicitly.
    private static final long serialVersionUID = 6556268125924098320L;

    /** KeySelector is used to extract key for bundle map. */
    private final KeySelector<IN, K> keySelector;

    /**
     * @param function function that merges inputs into the bundle and emits it on flush
     * @param bundleTrigger trigger that decides when the bundle is flushed
     * @param keySelector extracts the bundle key from each input element
     */
    public MapStreamBundleOperator(MapBundleFunction<K, V, IN, OUT> function, BundleTrigger<IN> bundleTrigger,
            KeySelector<IN, K> keySelector) {
        super(function, bundleTrigger);
        this.keySelector = keySelector;
    }

    /** Delegates key extraction for {@code input} to the configured key selector. */
    @Override
    protected K getKey(IN input) throws Exception {
        return this.keySelector.getKey(input);
    }
}

触发器内部的计数器使Bundle操作符将事件刷新到下一阶段。CountBundleTrigger如下所示。使用这个代码。你还需要BundleTriggerCallback。

/**
 * Bundle trigger that fires after a fixed number of elements.
 *
 * @param <T> type of the elements added to the bundle
 */
public class CountBundleTrigger<T> implements BundleTrigger<T> {

    /** Number of elements after which the bundle is finished. */
    private final long maxCount;

    /** Receiver notified when the bundle is full; set through {@link #registerCallback}. */
    private transient BundleTriggerCallback callback;

    /** Elements seen since the last reset. */
    private transient long count = 0;

    /**
     * @param maxCount number of elements per bundle; must be greater than 0
     */
    public CountBundleTrigger(long maxCount) {
        Preconditions.checkArgument(maxCount > 0, "maxCount must be greater than 0");
        this.maxCount = maxCount;
    }

    /** Stores the callback to notify when the bundle is full; rejects {@code null}. */
    @Override
    public void registerCallback(BundleTriggerCallback callback) {
        this.callback = Preconditions.checkNotNull(callback, "callback is null");
    }

    /**
     * Counts the element and, once {@code maxCount} elements have been seen, finishes the
     * bundle through the callback and resets the counter.
     */
    @Override
    public void onElement(T element) throws Exception {
        if (++count < maxCount) {
            return;
        }
        callback.finishBundle();
        reset();
    }

    /** Sets the element counter back to zero. */
    @Override
    public void reset() {
        count = 0;
    }

    /** Returns a short description of this trigger, including its bundle size. */
    @Override
    public String explain() {
        final String description = "CountBundleTrigger with size " + maxCount;
        return description;
    }
}

然后必须创建一个这样的触发器并把它传给操作符。在这里,我创建了一个包含100个TaxiFare事件的包(bundle)。以另一个POJO为例。我在这里写了MapBundleTaxiFareImpl,但是你可以基于它创建你自己的UDF。

/**
 * Builds the pre-aggregation operator: a {@code MapStreamBundleOperator} driven by
 * {@code MapBundleTaxiFareImpl} that flushes its bundle every 100 elements and keys the
 * bundle with {@code keyBundleSelector}.
 *
 * @return the operator to pass to {@code transform()}
 */
private OneInputStreamOperator<Tuple2<Long, TaxiFare>, Tuple2<Long, TaxiFare>> getPreAggOperator() {
    CountBundleTrigger<Tuple2<Long, TaxiFare>> countTrigger = new CountBundleTrigger<>(100);
    MapBundleFunction<Long, TaxiFare, Tuple2<Long, TaxiFare>, Tuple2<Long, TaxiFare>> preAggFunction =
            new MapBundleTaxiFareImpl();
    return new MapStreamBundleOperator<>(preAggFunction, countTrigger, keyBundleSelector);
}

最后使用transform()在某处调用这个新操作符。以另一个POJO为例。

// Usage sketch ("..." stands for the surrounding pipeline steps): plug the operator returned
// by getPreAggOperator() into the stream under the name "my-pre-agg". The TypeHint passes
// Tuple2<Long, TaxiFare> as the type information argument of transform().
stream
...
.transform("my-pre-agg", 
TypeInformation.of(new TypeHint<Tuple2<Long, TaxiFare>>(){}), getPreAggOperator())
...
我知道这就是你所需要的。请尝试使用这些类;如果还缺少什么,它应该在我给出链接的git仓库中。希望你能成功。

相关内容

  • 没有找到相关文章

最新更新