序列化循环.Flink中的Json



Scala Flinkcirce中序列化json有问题。Json格式。

我使用lib flink-adt来派生TypeInformation包含序列化器的。

implicit val jsonInfo: TypeInformation[Json] =
deriveTypeInformation[Json]

错误信息如下:

magnolia: child class BiggerDecimalJsonNumber of class JsonNumber is neither final nor a case class

我也试过这个。

implicit val jsonInfo: TypeInformation[Json] =
TypeInformation.of(classOf[Json])

它编译了,但是当我运行单元测试时崩溃了。

我通过添加new ExecutionConfig()

使序列化器工作这个代码工作了:

import io.circe._
import io.circe.syntax._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.ExecutionConfig
implicit val jsonTypeInformation: TypeInformation[Json] =
TypeInformation.of(classOf[Json])
val typeInformation = implicitly[TypeInformation[Json]]
val json = """[{"level":"up","num":1.23,"valid":true}]""".asJson
val ser = typeInformation.createSerializer(new ExecutionConfig())
roundtrip(ser, json)

相关内容

  • 没有找到相关文章

最新更新