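I need my Flink job to read from a local instance of a source function, and to update every time the data of that source function instance changes from within the unit test code itself rather than from a stream.
Pseudocode: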
StreamExecutionEnvironment env = ...getExecutionEnvironment();
StockSource src = new StockSource(); // the Source Function instance
env.addSource(src);
results = Pipeline(env); // does some calculations and returns the calculated data
env.execute();
// Test 1
When: src.sendData("TWTR", 120.6);
Assert: results.eurRate == 98.87;
// Test 2
When: src.sendData("GOOG", 300);
Assert: results.eurRate == 245.95;
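Is it possible to do something like this in Flink?
What you can do is write your job so that the source and sink are pluggable, and then implement suitable sources and sinks for testing. In other words, something like this: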
public class TestableStreamingJob {
private SourceFunction<Long> source;
private SinkFunction<Long> sink;
public TestableStreamingJob(SourceFunction<Long> source, SinkFunction<Long> sink) {
this.source = source;
this.sink = sink;
}
public void execute() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> longStream = env.addSource(source).returns(TypeInformation.of(Long.class));
longStream
.map(new IncrementMapFunction())
.addSink(sink);
env.execute();
}
public static void main(String[] args) throws Exception {
TestableStreamingJob job = new TestableStreamingJob(new RandomLongSource(), new PrintSinkFunction<>());
job.execute();
}
}
It can then be tested like this:
public class TestableStreamingJobTest {
@ClassRule
public static MiniClusterWithClientResource flinkCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build());
@Test
public void testCompletePipeline() throws Exception {
ParallelSourceFunction<Long> source = new ParallelCollectionSource(Arrays.asList(1L, 10L, -10L));
SinkCollectingLongs sink = new SinkCollectingLongs();
TestableStreamingJob job = new TestableStreamingJob(source, sink);
job.execute();
assertThat(sink.result).containsExactlyInAnyOrder(2L, 11L, -9L);
}
}
The sink used for testing looks like this:
public class SinkCollectingLongs implements SinkFunction<Long> {
public static final List<Long> result =
Collections.synchronizedList(new ArrayList<>());
@Override
public void invoke(Long value, Context context) throws Exception {
result.add(value);
}
}
This example is taken from https://github.com/knaufk/flink-testing-pyramid, which you can consult for more details.
I implemented my own custom source, which wraps a Queue.
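A note on why the test sink collects into a static list: as far as I understand, Flink serializes the function objects you hand to the job and runs deserialized copies inside the tasks, so an instance field on the sink object held by the test would never see the results. The following is a minimal sketch of that effect using only plain Java serialization, with no Flink involved. `StaticSinkDemo`, `CollectingSink` and `copyViaSerialization` are hypothetical names made up for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class StaticSinkDemo {

    // Hypothetical stand-in for a test sink; this is not a Flink class.
    static class CollectingSink implements Serializable {
        private static final long serialVersionUID = 1L;

        // Static: shared by every copy of the sink living in the same JVM.
        static final List<Long> STATIC_RESULT =
                Collections.synchronizedList(new ArrayList<>());

        // Instance field: belongs to exactly one copy of the sink.
        final List<Long> instanceResult = new ArrayList<>();

        void invoke(Long value) {
            STATIC_RESULT.add(value);
            instanceResult.add(value);
        }
    }

    // Round-trips the sink through Java serialization, imitating (as an
    // assumption) how a function object is shipped to the task that runs it.
    static CollectingSink copyViaSerialization(CollectingSink sink)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(sink);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (CollectingSink) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        CollectingSink.STATIC_RESULT.clear();
        CollectingSink original = new CollectingSink();       // what the test holds
        CollectingSink shipped = copyViaSerialization(original); // what the job runs

        shipped.invoke(2L);
        shipped.invoke(11L);

        System.out.println("static:   " + CollectingSink.STATIC_RESULT); // [2, 11]
        System.out.println("instance: " + original.instanceResult);      // []
    }
}
```

Because the static list outlives each test, it is probably worth clearing it at the start of every test method so that results from one test do not leak into the next.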
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
// This function has to wrap static members because of the way Flink handles parallelism.
// SpotBugs doesn't like this,
// so we suppress the warnings in the code that wraps this queue.
@SuppressFBWarnings
public final class QueueBasedSourceFunction<T>
implements SourceFunction<T>, ResultTypeQueryable<T> {
public static BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1024);
// volatile so that cancel(), called from another thread, is seen by the run() loop
private static volatile boolean running = false;
Class<T> clazz;
public QueueBasedSourceFunction(Class<T> clazz) {
this.clazz = clazz;
}
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
running = true;
while (running) {
T elem = (T) queue.poll(1, TimeUnit.SECONDS);
if (elem != null) {
sourceContext.collect(elem);
}
}
}
@Override
public void cancel() {
running = false;
}
@Override
public TypeInformation<T> getProducedType() {
return TypeInformation.of(clazz);
}
public void produce(T s) {
queue.offer(s);
}
public void waitTillConsumed() throws InterruptedException {
synchronized (queue) {
while (!queue.isEmpty()) {
queue.wait(100);
}
}
}
}
This source reads elements from the queue and emits them. In your test, you need to feed the queue, like this:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configure your test environment
env.setParallelism(2);
QueueBasedSourceFunction<String> sourceFUnc = new QueueBasedSourceFunction<>(String.class);
DataStreamSource<String> source = env.addSource(sourceFUnc);
SingleOutputStreamOperator<String> result = source ..... // do whatever you need to do here
result.addSink(sink());
// start a background thread that feeds test data into the queue
// you can add waits to simulate real data coming in
Executors.newSingleThreadExecutor()
.submit(
() -> {
IntStream.range(1, 10)
.forEach(
i -> {
QueueBasedSourceFunction.queue.offer("Foo" + i);
QueueBasedSourceFunction.queue.offer("Bar" + i);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
return;
}
});
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
return;
}
QueueBasedSourceFunction.queue.offer("Close");
// we need to wait for the queue to be empty before stopping the source
// if the source is stopped too early, records won't be processed
synchronized (QueueBasedSourceFunction.queue) {
try {
while (!QueueBasedSourceFunction.queue.isEmpty()) {
QueueBasedSourceFunction.queue.wait(1000);
}
} catch (InterruptedException e) {
return;
}
}
// close the source; your test won't exit until the source is closed
sourceFUnc.cancel();
});
// execute
env.execute();
}
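This test generates 2 test records every 2 seconds for roughly 20 seconds (IntStream.range(1, 10) runs nine iterations, so about 18 seconds in practice), waits 10 seconds, generates one more record, and then waits until all records have been consumed. You can implement your own logic.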
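The feed, drain and cancel sequence can be tried out without a Flink cluster. The sketch below is a plain-Java imitation of the pattern, not the Flink job itself: a consumer thread stands in for the source's run() loop, and `QueueDrainDemo`, `runSource` and `feedAndCollect` are hypothetical names chosen for this illustration. It sets the running flag before starting the consumer thread, on the assumption that this avoids a cancel being overwritten by a late start.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueDrainDemo {

    static final BlockingQueue<String> QUEUE = new ArrayBlockingQueue<>(1024);
    static final List<String> COLLECTED =
            Collections.synchronizedList(new ArrayList<>());
    // volatile so the consumer thread sees the flag change made by the test thread
    static volatile boolean running;

    // Stand-in for the source's run() loop: poll with a timeout so that
    // the loop re-checks the running flag even when the queue is empty.
    static void runSource() throws InterruptedException {
        while (running) {
            String elem = QUEUE.poll(50, TimeUnit.MILLISECONDS);
            if (elem != null) {
                COLLECTED.add(elem); // the real source would call collect() here
            }
        }
    }

    // Feeds the input, waits for the queue to drain, then stops the consumer.
    static List<String> feedAndCollect(List<String> input) throws InterruptedException {
        QUEUE.clear();
        COLLECTED.clear();
        running = true; // set before start so a later cancel cannot be overwritten
        Thread source = new Thread(() -> {
            try {
                runSource();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        source.start();

        for (String s : input) {
            QUEUE.put(s);
        }
        // Stopping too early would leave records unprocessed, so drain first.
        while (!QUEUE.isEmpty()) {
            Thread.sleep(10);
        }
        running = false; // equivalent of cancel()
        source.join();   // the last polled element is added before the thread exits
        return new ArrayList<>(COLLECTED);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(feedAndCollect(List.of("Foo1", "Bar1", "Close")));
    }
}
```

In a real test, an empty queue only tells you that the source has picked the records up, not that downstream operators and the sink have finished with them, so assertions on the sink may still need their own wait.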