它使用处理时间和广播状态运行。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
BroadcastStream<List<TableOperations>> broadcastOperationsState = env
.addSource(new LoadCassandraOperations(10000L, cassandraHost, cassandraPort)).broadcast(descriptor);
SingleOutputStreamOperator<InternalVariableValue> stream =
env.addSource(new SourceMillisInternalVariableValue(5000L));
SingleOutputStreamOperator<InternalVariableOperation> streamProcessed =
stream
.keyBy(InternalVariableValue::getUuid)
.connect(broadcastOperationsState)
.process(new AddOperationInfo())
;
streamProcessed.print();
SourceMillisIntervalVariableValues每5秒创建一个事件。事件存储在一个静态集合中。运行方法看起来像:
public class SourceMillisInternalVariableValue extends RichSourceFunction<InternalVariableValue>{
private boolean running;
long millis;
public SourceMillisInternalVariableValue(long millis) {
super();
this.millis = millis;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
running = true;
}
@Override
public void cancel() {
running = false;
}
@Override
public void run(SourceContext<InternalVariableValue> ctx) throws Exception {
//Espera inicial
Thread.sleep(1500);
PojoVariableValues[] pojoData =
new PojoVariableValues[]{
new PojoVariableValues("id1", "1"),
new PojoVariableValues("id2", "2"),
....
....
new PojoVariableValues("id21", "21")
};
int cont = 0;
while (cont<pojoData.length) {
System.out.println("Iteration "+cont+" "+pojoData.length);
ctx.collect(generateVar(pojoData[0+cont].getUUID(), pojoData[0+cont].getValue()));
ctx.collect(generateVar(pojoData[1+cont].getUUID(), pojoData[1+cont].getValue()));
ctx.collect(generateVar(pojoData[2+cont].getUUID(), pojoData[2+cont].getValue()));
cont = cont +3;
Thread.sleep(millis);
}
}
private InternalVariableValue generateVar(String uuid, String value)
{
return InternalVariableValueMessage.InternalVariableValue.newBuilder()
.setUuid(uuid)
.setTimestamp(new Date().getTime()).setValue(value).setKeyspace("nest").build();
}
class PojoVariableValues {
private String UUID;
private String Value;
public PojoVariableValues(String uUID, String value) {
super();
UUID = uUID;
Value = value;
}
public String getUUID() {
return UUID;
}
public void setUUID(String uUID) {
UUID = uUID;
}
public String getValue() {
return Value;
}
public void setValue(String value) {
Value = value;
}
}
}
LoadCassandraOperations每10秒发出一次事件。它运行良好。
当我运行此代码时,SourceMillisIntervalVariableValues在第一次迭代中停止,只发出三个事件。如果我对进程函数进行注释,两个源都能正常工作,但如果我运行过程,源将被取消。。。
源发出所有事件(确切地说是21个(,并且所有事件都在聚合函数中进行处理。如果我运行这段代码,源代码中的while循环只完成一次迭代。
知道吗?
谢谢你。欢呼
编辑:
重要。此代码用于探索进程时间和广播功能。我知道我没有使用来源中的最佳实践。感谢
编辑2:当我尝试运行流程函数时,问题就开始了。
已解决!!
问题是,我试图使用TestContainer运行它,但我无法查看任何日志。
我用一个简单的main方法运行了它,我可以看到一些代码错误(比如注释中的注释。Tnks!!(。