为什么Observable功能在一次调用中被执行两次



程序的完整结构

注释:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface UserAnnotation {
}

然后创建了一个拦截器:

public class UserInterceptor implements MethodInterceptor {
private static final Logger logger = LoggerFactory.getLogger(UserInterceptor.class);
@Inject
UserService userService; // this is not working
public Object invoke(MethodInvocation invocation) throws Throwable {
logger.info("UserInterceptor : Interceptor Invoked");
Object result = invocation.proceed();
Observable<List<User>> observable = (Observable<List<Sample>>) result;
observable.flatMap(Observable::from).subscribe(object -> {
User user = (User)object
SampleSender sender = new SampleSender();
sender.setBoolean(user.isBoolean());
logger.info("Pushing Data into Sender");
userService.insert(String.join("_", "key", "value"), sender); 
}
return result;
}
}

然后我创建了一个GuiceModule,如下所示:-

public class UserModule extends AbstractModule {
@Override
protected void configure() {
SampleInterceptor interceptor = new SampleInterceptor()
requestInjection(interceptor);
bindInterceptor(Matchers.any(), Matchers.annotatedWith(SampleAnnotation.class), interceptor);
}

}

我使用上面注释的类是

// This class also have so many method and this was already declared and using in another services, I created a sample class here
class UserClassForInterceptor {
@Inject
AnotherClass anotherClass;
// this userMethod() is not a new method, its already created, 
// now I am adding annotation to it, because after finishing this functionality, 
// I want something should be done, so created annotation and added here
@UserAnnotation
public Observable<List<Sample>> userMethod() {
logger.info("This is printing only once");
return anotherClass.getUser().flatMap(user ->{
logger.info("This is also printing twice");
// this logger printed twise means, this code snippet is getting executed twise
});
}
}
public class AnotherClass{
public Observable<User> getUser(){
Observable<Sample> observableSample = methodReturnsObservableSample();
logger.info("Getting this logger only once");
return observableSample.map(response-> {
logger.info("This logger is printing twice");
//here have code to return observable of User
});
}
}

如果我删除可观察对象内的注释记录器,则只打印一次,但当我使用注释时,这些记录器将打印两次。我不知道它为什么会这样。

我有一个RestModule,我使用它绑定UserClassForInterceptor,如下

public final class RestModule extends JerseyServletModule {
// other classes binding
bind(UserClassForInterceptor.class).in(Scopes.SINGLETON);
// other classes binding
install(new SampleModule());
}

现在我有一个bootsrap类,我在其中绑定RestModule

public class Bootstrap extends ServerBootstrap {
binder.install(new RestModule());
}

用法:-

@Path("service/sample")
public class SampleRS {
@Inject
UserClassForInterceptor userClassForInterceptor;
public void someMethod() {
userClassForInterceptor.sampleMethod();
}
}

您创建了一个注释@UserAnnotation和一个用于注释的拦截器类。将注释附加到方法userMethod()

拦截器例程要做的第一件事是调用userMethod()以获取它返回的可观察对象,然后拦截器订阅返回的可观测对象,从而显示第一条日志消息。最终,拦截器将可观察到的结果返回给原始调用者。当其他人订阅了返回的observable时,观察者链会被第二次激活,因此日志消息会出现两次。

RxJava有副作用

虽然RxJava是"功能反应式编程"概念的实现,但您(以功能方式(构建的观察者链只有在订阅时才能工作,而这些订阅会产生副作用。日志输出是一个副作用,可能也是最良性的;对变量的更改或对方法的调用会产生更广泛的影响。

当一个观察者链被构造(正确地(时,它充当一个潜在的计算,直到有一个订阅者。如果你需要有多个订阅者,就像你的问题域一样,那么你必须决定是需要为每个订阅激活观察者链(正常情况下(,还是只为所有重叠的订阅激活一次。

如果希望所有重叠的订阅共享相同的可观察对象,则可以使用share()运算符。有许多相关的运算符会影响可观察性和订阅的生存期。这里有一个概述:如何使用RxJava share((运算符?

面向方面编程:拦截器和Guice

您的代码使用Guice来提供一种称为"面向方面编程"的功能。这允许您在程序中引入代码,以解决交叉问题,或通过设置受控网关来增强其功能。使用Guice或类似的AOP方法需要规程

在您的案例中,您使用拦截过程通过订阅具有非琐碎副作用的观察者链来造成无法解释的(迄今为止(副作用。想象一下,您拦截的方法建立了一个一次性连接,而您的拦截器在完成其工作时用完了该连接,导致原始调用者无法使用该连接。

您需要的规程是理解拦截器必须遵守的规则。想想诸如"首先,不要伤害他人"之类的规则。

做事FRP方式

如果在处理用户信息时需要添加额外的步骤,那么您应该在拦截器中构造一个新的可观察对象,但仅当原始调用者订阅了可观察对象时:

Object result = invocation.proceed();
Observable<List<User>> observable = (Observable<List<Sample>>) result;
Observable<List<User>> newObservable = observable
.doOnNext( sampleList ->
Observable.fromIterable( sampleList )
.subscribe(object -> {
User user = (User)object
SampleSender sender = new SampleSender();
sender.setBoolean(user.isBoolean());
logger.info("Pushing Data into Sender");
userService.insert(String.join("_", "key", "value"), sender); 
}));
return newObservable;

通过返回修改后的观察者链,您不会从原始观察者链引入副作用,并确保您在自己的代码中引入的副作用将只有在订阅原始观察者链条时才会触发。

这段代码也帮助了我

public Object invoke(MethodInvocation invocation) throws Throwable {
Object result = null;
try{
logger.debug("Interceptor Invoked");
result = invocation.proceed();
Observable<List<User>> observable = (Observable<List<User>>)result;
return observable
.doOnNext(this::updateUser);
}
catch(Exception ex){
logger.error("Error: ",ex);
}
return result;
}
private void updateUser(List<User> users) {
if(CollectionUtils.isNotEmpty(users)) {
for(User user: users) {
SampleSender sender = new SampleSender();
sender.setBoolean(user.isBoolean());
logger.info("Pushing Data into Sender");
userService.insert(String.join("_", "key", "value"), sender); 
}
}
}

最新更新