我有一个简单的程序,其中包含Avro对象的DataStream,我想从中提取单个字符串字段。我将DataStream
转换为Table
,并运行带有简单投影的查询。
val kinesisConsumer = new FlinkKinesisConsumer(streamName, new UnifiedEventDeserializationSchema, consumerConfig)
val env = StreamExecutionEnvironment.getExecutionEnvironment
implicit val typeInfo = TypeInformation.of(classOf[UnifiedEvent])
val kinesisStream = env.addSource(kinesisConsumer)
val tableEnv = TableEnvironment.getTableEnvironment(env)
tableEnv.registerDataStream("table1", kinesisStream);
val query = "SELECT nd_key FROM table1"
val result = tableEnv.sql(query)
tableEnv.toAppendStream[org.apache.avro.util.Utf8](result).print()
env.execute()
当我执行程序时,出现以下异常:
2017-11-29 16:07:36 来源:自定义来源 -> 来自: (accepted_cohort_id、admin_id、after_submission、amount_paid、 anonymous_id、application_id、atom_key、bd_group_key、biz_geo、 braavos_purchase_id、类别、cohort_id、concept_key、concept_rank、 上下文、context_campaign、context_experiment、coupon_code, course_key、course_rank、cta_destination、cta_location、cta_message、 cta_type、货币、decision_group_id、device_browser、device_os、 device_os_version、device_type、持续时间、evaluation_id、event_type、 fin_geo、in_collaboration_with、lab_id、lab_rank、标签、lesson_key、 lesson_rank、区域设置、max_pause_duration、消息、message_id、 module_key、module_rank、nd_key、nd_unit_id、nd_unit_rank、 new_cohort_id、notification_id、num_concepts_completed、 num_interactions、num_lessons_completed、old_cohort_id、part_key、 part_rank、pause_duration、pause_reason、payment_plan、 payment_provider, points_earned, points_possible, 价格, price_sheet, product_key、product_type、provider_charge_id、provider_refund_id、 quiz_type, 引荐来源, refund_amount, requested_cohort_id, 结果, scholarship_group_key、search_term、skill_level、subscription_id、 suspension_length, suspension_reason, 技术, 时间戳, total_concepts、total_lessons、total_time_sec、键入、unenroll_reason、 user_id、user_locale、user_response、变体、版本、workspace_id、 workspace_session,workspace_type) -> 选择:(nd_key) -> 到:Utf8 ->接收器:未命名(5/8) 切换到失败 org.apache.flink.api.common.InvalidProgramException: Table program 无法编译。这是一个错误。请提交问题。 在 org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) 在 org.apache.flink.table.runtime.CRowOutputMapRunner.compile(CRowOutputMapRunner.scala:33) 在 org.apache.flink.table.runtime.CRowOutputMapRunner.open(CRowOutputMapRunner.scala:48) 在 org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) 在 org.apache.flink.streaming.api.operators .AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) 由以下原因引起: org.codehaus.commons.compiler.CompileException: 第 790 行,第 15 列: 无法从类型"java.lang.CharSequence"进行赋值转换 键入"org.apache.avro.util.Utf8" at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) 在 org.codehaus.janino.UnitCompiler.assignmentConversion(UnitCompiler.java:10528) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2534) 在 org.codehaus.janino.UnitCompiler.access$2600(UnitCompiler.java:212) 在 org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1459) 在 org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1443) 在 org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3348) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1443) 在 org.codehaus.janino.UnitCompiler.compileStatement(UnitCompiler.java:1523) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3052) 在 org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1313) 在 org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1286) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:785) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:436) at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:212) 在 org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:390) 在 org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:385) 在 org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1405) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:385) 在 org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:357) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234) 在 org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:213) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) atorg.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33) ...8 更多
Avro 对象的字段nd_key
的类型为java.lang.CharSequence
,由 SQL 查询处理。
通过调用toAppendStream[org.apache.avro.util.Utf8]
,您请求将查询结果转换为DataStream[Utf8]
。但是 Flink 无法自动将CharSequence
转换为Utf8
。
尝试将toAppendStream[org.apache.avro.util.Utf8]
更改为toAppendStream[java.lang.CharSequence]
。
你使用的是哪个 Flink 版本?