How to test a KeyedBroadcastProcessFunction in Flink



I am new to Flink and I am trying to write a JUnit test case for a KeyedBroadcastProcessFunction. Below is my code. I currently call the getDataStreamOutput method in a TestUtils class, passing it the input data and the pattern rules; after the input data has been evaluated against the list of pattern rules, if the input data meets the condition I get a signal, the sink function is invoked, and the output data is returned from getDataStreamOutput as a string.

@Test
public void testCompareInputAndOutputDataForInputSignal() throws Exception {
    Assertions.assertEquals(sampleInputSignal,
            TestUtils.getDataStreamOutput(
                    inputSignal,
                    patternRules));
}

public static String getDataStreamOutput(JSONObject input, Map<String, String> patternRules) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<JSONObject> inputSignal = env.fromElements(input);
    DataStream<Map<String, String>> rawPatternStream = env.fromElements(patternRules);

    // Generate key/value pairs of patterns, where the key is the pattern name
    // and the value is the pattern condition
    DataStream<Tuple2<String, Map<String, String>>> patternRuleStream =
            rawPatternStream.flatMap(new FlatMapFunction<Map<String, String>,
                    Tuple2<String, Map<String, String>>>() {
                @Override
                public void flatMap(Map<String, String> patternRules,
                        Collector<Tuple2<String, Map<String, String>>> out) throws Exception {
                    for (Map.Entry<String, String> stringEntry : patternRules.entrySet()) {
                        JSONObject jsonObject = new JSONObject(stringEntry.getValue());
                        Map<String, String> map = new HashMap<>();
                        for (String key : jsonObject.keySet()) {
                            map.put(key, jsonObject.get(key).toString());
                        }
                        out.collect(new Tuple2<>(stringEntry.getKey(), map));
                    }
                }
            });

    BroadcastStream<Tuple2<String, Map<String, String>>> patternRuleBroadcast =
            patternRuleStream.broadcast(patternRuleDescriptor);

    DataStream<Tuple2<String, JSONObject>> validSignal = inputSignal
            .map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
                @Override
                public Tuple2<String, JSONObject> map(JSONObject inputSignal) throws Exception {
                    // key each signal by its source field
                    String source = inputSignal.getString("source");
                    return new Tuple2<>(source, inputSignal);
                }
            })
            .keyBy(0)
            .connect(patternRuleBroadcast)
            .process(new MyKeyedBroadCastProcessFunction());

    validSignal.map(new MapFunction<Tuple2<String, JSONObject>, JSONObject>() {
        @Override
        public JSONObject map(Tuple2<String, JSONObject> inputSignal) throws Exception {
            return inputSignal.f1;
        }
    }).addSink(new getDataStreamOutput());

    env.execute("TestFlink");
    return getDataStreamOutput.dataStreamOutput;
}

@SuppressWarnings("serial")
public static final class getDataStreamOutput implements SinkFunction<JSONObject> {
    public static String dataStreamOutput;

    @Override
    public void invoke(JSONObject inputSignal) throws Exception {
        dataStreamOutput = inputSignal.toString();
    }
}
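The patternRuleDescriptor referenced above is not shown in the snippet; it is assumed to be a MapStateDescriptor declared elsewhere in TestUtils. A minimal sketch of what it might look like, assuming pattern names map to their parsed condition maps (the name "patternRules" and the exact types are hypothetical):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.MapTypeInfo;

// Hypothetical descriptor for the broadcast pattern-rule state; the value type
// matches the Tuple2<String, Map<String, String>> elements of patternRuleStream.
public static final MapStateDescriptor<String, Map<String, String>> patternRuleDescriptor =
        new MapStateDescriptor<>(
                "patternRules",
                BasicTypeInfo.STRING_TYPE_INFO,
                new MapTypeInfo<>(String.class, String.class));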

I need to test different inputs against the same broadcast rules, but every time I call this method it fetches the input signal and broadcasts the rule data all over again from the beginning. Is there a way I can broadcast once and keep sending inputs to the method under test? I could combine the streams with a CoFlatMapFunction (shown below) and keep sending the input rules while the method is running, but then this stream would have to fetch the data from a Kafka topic again, which would overload the method with loading the Kafka utils and server.

DataStream<JSONObject> inputSignalFromKafka = env.addSource(inputSignalKafka);
DataStream<org.json.JSONObject> inputSignalFromMethod = env.fromElements(inputSignal);

DataStream<JSONObject> inputSignal = inputSignalFromMethod.connect(inputSignalFromKafka)
        .flatMap(new SignalCoFlatMapper());

public static class SignalCoFlatMapper
        implements CoFlatMapFunction<JSONObject, JSONObject, JSONObject> {

    @Override
    public void flatMap1(JSONObject inputValue, Collector<JSONObject> out) throws Exception {
        out.collect(inputValue);
    }

    @Override
    public void flatMap2(JSONObject kafkaValue, Collector<JSONObject> out) throws Exception {
        out.collect(kafkaValue);
    }
}

I found a link on Stack Overflow, How to unit test BroadcastProcessFunction in flink when processElement depends on broadcasted data, but it left me confused.

Either way, is there a way I can broadcast only once, in the @Before method of my test case, and then keep sending different kinds of data to my broadcast function?

You can use a KeyedTwoInputStreamOperatorTestHarness to achieve this. For example, assume you have the following KeyedBroadcastProcessFunction, in which you define some business logic for the two DataStream channels:

public class SimpleKeyedBroadcastProcessFunction extends KeyedBroadcastProcessFunction<String, String, String, String> {

    @Override
    public void processElement(String inputEntry,
            ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {
        // business logic for how you want to process your data stream records
    }

    @Override
    public void processBroadcastElement(String broadcastInput, Context context,
            Collector<String> collector) throws Exception {
        // process input from your broadcast channel
    }
}

Now, assuming your process function is stateful and modifies Flink's internal state, you have to create a TestHarness in your test class to make sure you are able to keep track of that state during testing.
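Note that the BROADCAST_MAP_STATE_DESCRIPTOR used in the harness below is assumed to be a constant defined in the test class, mirroring the descriptor your process function registers. A minimal sketch, assuming String keys and values:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

// Assumed to mirror the descriptor registered inside the process function;
// the name and key/value types must match your production code.
public static final MapStateDescriptor<String, String> BROADCAST_MAP_STATE_DESCRIPTOR =
        new MapStateDescriptor<>(
                "broadcast-state",
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);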

I would then create some unit tests using the following approach:

public class SimpleKeyedBroadcastProcessFunctionTest {

    private SimpleKeyedBroadcastProcessFunction processFunction;
    private KeyedTwoInputStreamOperatorTestHarness<String, String, String, String> testHarness;

    @Before
    public void setup() throws Exception {
        processFunction = new SimpleKeyedBroadcastProcessFunction();
        testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
                new CoBroadcastWithKeyedOperator<>(processFunction, ImmutableList.of(BROADCAST_MAP_STATE_DESCRIPTOR)),
                (KeySelector<String, String>) string -> string,
                (KeySelector<String, String>) string -> string,
                TypeInformation.of(String.class));
        testHarness.setup();
        testHarness.open();
    }

    @After
    public void cleanup() throws Exception {
        testHarness.close();
    }

    @Test
    public void testProcessRegularInput() throws Exception {
        // processElement1 sends elements into your regular stream;
        // the second param is the event time of the record
        testHarness.processElement1(new StreamRecord<>("Hello", 0));
        // Access records collected during processElement
        List<StreamRecord<? extends String>> records = testHarness.extractOutputStreamRecords();
        assertEquals("Hello", records.get(0).getValue());
    }

    @Test
    public void testProcessBroadcastInput() throws Exception {
        // processElement2 sends elements into your broadcast stream;
        // the second param is the event time of the record
        testHarness.processElement2(new StreamRecord<>("Hello from Broadcast", 0));
        // Access records collected during processBroadcastElement
        List<StreamRecord<? extends String>> records = testHarness.extractOutputStreamRecords();
        assertEquals("Hello from Broadcast", records.get(0).getValue());
    }
}
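This also answers the original question about broadcasting only once: because the harness keeps its broadcast state across calls, you could push the broadcast element in setup() itself and have each test send only regular input. A minimal sketch built on the class above (the rule payload "myPatternRule" and the test names are hypothetical placeholders):

@Before
public void setup() throws Exception {
    // ... create and open testHarness exactly as in setup() above ...
    // Broadcast the rules a single time; the broadcast state is retained
    // across all processElement1 calls made by the individual tests.
    testHarness.processElement2(new StreamRecord<>("myPatternRule", 0));
}

@Test
public void testFirstInputAgainstPreloadedRules() throws Exception {
    testHarness.processElement1(new StreamRecord<>("signal-1", 0));
    // assert against testHarness.extractOutputStreamRecords() as before
}

@Test
public void testSecondInputAgainstPreloadedRules() throws Exception {
    testHarness.processElement1(new StreamRecord<>("signal-2", 0));
    // assert against testHarness.extractOutputStreamRecords() as before
}

Keep in mind that any records emitted by processBroadcastElement during setup() will also appear in extractOutputStreamRecords(), so account for them in your assertions.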
