我是 Kotlin 协程中的新手。
这里使用经典线程编写代码:
import com.google.gson.JsonElement
import com.google.gson.JsonObject
import com.google.gson.JsonParser
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import okhttp3.*
import okio.ByteString
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicInteger
object BithumbSocketListener : WebSocketListener() {
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
super.onFailure(webSocket, t, response)
Thread {
оkHttpClient.newWebSocket(wsRequest, BithumbSocketListener)
}.start()
}
override fun onMessage(webSocket: WebSocket, text: String) {
super.onMessage(webSocket, text)
logger.debug("ws_onMessage: text = $text")
}
}
fun main(args: Array<String>) {
currenciesList = currencies.split(",")
currenciesList.forEach {
OkHttpClient().newWebSocket(wsRequest, BithumbSocketListener)
}
}
如您所见,我有货币列表(currenciesList
(。我迭代它并为列表中的每个项目调用newWebSocket
。如您所见BithumbSocketListener
是一个单例。
有问题,则调用回调方法onFailure
,我在单独的 java 线程中创建新的 web 套接字:
Thread {
оkHttpClient.newWebSocket(wsRequest, BithumbSocketListener)
}.start()
好。它工作正常。但是我想用 Kotlin 协程替换这段代码。我该怎么做?
谢谢。
由于您正在处理异步消息流,因此应通过实现参与者将其移植到协程,例如
val wsActor: SendChannel<String> = actor {
for (msg in channel) {
logger.info("Another message is in: ${msg}")
}
}
从wsActor
的类型中,您可以看到您应该向其发送消息。这就是桥接代码的用武之地:
class BithumbSocketListener(
private val chan: Channel<String>
) : WebSocketListener() {
override fun onMessage(webSocket: WebSocket, text: String) {
chan.send(text)
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
оkHttpClient.newWebSocket(wsRequest, this)
}
}
请注意,与您的代码相比,我不会启动任何新线程进行重试。这与移植到协程无关,您的代码也不需要它。 newWebSocket
是立即返回的异步调用。
最后,启动每种货币的 websocket:
currenciesList.forEach {
OkHttpClient().newWebSocket(wsRequest, BithumbSocketListener(wsActor)
}