函数在生产中成功执行,但在Flink中的测试中未成功执行



我在Flink 1.12.3中编写了一个集成测试,它测试StreamingJob类中的execute方法。令人惊讶的是,该方法在生产环境中成功地输出了要下沉的记录,但在本地测试中却没有输出任何内容。如何解决此问题并启用测试?

这可能与有关

private static final DeviceIdSink deviceIdSink = new DeviceIdSink();

@ClassRule
public static MiniClusterWithClientResource flinkCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(1)
.setNumberTaskManagers(2)
.build());

@Test
public void testingAStreamingJob() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

List<JsonNode> events = getListFromResource("events.json");
DataStream<JsonNode> testStream = env.fromCollection(events);
StreamingJob job = new StreamingJob(env, Time.seconds(60),
testStream, deviceIdSink);

job.execute();
System.out.println(deviceIdSink.values);

```

一旦testStream源耗尽,作业将终止。因此,如果你有任何基于时间的窗口,你将有永远不会发出的未决结果。

我使用MockSource,它在调用cancel()方法之前不会终止,例如

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A very simple, non-parallel source based on a list of elements. You can specify a delay
* for between each element that is emitted.
* 
* @param <T>
*/
@SuppressWarnings("serial")
public class MockSource<T> implements SourceFunction<T>, ResultTypeQueryable<T>, Serializable {

private static final Logger LOGGER = LoggerFactory.getLogger(MockSource.class);

private int listSize;
private byte[] elementsSerialized;
private TypeInformation<T> typeInfo;
private TypeSerializer<T> serializer;
private Time delay = null;

private transient volatile boolean running;
// Constructor for cases where you want an empty list as the source.
public MockSource(TypeInformation<T> typeInfo) throws IOException {
this(Collections.emptyList(), typeInfo);
}
@SuppressWarnings("unchecked")
public MockSource(T... elements) throws IOException {
this((List<T>) Arrays.asList(elements));
}
/**
* Create a source from <data>, which cannot be empty (if so, use the other constructor that takes a typeInfo
* argument.
* 
* @param data
* @throws IOException
*/
public MockSource(List<T> data) throws IOException {
this(data, TypeExtractor.getForObject(data.get(0)));
}
public MockSource(List<T> data, TypeInformation<T> typeInfo) throws IOException {
this.typeInfo = typeInfo;
this.serializer = typeInfo.createSerializer(new ExecutionConfig());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
listSize = 0;
try {
for (T element : data) {
serializer.serialize(element, wrapper);
listSize++;
}
} catch (Exception e) {
throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
}
this.elementsSerialized = baos.toByteArray();
}
public MockSource<T> setDelay(Time delay) {
this.delay = delay;
return this;
}

@Override
public void run(SourceContext<T> ctx) throws Exception {
running = true;
Object lock = ctx.getCheckpointLock();
ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
final DataInputView input = new DataInputViewStreamWrapper(bais);
int i = 0;
while (running && (i < this.listSize)) {
T next;
try {
next = serializer.deserialize(input);
} catch (Exception e) {
throw new IOException("Failed to deserialize an element from the source. "
+ "If you are using user-defined serialization (Value and Writable types), check the "
+ "serialization functions.nSerializer is " + serializer, e);
}
synchronized (lock) {
ctx.collect(next);
i++;

if (delay != null) {
LOGGER.debug("MockSource delaying for {}ms", delay.toMilliseconds());

Thread.sleep(delay.toMilliseconds());
}
}
}
}
@Override
public void cancel() {
running = false;
}
@Override
public TypeInformation<T> getProducedType() {
return typeInfo;
}
}

相关内容

  • 没有找到相关文章

最新更新