>编辑:我已经在 https://stackoverflow.com/a/60235242/3236516 上描述了我们的解决方案
我有一个java对象。它是扩展抽象类的众多子类之一的实例。我想修改它的方法之一,以便在调用原始方法之前运行一些额外的代码。我的目标在概念上与AspectJ中的切入点相同。
如果我创建原始对象的一些修改版本而不是改变原始对象,那就没问题了。如果解决方案涉及字节码操作,也可以。
先前的工作
我考虑过通过JavaAssist创建一个代理。问题是 ProxyFactory 的创建方法希望我提前知道构造函数输入类型。我没有。我可以在不通过 Objenesis 调用构造函数的情况下创建我的对象,但是生成的代理对象对于构造函数设置的任何值都将具有 null 值。这意味着每当直接引用构造函数设置的值时,我生成的对象的行为将与原始对象不同。
上下文
我们正在通过 AWS Kinesis Data Analytics 使用 Flink 来转换一些流数据。我们希望在所有 StreamOperator 的 open(( 方法的开头包含一些通用代码,而不必修改每个运算符。一个用例是确保自定义指标代理在运行操作员的每个实例上运行。
使用 Byte Buddy,您可以创建一个包装器或一个 Java 代理,它们都可以实现这一目标。如果您在包装类的构造函数调用方面遇到困难,那么使用 Byte Buddy 会出现同样的问题,因为任何库都绑定到 JVM 给出的约束。
要创建 Java 代理程序,请使用AgentBuilder
。然后,您可以使用type
步骤指定要拦截的所有类型,例如实现某个接口或扩展类的所有类型。对于transform
,Byte Buddy提供了一个名为Advice
的方法decoraction API,它允许你添加额外的代码,例如:
class MyAdvice {
@Advice.OnMethodEnter
static void enter() { System.out.println("Hello"); }
}
由
builder = builder.visit(Advice.to(MyAdvice.class).on(named("foo")));
例如,您可以为指定的类型在名为"FOO"的所有方法的开头打印 Hello World。您可以在java.instrument
包的包文档中找到有关 Java 代理的更多信息。
特定于 Flink 的解决方案可能是实现您正在使用的 Flink 运算符的自定义版本。我不相信这会把你带到一个好地方;只是分享这个想法,以防有帮助。
关于如何实现自定义运算符的文档并不多,但有一个关于这个主题的 Flink Forward 讨论。
首先,我会在 AWS 上提交功能请求以支持您的使用案例。这将是最干净的解决方案。
其次,我会避免找到任何方法来覆盖open()
.由于您所处的环境没有太多控制权,因此我认为这些方法要么根本不起作用,要么很脆弱,并且会随着环境的更新而中断。
我会在各自的 UDF 方法中进行延迟初始化,当然也会在一些常见的实用程序方法中将其分解出来。
private Counter counter;
@Override
public Integer map(String value) {
if (counter == null) {
RuntimeContext ctx = getRuntimeContext();
counter = ctx.getMetricGroup().counter("outputs");
}
counter.inc();
return Integer.parseInt(value);
}
来自原始提问者的答案:我们通过为 StreamExecutionEnvironment 创建一个 ByteBuddy 代理来解决这个问题,该代理拦截了对 getStreamGraph 的调用,并将每个节点的 jobVertexClass 重新转换为扩展原始类类型的类,但包含我们的自定义逻辑。由于不同的类需要不同的参数,因此我们使用 Objenesis 实例化了代理,而无需调用构造函数。为了解决通常在构造函数中设置的私有字段保留为空的问题,我们使用反射来更改所有私有字段的可见性,然后将每个字段值从原始对象复制到代理对象。
我们没有采用 Rafael Winterhalter 提出的代理解决方案,因为它要求能够在每个工作线程实例上运行代理设置代码,这类似于想要在每个工作线程计算机上启动指标代理的原始问题。虽然我在最初的问题中没有说明这一点,但创建代理对象的代码发生在 Flink 作业管理机器上,而不是工作机器上。