使用Kafka Connect(6.1.1
(,我试图在将Kafka消息放入BigQuery(使用BigQuerySink(之前使用Sergey34/Kafka连接转换器来调整它们。
在我的connector.properties
中,我如下配置ScriptEngineTransformer
(最小化示例(:
transforms=scriptenginetransformer
transforms.scriptenginetransformer.type=seko.kafka.connect.transformer.script.ScriptEngineTransformer
transforms.scriptenginetransformer.scrip_engine_name=javascript
transforms.scriptenginetransformer.value.script=function valueTransform(source){ source.foo = 42; return source;}
但在运行时,我得到以下错误:
javax.script.ScriptException: TypeError: Cannot set property "foo" of Struct{a=111,b=222} in <eval> at line number 2
(此处为堆栈跟踪(
根据我对JS的初步理解,我认为应该可以将属性替换或添加到结构中,即以下内容当然可以(在沙盒中(:
function Foo(x, y) {
this.x = x;
this.y = y;
}
foo = new Foo(1, 2);
foo.y = 3;
foo.z = 4;
console.log(foo);
看起来我的(Avro(Kafka消息({a=111,b=222}
(被正确地传递给了JS脚本。那么这个错误意味着什么呢?
如果你需要添加一个静态字段,Kafka提供了一个内置的转换来实现这一点。。。
关于您的问题,阅读代码时,它从不测试或使用具有模式的记录,也从不构建新的Struct类型
因此,我认为您的输入仅限于基本模式类型,如字符串/整数/布尔
换句话说,"Struct{a=111,b=222}" + "foo"
将";工作良好";您最终会得到"Struct{a=111,b=222}foo"
,但Avro记录的字符串表示"Struct{a=111,b=222}"
没有Javascript属性foo
,因此无法将其设置为
您的替代方案/解决方法是确保使用标准的JSONConverter,然后使用JSON.parse
构建一个对象,您可以将JS属性设置到中