尝试在 Apache Flink 中的 keyedBroadCastProcessFunction 中的 process



我有一些有趣的场景,我正在flink中使用keyedbroadcastprocess函数评估传入模式的模式匹配,当我在IDE中运行该程序时,我在尝试访问ReadOnlyContext时在processElements函数中出现空指针异常,但它在终端中运行良好,下面是我的keyedbroadcastprocess函数

以下是我的主要课程

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.json.JSONObject;
public class SignalPatternMatchingApp {
public static final MapStateDescriptor<String, Map<String, String>> patternRuleDescriptor =
new MapStateDescriptor(SignalPatternMatchingConstants.PATTERN_RULE_DESCRIPTOR_NAME,
BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo<>(String.class, String.class));

public static final OutputTag<JSONObject> unMatchedSideOutput =
new OutputTag<JSONObject>("sideoutput") {
};

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


DataStream<JSONObject> inputSignal = get input from kafka stream
DataStream<Map<String, String>> rawPatternStream =
env.fromElements(get data from database);
DataStream<Tuple2<String, Map<String, String>>> patternStream =
rawPatternStream.flatMap(new FlatMapFunction<Map<String, String>,
Tuple2<String, Map<String, String>>>() {
@Override
public void flatMap(Map<String, String> patternRules,
Collector<Tuple2<String, Map<String, String>>> out) throws Exception {
for (Map.Entry<String, String> stringEntry : patternRules.entrySet()) {
JSONObject jsonObject = new JSONObject(stringEntry.getValue());
Map<String, String> map = new HashMap<>();
for (String key : jsonObject.keySet()) {
String value = jsonObject.get(key).toString();
map.put(key, value);
}
out.collect(new Tuple2<>(stringEntry.getKey(), map));
}
}
});
BroadcastStream<Tuple2<String, Map<String, String>>> patternBroadcast =
patternStream.broadcast(patternRuleDescriptor);

DataStream<Tuple2<String, JSONObject>> matchedSignal =
inputSignal.map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
@Override
public Tuple2<String, JSONObject> map(JSONObject inputSignal) throws Exception {
String sourceName = inputSignal.getJSONObject("signalHeader").get("sourceName").toString();
return new Tuple2<>(sourceName, inputSignal);
}
}).keyBy(0).connect(patternBroadcast).process(new TestProcess());

matchedSignal.print();

DataStream<JSONObject> unmatchedSignal =
((SingleOutputStreamOperator<Tuple2<String, JSONObject>>) matchedSignal)
.getSideOutput(unMatchedSideOutput);
unmatchedSignal.print();
env.execute();
}

KeyedBroadcastProcessFunction as below


public class TestProcess extends KeyedBroadcastProcessFunction<String, Tuple2<String, sampleSignal>,
Tuple2<String, Map<String, String>>, Tuple2<String, sampleSignal>> {
MapStateDescriptor<String, Map<String, String>> patternRuleDesc;
public void open(Configuration parameters) throws Exception {
super.open(parameters);
patternRuleDesc = new MapStateDescriptor<>("RuleDescriptor",
BasicTypeInfo.STRING_TYPE_INFO,new MapTypeInfo<>(String.class,String.class));
}

public static final MapStateDescriptor <String,Map<String,String>> ruleDescriptor =
new MapStateDescriptor <>("RuleDiscriptor",
,BasicTypeInfo.STRING_TYPE_INFO
,new MapTypeInfo<>(String.class,String.class));

@Override
public void processElement(Tuple2<String, sampleSignal> value, ReadOnlyContext ctx, Collector<Tuple2<String,
sampleSignal>> out) throws Exception {

Map<String,String> patternConditions  = ctx.getBroadcastState(this.patternRuleDesc).get(Key);

System.out.println("Before Rule Iterator");

/*I tried below way to print the values in broadcaststream just to print the values
in broadcast state it don't print anything*/

for(Map.Entry<String, String> rules:
patternConditions.entrySet()){
System.out.println("Key: " +rules.getKey());
System.out.println("Value: "+rules.getValue());
}

out.collect(new Tuple2<>(value.f0,value.f1));

}

@Override
public void processBroadcastElement(Tuple2<String, Map<String, String>> value, Context ctx,
Collector<Tuple2<String, sampleSignal>> out) throws Exception {

System.out.println("BroadCastState Key: " +value.f0);
System.out.println("BroadCastState Value: " +value.f1);
ctx.getBroadcastState(ruleDescriptor).put(value.f0,value.f1);

}
}

下面是出现错误异常的 IDE 终端输出

2020-07-07 12:15:19,349 INFO  [Task] - Ensuring all FileSystem streams are closed for task Flat Map (1/8) (1a117aa5347465fcc2cd5e58c286ccca) [FINISHED]
2020-07-07 12:15:19,348 INFO  [TestProcess ] - BroadCastState SourceName: A
2020-07-07 12:15:19,350 INFO  [TestProcess ] - BroadCastState PatternCondition: PatternRule
2020-07-07 12:15:19,341 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Collection Source (1/1) 23f6a4d19c5744a62c46ac48d4dfbb24.
2020-07-07 12:15:19,351 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (8/8) (773fad58ff1418ae919a0900e4da6ef5) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,351 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (8/8) (773fad58ff1418ae919a0900e4da6ef5).
2020-07-07 12:15:19,351 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (2/8) (67a21177a7598f3f1ccc5001fe6951c3) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,351 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (2/8) (67a21177a7598f3f1ccc5001fe6951c3).
2020-07-07 12:15:19,351 INFO  [TestProcess ] - BroadCastState SourceName: A
2020-07-07 12:15:19,351 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (2/8) (67a21177a7598f3f1ccc5001fe6951c3) [FINISHED]
2020-07-07 12:15:19,351 INFO  [TestProcess ] - BroadCastState PatternCondition: PatternRule
2020-07-07 12:15:19,351 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (8/8) (773fad58ff1418ae919a0900e4da6ef5) [FINISHED]
2020-07-07 12:15:19,352 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (3/8) (8d860a734d5b6a50194a5caad135a844) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,352 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (3/8) (8d860a734d5b6a50194a5caad135a844).
2020-07-07 12:15:19,352 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (3/8) (8d860a734d5b6a50194a5caad135a844) [FINISHED]
2020-07-07 12:15:19,352 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (1/8) (8fa9c71df79d439827a1d290d9ad9abd) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,352 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (1/8) (8fa9c71df79d439827a1d290d9ad9abd).
2020-07-07 12:15:19,352 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (1/8) (8fa9c71df79d439827a1d290d9ad9abd) [FINISHED]
2020-07-07 12:15:19,357 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (4/8) (ee3c4e895e82f77ad9a055ee788f4a2b) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,357 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (4/8) (ee3c4e895e82f77ad9a055ee788f4a2b).
2020-07-07 12:15:19,357 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (4/8) (ee3c4e895e82f77ad9a055ee788f4a2b) [FINISHED]
2020-07-07 12:15:19,358 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (6/8) (6a75da55421a929d4b3d4ebd655414e9) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,358 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (6/8) (6a75da55421a929d4b3d4ebd655414e9).
2020-07-07 12:15:19,339 INFO  [ExecutionGraph] - Flat Map (3/8) (63a7c3f2b7a8d110232374c700e6378a) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,352 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (7/8) (351cc238fecce28d1dfdc5ac357ef8e4) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,352 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Map (3/8) d94aea75380e0e32c4747eef2f51a88d.
2020-07-07 12:15:19,358 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (7/8) (351cc238fecce28d1dfdc5ac357ef8e4).
2020-07-07 12:15:19,359 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (7/8) (351cc238fecce28d1dfdc5ac357ef8e4) [FINISHED]
2020-07-07 12:15:19,360 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Flat Map (2/8) 32f2429b06954884543a4de062edf6f6.
2020-07-07 12:15:19,358 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (6/8) (6a75da55421a929d4b3d4ebd655414e9) [FINISHED]
2020-07-07 12:15:19,360 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Map (1/8) 7dd6a325fe2265187b0b30bf3b4b8f63.
2020-07-07 12:15:19,358 INFO  [ExecutionGraph] - Flat Map (4/8) (de310509f095e00584ce128336e19adf) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,361 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Map (5/8) 4920b55f28ea09128aaa4e0d9d4691d8.
2020-07-07 12:15:19,362 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Flat Map (8/8) 0fded4a7816a9d9a219c520891d2b38d.
2020-07-07 12:15:19,362 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Flat Map (1/8) 1a117aa5347465fcc2cd5e58c286ccca.
2020-07-07 12:15:19,363 INFO  [ExecutionGraph] - Map (7/8) (ac26eefbff14e55f031a055c1d6fa7f3) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,364 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (2/8) 67a21177a7598f3f1ccc5001fe6951c3.
2020-07-07 12:15:19,367 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (8/8) 773fad58ff1418ae919a0900e4da6ef5.
2020-07-07 12:15:19,367 INFO  [ExecutionGraph] - Flat Map (5/8) (3bb74ec008b43ffe86edd6a7f84844a0) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,368 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (3/8) 8d860a734d5b6a50194a5caad135a844.
2020-07-07 12:15:19,368 INFO  [ExecutionGraph] - Map (2/8) (90787449e2373696163da5670b0e543a) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,369 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (1/8) 8fa9c71df79d439827a1d290d9ad9abd.
2020-07-07 12:15:19,370 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (4/8) ee3c4e895e82f77ad9a055ee788f4a2b.
2020-07-07 12:15:19,373 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (7/8) 351cc238fecce28d1dfdc5ac357ef8e4.
2020-07-07 12:15:19,374 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (6/8) 6a75da55421a929d4b3d4ebd655414e9.
2020-07-07 12:15:19,369 INFO  [ExecutionGraph] - Flat Map (7/8) (c1ca47240e15ef6c60f1943aed1b45ba) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,379 INFO  [ExecutionGraph] - Source: Collection Source (1/1) (23f6a4d19c5744a62c46ac48d4dfbb24) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,381 INFO  [ExecutionGraph] - Map (3/8) (d94aea75380e0e32c4747eef2f51a88d) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,382 INFO  [ExecutionGraph] - Flat Map (2/8) (32f2429b06954884543a4de062edf6f6) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,384 INFO  [ExecutionGraph] - Map (1/8) (7dd6a325fe2265187b0b30bf3b4b8f63) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,385 INFO  [ExecutionGraph] - Map (5/8) (4920b55f28ea09128aaa4e0d9d4691d8) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,386 INFO  [TypeExtractor] - class org.json.JSONObject does not contain a getter for field map
2020-07-07 12:15:19,386 INFO  [TypeExtractor] - class org.json.JSONObject does not contain a setter for field map
2020-07-07 12:15:19,386 INFO  [ExecutionGraph] - Flat Map (8/8) (0fded4a7816a9d9a219c520891d2b38d) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,386 INFO  [TypeExtractor] - Class class org.json.JSONObject cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2020-07-07 12:15:19,388 INFO  [ExecutionGraph] - Flat Map (1/8) (1a117aa5347465fcc2cd5e58c286ccca) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,390 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (2/8) (67a21177a7598f3f1ccc5001fe6951c3) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,390 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (5/8) (33a27b2d97c96658fd43db24177236bc) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
at com.eventdetection.eventfilter.pattern.operator.json.filter.TestProcess .processElement(TestProcess .java:103)
at com.eventdetection.eventfilter.pattern.operator.json.filter.TestProcess .processElement(TestProcess .java:40)
at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:113)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:135)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:100)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:182)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
2020-07-07 12:15:19,394 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (5/8) (33a27b2d97c96658fd43db24177236bc).
2020-07-07 12:15:19,394 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (8/8) (773fad58ff1418ae919a0900e4da6ef5) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,394 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (5/8) (33a27b2d97c96658fd43db24177236bc) [FAILED]
2020-07-07 12:15:19,395 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (3/8) (8d860a734d5b6a50194a5caad135a844) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,396 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (1/8) (8fa9c71df79d439827a1d290d9ad9abd) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,397 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (4/8) (ee3c4e895e82f77ad9a055ee788f4a2b) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,398 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (7/8) (351cc238fecce28d1dfdc5ac357ef8e4) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,399 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (6/8) (6a75da55421a929d4b3d4ebd655414e9) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,402 INFO  [TaskExecutor] - Un-registering task and sending final execution state FAILED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (5/8) 33a27b2d97c96658fd43db24177236bc.
2020-07-07 12:15:19,404 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (5/8) (33a27b2d97c96658fd43db24177236bc) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
at com.eventdetection.eventfilter.pattern.operator.json.filter.PatternFilter.processElement(PatternFilter.java:103)
at com.eventdetection.eventfilter.pattern.operator.json.filter.PatternFilter.processElement(PatternFilter.java:40)
at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:113)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:135)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:100)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:182)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
2020-07-07 12:15:19,406 INFO  [RestartPipelinedRegionStrategy] - Calculating tasks to restart to recover the failed task d8804397962a5c1c0b4daacb1802fb97_4.
2020-07-07 12:15:19,408 INFO  [RestartPipelinedRegionStrategy] - 26 tasks should be restarted to recover the failed task d8804397962a5c1c0b4daacb1802fb97_4. 
2020-07-07 12:15:19,410 INFO  [ExecutionGraph] - Job Pattern-Matching (1a828d53bc6a886fe0fc7c454e6e66b7) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

请帮助解决我正在使用FLINK 1.10.0版本和INTELLiJ IDE和Java版本1.8

的问题

假设代码的当前状态是以下部分,它实际上唯一可能失败的地方是:

Map<String,String> patternConditions  = ctx.getBroadcastState(this.patternRuleDesc).get(Key);

System.out.println("Before Rule Iterator");

/*I tried below way to print the values in broadcaststream just to print the values
in broadcast state it don't print anything*/

for(Map.Entry<String, String> rules:
patternConditions.entrySet()){
System.out.println("Key: " +rules.getKey());
System.out.println("Value: "+rules.getValue());
}

问题不在于真正获得广播状态,因为它不应该抛出NullPointerException,而是IllegalArgumentException。问题是您得到一些Key,然后尝试对此(.entryset()(执行一些操作。但是,如果给定的Key不存在,则状态将像正常Map一样工作,这意味着它将返回null。如果您尝试对null进行操作,您将获得NullPointerException.

您应该添加一些代码来验证给定Key是否处于该状态,否则它总是会失败。

编辑:

因此,如果问题是关于BroadcastStream和另一个流之间的竞争,那不是您可以轻易阻止的。这是因为消费者消费消息的速度可能不同,通常不可能阻止这种情况。 我可以看到有两种方法您可以尝试解决此问题:

  1. 创建ListState,以保留到达且没有相应Key的元素,然后您只需在给定Key到达时发出它们。注意:您必须从processElement发出它们,因为BroadcastState没有键控。
  2. 使用InputSelectable编写一个首选一个源而不是另一个源的运算符,这样您可能更喜欢广播流而不是元素流。

一般来说,我会说,如果您有连续的数据流,即您知道给定的Key会多次到达,则选项 1 更简单、更快捷。选项 2 提供了更大的灵活性,但也需要更多的知识,并且通常更难正确实施。

相关内容

  • 没有找到相关文章

最新更新