我刚刚添加了
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.3.3"
到项目。我在 A 类中有suspend fun foo(): Flow<Bar>
(来自外部包(。
我需要Flowable<Bar>
在 Java 中使用。 如果可能的话,我想使用扩展fun A.fooRx(): Flowable<Bar>
。
您必须从 Kotlin 中的协程中偷偷取出返回的Foo<Bar>
:
// SomeSuspendAPI.kt
// -----------------
// the method to convert
suspend fun <T> Flow<T>.foo() : Flow<Int> {
return flow { emit(0) }
}
@ExperimentalCoroutinesApi
fun <T> Flow<T>.fooRx() : CompletableFuture<Flowable<Int>> {
val self = this
val future = CompletableFuture<Flowable<Int>>()
GlobalScope.launch {
try {
future.complete(self.foo().asFlowable())
} catch (ex: Throwable) {
future.completeExceptionally(ex);
}
}
return future
}
// Demo purposes
fun <T> just(v: T) = flow { emit(v) }
然后你可以在Java中使用它:
public class UseFoo {
public static void main(String[] args) throws Exception {
SomeSuspendAPIKt.fooRx(
SomeSuspendAPIKt.just(1)
)
.thenAccept(flowable -> flowable.subscribe(System.out::println))
.join();
}
}
编辑 1:
当然,您可以将一些代码移回 kotlin 端:
fun <T> Flow<T>.fooRx2() : Flowable<Int> {
val self = this
val subject = SingleSubject.create<Flowable<Int>>()
GlobalScope.launch {
try {
subject.onSuccess(self.foo().asFlowable())
} catch (ex: Throwable) {
subject.onError(ex)
}
}
return subject.flatMapPublisher { it }
}
然后
public class UseFoo {
public static void main(String[] args) throws Exception {
SomeSuspendAPIKt.fooRx2(SomeSuspendAPIKt.just(1))
.blockingSubscribe(System.out::println);
}
}
编辑 2:
你可以通过使用 Kotlin 端的转换来概括这一点,该转换会让你得到一个要传递的延续对象:
fun <T, R: Any> Flow<T>.transformAsync(fn: suspend (t: Flow<T>) -> Flow<R>) : Flowable<R> {
val self = this
val subject = SingleSubject.create<Flowable<R>>()
GlobalScope.launch {
try {
val r = fn(self).asFlowable();
subject.onSuccess(r)
} catch (ex: Throwable) {
subject.onError(ex)
}
}
return subject.flatMapPublisher { it }
}
public class UseFoo {
public static void main(String[] args) throws Exception {
SomeSuspendAPIKt.transformAsync(
SomeSuspendAPIKt.just(1),
(source, cont) -> SomeSuspendAPIKt.foo(source, cont)
)
.blockingSubscribe(System.out::println);
}
}