在 Scala 中使用 SinkFunction 创建我自己的接收器



我想使用 scala 为 Flink 创建自己的 Sink,为此我需要扩展接口 SinkFunction。

但是我无法覆盖以下调用方法。

default void invoke(IN value, Context context) throws Exception {
invoke(value);
}

这是我的代码:

class MySinkFunction(schema: String) extends SinkFunction[List[GenericRecord]] {
override def invoke(elements: List[GenericRecord], context: Context) { ... }

这将给出以下错误:

Type Context takes type parameters

如果我更改代码以添加任何类型参数:

class MySinkFunction(schema: String) extends SinkFunction[List[GenericRecord]] {
override def invoke(elements: List[GenericRecord], context: Context[Xpto]) { ... }

错误消息不同:

Method `invoke` overrides nothing.

我是 Scala 的新手,有什么东西可以解决这个问题吗?

我在scala中看到的所有示例都使用以下已弃用的方法:

/**
* @deprecated Use {@link #invoke(Object, Context)}.
*/
@Deprecated
default void invoke(IN value) throws Exception {}

StreamingFileSink.java 使用以下方式实现:

...
@Public // Interface might be extended in the future with additional methods.
interface Context<T> {
/** Returns the current processing time. */
long currentProcessingTime();
/** Returns the current event-time watermark. */
long currentWatermark();
/**
* Returns the timestamp of the current input record or {@code null} if the element does not
* have an assigned timestamp.
*/
Long timestamp();
}

<T>错误的地方,因为它没有在SinkFunction.Context的任何地方使用?

在这种情况下,您可以简单地选择:

override def invoke(elements: List[GenericRecord], context: SinkFunction.Context[_]) { ... }

它应该像一个魅力一样工作。

相关内容

  • 没有找到相关文章

最新更新