I have a class built on the Flink Java API:
public class SP implements Serializable {
private transient StreamExecutionEnvironment env;
private DataStream<byte[]> data ;
}
Then I tried to write a method for class SP that takes a function and applies it to every record of the data field.
public DataStream<Object> myMap(Function<Object, Object> func) {
return data.map(x -> func.apply(x));
}
In the main method, I created a simple function and passed it to myMap.
public static void main(String[] args) throws Exception {
SP temp = new SP();
DataStream<Object> datastream = temp.getDataFromKakfa("7798", 1).myMap(Test::print) ;
datastream.print() ;
temp.execute();
}
public static Object print(Object o) {
try {
StringBuilder res = new StringBuilder();
for (byte b : serializeObject(o)) {
res.append(String.format("%02X ", b));
res.append(" "); // delimiter
}
return res.toString();
} catch (NullPointerException e){
return 0 ;
} catch (IOException e) {
return 0;
}
}
public static byte[] serializeObject(Object obj) throws IOException
{
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bytesOut);
oos.writeObject(obj);
oos.flush();
byte[] bytes = bytesOut.toByteArray();
bytesOut.close();
oos.close();
return bytes;
}
But I get this error:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the MapFunction is not serializable. The object probably contains or references non serializable fields.
It points to the myMap function. How can I fix this? Is there a more straightforward way to do this kind of thing?
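Without looking at this in much detail, it seems your Function<Object, Object> func needs to implement Serializable. The lambda passed to data.map captures func, and Flink has to serialize the map function together with everything it captures before sending it to the workers.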
You can create a marker interface:
@FunctionalInterface
interface SerializableFuncton<I, O> extends Function<I, O>, Serializable { }
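Then change DataStream<Object> myMap(Function<Object, Object> func) to DataStream<Object> myMap(SerializableFuncton<Object, Object> func). The method reference Test::print passed from main is then compiled against a serializable target type, so it can be serialized along with the map function.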
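The effect can be checked without Flink. The following is a minimal sketch in plain Java, not the asker's actual code: the class name SerializableLambdaDemo and the helpers describe and canSerialize are hypothetical, and describe only stands in for Test::print. It tries to write three function objects with ObjectOutputStream: a method reference typed as a plain Function, the same reference typed as the marker interface, and a serializable lambda that captures the plain Function (roughly the situation inside myMap in the question).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class SerializableLambdaDemo {

    // Same shape as the marker interface in the answer (spelling kept as posted).
    @FunctionalInterface
    public interface SerializableFuncton<I, O> extends Function<I, O>, Serializable { }

    // Hypothetical stand-in for Test::print.
    public static Object describe(Object o) {
        return "value=" + o;
    }

    // Returns true if Java serialization accepts the object, false if it is rejected.
    public static boolean canSerialize(Object obj) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(obj);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        // Target type is a plain Function: the method reference is not serializable.
        Function<Object, Object> plain = SerializableLambdaDemo::describe;

        // Target type extends Serializable: the same method reference is serializable.
        SerializableFuncton<Object, Object> marked = SerializableLambdaDemo::describe;

        // A serializable lambda that captures a non-serializable object still fails,
        // because captured values are written as part of the lambda.
        SerializableFuncton<Object, Object> wrapper = x -> plain.apply(x);

        System.out.println("plain:   " + canSerialize(plain));
        System.out.println("marked:  " + canSerialize(marked));
        System.out.println("wrapper: " + canSerialize(wrapper));
    }
}
```

Under these assumptions, only marked should serialize. The wrapper case is the one closest to the error in the question: the outer lambda is serializable, but the func it captures is not, which is why changing the parameter type of myMap is enough.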