如何将函数名称作为 Flink Map 函数的输入传递



我有一个基于 Flink Java API 的类:

public class SP implements Serializable {
private transient StreamExecutionEnvironment env;
private DataStream<byte[]> data ;
}

然后,我尝试为类SP编写一个获取函数名称并将该函数应用于data字段行的方法。

public DataStream<Object> myMap(Function<Object, Object> func) {
return data.map(x -> func.apply(x));
}

因此,在main方法中,我创建了一个简单的函数并将其传递给myMap函数。

public static void main(String[] args) throws Exception {
SP temp = new SP();
DataStream<Object> datastream = temp.getDataFromKakfa("7798", 1).myMap(Test::print) ;
datastream.print() ;
temp.execute();
}
public static Object print(Object o) {
try {
StringBuilder res = new StringBuilder();
for (byte b : serializeObject(o)) {
res.append(String.format("%02X ", b));
res.append(" "); // delimiter
}
return res.toString();
} catch (NullPointerException e){
return 0 ;
} catch (IOException e) {
return 0;
}
}
public static byte[] serializeObject(Object obj) throws IOException
{
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bytesOut);
oos.writeObject(obj);
oos.flush();
byte[] bytes = bytesOut.toByteArray();
bytesOut.close();
oos.close();
return bytes;
}

但是我得到了错误:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the MapFunction is not serializable. The object probably contains or references non serializable fields.

它指的是myMap函数。如何解决问题?这是做这种情况的更直接的方法吗?

没有太多细节就不看这个,看来你的Function<Object, Object> func需要实现Serializable

您可以创建标记接口:

@FunctionalInterface
interface SerializableFuncton<I, O> extends Function<I, O>, Serializable { }

然后将DataStream<Object> myMap(Function<Object, Object> func)更改为DataStream<Object> myMap(SerializableFuncton<Object, Object> func).

相关内容

  • 没有找到相关文章

最新更新