我只是在Flink中尝试使用Scala类型的类。我定义了以下类型的类接口:
trait LikeEvent[T] {
def timestamp(payload: T): Int
}
现在,我想考虑LikeEvent[_]
的DataSet
,如下所示:
// existing classes that need to be adapted/normalized (without touching them)
case class Log(ts: Int, severity: Int, message: String)
case class Metric(ts: Int, name: String, value: Double)
// create instances for the raw events
object EventInstance {
implicit val logEvent = new LikeEvent[Log] {
def timestamp(log: Log): Int = log.ts
}
implicit val metricEvent = new LikeEvent[Metric] {
def timestamp(metric: Metric): Int = metric.ts
}
}
// add ops to the raw event classes (regular class)
object EventSyntax {
implicit class Event[T: LikeEvent](val payload: T) {
val le = implicitly[LikeEvent[T]]
def timestamp: Int = le.timestamp(payload)
}
}
以下应用程序运行良好:
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// underlying (raw) events
val events: DataSet[Event[_]] = env.fromElements(
Metric(1586736000, "cpu_usage", 0.2),
Log(1586736005, 1, "invalid login"),
Log(1586736010, 1, "invalid login"),
Log(1586736015, 1, "invalid login"),
Log(1586736030, 2, "valid login"),
Metric(1586736060, "cpu_usage", 0.8),
Log(1586736120, 0, "end of world"),
)
// count events per hour
val eventsPerHour = events
.map(new GetMinuteEventTuple())
.groupBy(0).reduceGroup { g =>
val gl = g.toList
val (hour, count) = (gl.head._1, gl.size)
(hour, count)
}
eventsPerHour.print()
打印预期输出
(0,5)
(1,1)
(2,1)
但是,如果我这样修改语法对象:
// couldn't make it work with Flink!
// add ops to the raw event classes (case class)
object EventSyntax2 {
case class Event[T: LikeEvent](payload: T) {
val le = implicitly[LikeEvent[T]]
def timestamp: Int = le.timestamp(payload)
}
implicit def fromPayload[T: LikeEvent](payload: T): Event[T] = Event(payload)
}
我得到以下错误:
type mismatch;
found : org.apache.flink.api.scala.DataSet[Product with Serializable]
required: org.apache.flink.api.scala.DataSet[com.salvalcantara.fp.EventSyntax2.Event[_]]
因此,在消息的指导下,我做了以下更改:
val events: DataSet[Event[_]] = env.fromElements[Event[_]](...)
之后,错误变为:
could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.salvalcantara.fp.EventSyntax2.Event[_]]
我无法理解为什么EventSyntax2
会导致这些错误,而EventSyntax
编译并运行良好。为什么在EventSyntax2
中使用case类包装器比在EventSyntax
中使用正则类问题更大?
无论如何,我的问题有两个:
- 如何解决
EventSyntax2
的问题 - 实现我的目标最简单的方法是什么?在这里,我只是为了学习而尝试类型类模式,但对我来说,更面向对象的方法(基于子类型(看起来更简单
// Define trait
trait Event {
def timestamp: Int
def payload: Product with Serializable // Any case class
}
// Metric adapter (similar for Log)
object MetricAdapter {
implicit class MetricEvent(val payload: Metric) extends Event {
def timestamp: Int = payload.ts
}
}
然后简单地使用val events: DataSet[Event] = env.fromElements(...)
。
注意,实现某个类型类的类列表提出了类似的问题,但它考虑的是简单的ScalaList
,而不是FlinkDataSet
(或DataStream
(。我问题的重点是使用Flink中的类型类模式以某种方式考虑异构流/数据集,以及它是否真的有意义,或者在这种情况下应该明确支持一个规则特性并从中继承,如上所述
顺便说一句,你可以在这里找到代码:https://github.com/salvalcantara/flink-events-and-polymorphism.
简短回答:Flink无法在通配符类型的scala中导出TypeInformation
长话短说:你的两个问题都在问,TypeInformation
是什么,它是如何使用的,以及它是如何派生的。
TypeInformation
是Flink的内部类型系统,当数据在网络上混洗并存储在状态后端时(当使用DataStream api时(,它使用该系统来序列化数据。
序列化是数据处理中的一个主要性能问题,因此Flink包含用于常见数据类型和模式的专用序列化程序。开箱即用,在其Java堆栈中,它支持所有JVM原语、Pojo、Flink元组、一些常见的集合类型和avro。类的类型是通过反射来确定的,如果它与已知类型不匹配,它将返回到Kryo。
在scala-api中,类型信息是使用implicits派生的。scala DataSet和DataStream api上的所有方法都将其泛型参数注释为隐式类型类。
def map[T: TypeInformation]
这个TypeInformation
可以像任何类型类一样手动提供,也可以使用从flink导入的宏派生。
import org.apache.flink.api.scala._
这个宏通过支持scala元组、scala case类和一些常见的scala-std库类型来装饰java类型堆栈。我之所以说decorator,是因为如果您的类不是那种类型,那么它可以而且将返回到java堆栈。
那么,为什么版本1有效呢?
因为它是一个类型堆栈无法匹配的普通类,所以它将其解析为泛型类型,并返回了一个基于kryo的序列化程序。您可以从控制台测试它,并看到它返回一个泛型类型。
> scala> implicitly[TypeInformation[EventSyntax.Event[_]]]
res2: org.apache.flink.api.common.typeinfo.TypeInformation[com.salvalcantara.fp.EventSyntax.Event[_]] = GenericType<com.salvalcantara.fp.EventSyntax.Event>
版本2不起作用,因为它将类型识别为事例类,然后为其每个成员递归地派生TypeInformation
实例。这对于通配符类型是不可能的,因为通配符类型不同于Any
,因此派生失败。
通常,您不应该将Flink与异构类型一起使用,因为它将无法为您的工作负载派生高效的序列化程序。