Flink Streaming RichSource提前停止



它使用处理时间和广播状态运行。


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

BroadcastStream<List<TableOperations>> broadcastOperationsState = env
.addSource(new LoadCassandraOperations(10000L, cassandraHost, cassandraPort)).broadcast(descriptor);
SingleOutputStreamOperator<InternalVariableValue> stream = 
env.addSource(new SourceMillisInternalVariableValue(5000L));

SingleOutputStreamOperator<InternalVariableOperation> streamProcessed = 
stream
.keyBy(InternalVariableValue::getUuid)
.connect(broadcastOperationsState)
.process(new AddOperationInfo())
;

streamProcessed.print();

SourceMillisIntervalVariableValues每5秒创建一个事件。事件存储在一个静态集合中。运行方法看起来像:


public class SourceMillisInternalVariableValue extends RichSourceFunction<InternalVariableValue>{

private boolean running;

long millis;
public SourceMillisInternalVariableValue(long millis) {
super();
this.millis = millis;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
running = true;
}
@Override
public void cancel() {
running = false;
}
@Override
public void run(SourceContext<InternalVariableValue> ctx) throws Exception {        
//Espera inicial
Thread.sleep(1500);
PojoVariableValues[] pojoData =
new PojoVariableValues[]{
new PojoVariableValues("id1", "1"),
new PojoVariableValues("id2", "2"),
....
....
new PojoVariableValues("id21", "21")
};

int cont = 0;
while (cont<pojoData.length) {
System.out.println("Iteration "+cont+" "+pojoData.length);
ctx.collect(generateVar(pojoData[0+cont].getUUID(), pojoData[0+cont].getValue()));      
ctx.collect(generateVar(pojoData[1+cont].getUUID(), pojoData[1+cont].getValue()));      
ctx.collect(generateVar(pojoData[2+cont].getUUID(), pojoData[2+cont].getValue()));      
cont = cont +3;
Thread.sleep(millis);       
}

}

private InternalVariableValue generateVar(String uuid, String value)
{
return InternalVariableValueMessage.InternalVariableValue.newBuilder()
.setUuid(uuid)
.setTimestamp(new Date().getTime()).setValue(value).setKeyspace("nest").build();
}

class PojoVariableValues {
private String UUID;
private String Value;

public PojoVariableValues(String uUID, String value) {
super();
UUID = uUID;
Value = value;
}

public String getUUID() {
return UUID;
}
public void setUUID(String uUID) {
UUID = uUID;
}
public String getValue() {
return Value;
}
public void setValue(String value) {
Value = value;
}

}
}

LoadCassandraOperations每10秒发出一次事件。它运行良好。

当我运行此代码时,SourceMillisIntervalVariableValues在第一次迭代中停止,只发出三个事件。如果我对进程函数进行注释,两个源都能正常工作,但如果我运行过程,源将被取消。。。

源发出所有事件(确切地说是21个(,并且所有事件都在聚合函数中进行处理。如果我运行这段代码,源代码中的while循环只完成一次迭代。

知道吗?

谢谢你。欢呼

编辑:

重要。此代码用于探索进程时间和广播功能。我知道我没有使用来源中的最佳实践。感谢

编辑2:当我尝试运行流程函数时,问题就开始了。

已解决!!

问题是,我试图使用TestContainer运行它,但我无法查看任何日志。

我用一个简单的main方法运行了它,我可以看到一些代码错误(比如注释中的注释。Tnks!!(。

最新更新