JpaRepository 不会在自定义 RichSinkFunction 中自动布线



我已经创建了一个自定义的Flink RichSinkFunction,并试图在这个自定义类中自动连接一个JpaRepository,但我不断得到一个NullPointerException。 如果我在构造函数中自动连线它,我可以看到已经找到了 JpaRepo - 但是当调用调用方法时,我会收到一个NullPointerException

public interface MessageRepo extends JpaRepository<Message, Long> {
}

@Component
public class MessageSink extends RichSinkFunction<Message> {
private final transient MessageRepo messageRepo; //if i don't make this transient, i get the error message "The implementation of the RichSinkFunction is not serializable"
@Autowired
public MessageSink(MessageRepo messageRepo){
this.messageRepo = messageRepo;
messageRepo.save(new Message()); //no issues when i do this
}
@Override
public void invoke(Message message, Context context) {
// the message is not null
messageRepo.save(message); // NPE
}

以前有没有人遇到过这个问题?看起来MessageSink调用方法是在单独的线程中调用的,这就是为什么messageRepo总是null的原因? 我的代码的其他部分可以使用 MessageRepo,除了当我有自己的自定义接收器时。

我认为这里的问题是 flink 在分发给其工作线程之前需要序列化自定义接收器函数。

通过标记 MessageRepo 传输,这意味着当工作节点取消此函数时,该字段将为 null。通常,您将在 open 函数中初始化传输依赖项,该函数将在对象反序列化后调用。

我不清楚原因,但我认为在注入豆子时,Spring Boot 优先考虑您的服务类。当我尝试为我的实体类编写侦听器时,我遇到了类似的问题。这就是我解决它的方式。 创建一个实现 ApplicationContextAware 接口并重写 setApplicationContext 方法的组件类。在你的类中有一个名为getBean的静态方法,它将在你的第一个请求时自动连接。示例代码---

@Component
public class SpringBeansUtil implements ApplicationContextAware {
private static ApplicationContext context;
@SuppressWarnings("static-access")
@Override
public void setApplicationContext(ApplicationContext applicationContext) 
throws BeansException {
this.context = applicationContext;
}
public static <T> T getBean(Class<T> beanClass) {
return context.getBean(beanClass);
}
}

然后只需在代码中获取 bean ------->>ClassName referenceName = (ClassName(SpringBeansUtil.getBean(ClassName.class(;

相关内容

  • 没有找到相关文章

最新更新