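I'm learning Flink and want to build an operator function that extends ProcessWindowFunction, with an overloaded constructor that takes one parameter and stores it in a field of the class. But when the function actually runs, the field no longer holds what I passed in, and I'm confused. The code is below.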
import com.aliyun.datahub.client.model.Field;
import com.aliyun.datahub.client.model.FieldType;
import com.aliyun.datahub.client.model.PutRecordsResult;
import io.github.streamingwithflink.chapter8.PoJoElecMeterSource;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class DataHubSinkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.enableCheckpointing(10_000L);
env.setParallelism(2);
RecordSchemaSer schema = new RecordSchemaSer();
schema.addField(new Field("id", FieldType.STRING));
DataStream<PutRecordsResult> out = env
.addSource(new PoJoElecMeterSource())
.keyBy( r -> r.getId())
.window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
.process(new PutDatahubFunction<>(schema)); // PutDatahubFunction is the custom operator function class I wrote
env.execute();
}
}
The variable schema is the argument I want to pass to the constructor. It is an instance of the RecordSchemaSer class:
import com.aliyun.datahub.client.model.RecordSchema;
import java.io.Serializable;
public class RecordSchemaSer
extends RecordSchema
implements Serializable {
}
PutDatahubFunction is a class that extends ProcessWindowFunction. Its code is as follows:
import com.aliyun.datahub.client.model.*;
import io.github.streamingwithflink.chapter8.PUDAPoJo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
public class PutDatahubFunction<IN extends PUDAPoJo, KEY>
extends ProcessWindowFunction<IN, PutRecordsResult, KEY, TimeWindow> {
private DataHubBase dataHubHandler;
private List<RecordEntry> recordEntries;
private RecordSchema schema;
public PutDatahubFunction(RecordSchema schema) {
this.schema = schema;
System.out.println("field 'id' not exist ? " + this.schema.containsField("id")); // it's true
}
@Override
public void open(Configuration parameters) throws Exception {
.........
}
@Override
public void process(KEY key,
Context context,
Iterable<IN> elements,
Collector<PutRecordsResult> out)
throws Exception {
RecordEntry entry = new RecordEntry();
for (IN e : elements) {
System.out.println("field 'id' not exist ? " + this.schema.containsField("id")); // it's false
......
}
}
}
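The first System.out in the constructor prints true for this.schema.containsField("id"), but the second System.out in the process method prints false for the same call. Why? I also printed the class name of the instance in both places, and both are PutDatahubFunction.
Using ValueState doesn't work either, because getRuntimeContext() cannot be called in the constructor; doing so fails with Exception in thread "main" java.lang.IllegalStateException: The runtime context has not been initialized. The code is as follows: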
private ValueState<RecordSchema> schema;
public PutTupleDatahubFunction(RecordSchema schema) throws IOException {
ValueStateDescriptor schemaDes =
new ValueStateDescriptor("datahub schema", TypeInformation.of(RecordSchema.class));
/*
* error Exception in thread "main" java.lang.IllegalStateException:
* The runtime context has not been initialized.
*/
this.schema = getRuntimeContext().getState(schemaDes);
this.schema.update(schema);
}
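I'm quite confused. Can anyone tell me the reason, and is there a way to pass arguments to the constructor of this operator function class? Thanks.
I finally figured out the reason: it comes down to serialization and deserialization. Flink serializes the function object when the job is submitted and deserializes it on the task that runs it, so process works on a deserialized copy rather than on the object the constructor built. I had not written any serialization code in RecordSchemaSer, so the serialized content was empty: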
public class RecordSchemaSer
extends RecordSchema
implements Serializable
{
}
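The effect can be reproduced without Flink. The following is a minimal sketch using plain Java serialization, not the DataHub classes: PlainSchema, EmptySer, FixedSer, and roundTrip are hypothetical names introduced here for illustration. It assumes the parent class is not Serializable and has a no-argument constructor, in which case Java rebuilds the parent part of the object through that constructor on deserialization and the parent's fields come back empty. One possible fix, shown in FixedSer, is to write and read the parent's state by hand in writeObject and readObject.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class SchemaSerializationDemo {

    // Hypothetical stand-in for a third-party class that is NOT Serializable.
    static class PlainSchema {
        protected List<String> fields = new ArrayList<>();

        public PlainSchema() {}

        public void addField(String name) { fields.add(name); }

        public boolean containsField(String name) { return fields.contains(name); }
    }

    // Same shape as RecordSchemaSer: only the Serializable marker is added.
    static class EmptySer extends PlainSchema implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // Variant that saves and restores the parent's state explicitly.
    static class FixedSer extends PlainSchema implements Serializable {
        private static final long serialVersionUID = 1L;

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            out.writeObject(new ArrayList<>(fields)); // parent state, written by hand
        }

        @SuppressWarnings("unchecked")
        private void readObject(ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            fields = (List<String>) in.readObject(); // parent state, restored by hand
        }
    }

    // Serializes a value to bytes and reads it back, roughly what happens
    // when a function object is shipped from the client to a worker.
    static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        EmptySer empty = new EmptySer();
        empty.addField("id");
        FixedSer fixed = new FixedSer();
        fixed.addField("id");

        System.out.println(roundTrip(empty).containsField("id")); // false
        System.out.println(roundTrip(fixed).containsField("id")); // true
    }
}
```

For the real RecordSchema, the same idea would mean writing out whatever describes its fields in a serializable form (for example field names and types) and rebuilding the schema in readObject; which accessors to use for that depends on the DataHub client API and is not shown here.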