我想使用 scala 为 Flink 创建自己的 Sink,为此我需要扩展接口 SinkFunction。
但是我无法覆盖以下调用方法。
default void invoke(IN value, Context context) throws Exception {
invoke(value);
}
这是我的代码:
class MySinkFunction(schema: String) extends SinkFunction[List[GenericRecord]] {
override def invoke(elements: List[GenericRecord], context: Context) { ... }
这将给出以下错误:
Type Context takes type parameters
如果我更改代码以添加任何类型参数:
class MySinkFunction(schema: String) extends SinkFunction[List[GenericRecord]] {
override def invoke(elements: List[GenericRecord], context: Context[Xpto]) { ... }
错误消息不同:
Method `invoke` overrides nothing.
我是 Scala 的新手,有什么东西可以解决这个问题吗?
我在scala中看到的所有示例都使用以下已弃用的方法:
/**
* @deprecated Use {@link #invoke(Object, Context)}.
*/
@Deprecated
default void invoke(IN value) throws Exception {}
StreamingFileSink.java 使用以下方式实现:
...
@Public // Interface might be extended in the future with additional methods.
interface Context<T> {
/** Returns the current processing time. */
long currentProcessingTime();
/** Returns the current event-time watermark. */
long currentWatermark();
/**
* Returns the timestamp of the current input record or {@code null} if the element does not
* have an assigned timestamp.
*/
Long timestamp();
}
这<T>
错误的地方,因为它没有在SinkFunction.Context的任何地方使用?
在这种情况下,您可以简单地选择:
override def invoke(elements: List[GenericRecord], context: SinkFunction.Context[_]) { ... }
它应该像一个魅力一样工作。