动态解析flinkmap中的json



我使用flink动态分析json类型的数据,对给定列进行keyby和sum,在我的mapFunction中,我将json转换为case类,但结果流在keyby函数中没有得到编译器,得到了错误Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<java.lang.Object>) cannot be used as key.。我的代码像这个

//conf.properties
columns=a:String,b:Int,c:String,d:Long
declusteringColumns=a,c
statsColumns=b
//main function
stream.map(new MapFunc)
.keyBy(declusteringColumns(0), declusteringColumns.drop(0).toSeq: _*)
.sum(statsColumns)
class MapFunc extends RichMapFunction[String,Any]{
var clazz:Class[_]=_
override def open(parameters: Configuration): Unit = {
import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox
val tb = universe.runtimeMirror(universe.getClass.getClassLoader).mkToolBox() 
clazz = tb.compile(tb.parse(
"""|case class Test(a:String,b:Int,c:String,d:Long){}
|scala.reflect.classTag[Test].runtimeClass"""
.stripMargin)).apply.asInstanceOf[Class[_]] 
}
override def map(value: String) {
val tmp = JSON.parseObject(value)
val values = Utils.loadProperties("columns").split(",").map(y => {
val name = y.substring(0, y.indexOf(":"))
val tpe = y.substring(y.indexOf(":") + 1)
tpe.toLowerCase match {
case "string" => tmp.getString(name)
case "int" => tmp.getInteger(name)
case "long" => tmp.getLong(name)
case _ => null}}).toSeq
clazz.getConstructors()(0).newInstance(values: _*) 
}}

如何将json转换为case类或元组?

实际上,异常

org.apache.flink.api.common.InvalidProgramException: 
This type (GenericType<Test>) cannot be used as key 

即使对于普通情况类(不是通过反射生成的(也保持不变

case class Test(a: String, b: Int, c: String, d: Long)

第一个问题是这个案例类不是POJO

https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#pojos

POJOs

Flink将Java和Scala类视为一种特殊的POJO数据如果它们满足以下要求,则键入:

  • 类必须是公共的。

  • 它必须有一个没有参数的公共构造函数(默认构造函数(。

  • 所有字段要么是公共的,要么必须可以通过getter和setter函数访问。对于名为foo的字段,getter和setter方法必须命名为getFoo((和setFoo(((。

  • 已注册的序列化程序必须支持字段的类型。

所以你应该更换

case class Test(a: String, b: Int, c: String, d: Long)

带有

import scala.beans.BeanProperty
case class Test(
@BeanProperty var a: String,
@BeanProperty var b: Int,
@BeanProperty var c: String,
@BeanProperty var d: Long) {
def this() = {
this(null, 0, null, 0)
}
}

第二个问题可能是Flink不允许内部类POJO不是静态内部类,但反射工具箱生成嵌套在方法中的本地类

https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#rules-对于pojo类型

POJO类型的规则

Flink将数据类型识别为POJO类型(并允许"按名称"字段引用(,如果满足以下条件:

  • 这个类是公共的和独立的(没有非静态的内部类(
  • 该类有一个公共的无参数构造函数
  • 类(和所有超类(中的所有非静态、非瞬态字段要么是公共的(也是非最终的(,要么具有公共的getter方法和遵循Java bean命名的setter方法getter和setter的约定

这是工具箱生成代码的反编译版本

public final class __wrapper$1$a077cb72a4ee423291aac7dfb47454b9$ {
public Object wrapper() {
new LazyRef();
class Test$1 implements Product, Serializable {
private String a;
private int b;
private String c;
private long d;
...
}
return scala.reflect.package..MODULE$.classTag(scala.reflect.ClassTag..MODULE$.apply(Test$1.class)).runtimeClass();
}
...
}

完整的反编译代码:

https://gist.github.com/DmytroMitin/f1554ad833ea1bb9eb97947ae872d220

因此,如果真的有必要为Flink生成一个类,那么它应该手动生成,而不是通过工具箱生成

https://www.reddit.com/r/scala/comments/gfcmul/compile_scala_source_from_string_and/

https://www.reddit.com/r/scala/comments/jckld2/is_there_a_way_to_load_scala_code_at_runtime/

如何评估使用InterfaceStability注释的代码(该注释因"涉及类InterfaceStability的非法循环引用"而失败(?

如何以编程方式编译和实例化Java类?

在运行时动态编译多个Scala类

Scala反射中的张量流

但是带有类的代码手动生成

https://gist.github.com/DmytroMitin/e33cd244b37f9b33b67f7ac3e6609d39

仍然抛出CCD_ 2。

我想原因如下(这是第三期(。

带有普通case类(未生成(的代码似乎可以使用

https://gist.github.com/DmytroMitin/af426d4578dd5e76c9e0d344e6f079ce

但如果我们用Any替换类型Test,则它抛出This type (GenericType<java.lang.Object>) cannot be used as key

https://gist.github.com/DmytroMitin/a23e45a546790630e838e60c7206adcd

通过反射,我们只能返回Any.


现在我正在生成的代码中创建TypeInformation[Test],这似乎修复了This type (GenericType<java.lang.Object>) cannot be used as key,但现在我有了

org.apache.flink.api.common.InvalidProgramException: UTF-8 is not serializable. 
The object probably contains or references non serializable fields.

https://gist.github.com/DmytroMitin/16d312dbafeae54518f7ac2c490426b0


我解决了InvalidProgramException: UTF-8 is not serializable@transient注释MapFunc字段的问题

https://gist.github.com/DmytroMitin/f2f859273075370c4687a30e0c3a2431


实际上,如果我们在生成的代码中创建TypeInformation,那么工具箱就足够了

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.reflect.runtime
import scala.reflect.runtime.universe._
import scala.tools.reflect.ToolBox
object App {
val toolbox = ToolBox(runtime.currentMirror).mkToolBox()
class MapFunc extends RichMapFunction[String, Any] {
var typeInfo: TypeInformation[_] = _
@transient var classSymbol: ClassSymbol = _
override def open(parameters: Configuration): Unit = {
val code =
"""|case class Test(
|                 @scala.beans.BeanProperty var a: String,
|                 @scala.beans.BeanProperty var b: Int,
|                 @scala.beans.BeanProperty var c: String,
|                 @scala.beans.BeanProperty var d: Long) {
|  def this() = {
|    this(null, 0, null, 0)
|  }
|}""".stripMargin
val tree = toolbox.parse(code)
classSymbol = toolbox.define(tree.asInstanceOf[ImplDef]).asClass
typeInfo = toolbox.eval(
q"org.apache.flink.api.common.typeinfo.TypeInformation.of(classOf[${classSymbol.toType}])"
).asInstanceOf[TypeInformation[_]]
}
override def map(value: String): Any = {
val values = Seq("aaa", 1, "ccc", 2L) //hardcoded for now
createClassInstance(classSymbol, values: _*)
}
}

def main(args: Array[String]): Unit = {
val func = new MapFunc
func.open(new Configuration)
val classInstance = func.map("""{a: "aaa", b: 1, c: "ccc", d: 2}""")
println(classInstance) //Test(aaa,1,ccc,2)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("localhost", 9999)
val typeInfo = func.typeInfo.asInstanceOf[TypeInformation[Any]]
println(typeInfo)//PojoType<__wrapper$1$75434c8e32f541f7a87513a2ad2aa0ce.Test, fields = [a: String, b: Integer, c: String, d: Long]>
val res = stream.map(func)(typeInfo).keyBy("a", "c").sum("b")
println(res)//org.apache.flink.streaming.api.scala.DataStream@5927f904
}
def createClassInstance(classSymbol: ClassSymbol, args: Any*): Any = {
val runtimeMirror = toolbox.mirror
val classType = classSymbol.typeSignature
val constructorSymbol = classType.decl(termNames.CONSTRUCTOR).alternatives.head.asMethod
val classMirror = runtimeMirror.reflectClass(classSymbol)
val constructorMirror = classMirror.reflectConstructor(constructorSymbol)
constructorMirror(args: _*)
}
}

相关内容

  • 没有找到相关文章

最新更新