我正在尝试从服务器逐行异步下载数据。事实上,我使用了Kotlin Coroutines,但当我在async lambda块中连接httpurlconnection时,它显示了一个警告,该方法将在那里被阻止,因此我无法异步执行请求。
这是代码:
fun download(urlString : String) = runBlocking{
val url = URL(urlString)
val httpurlconnection = url.openConnection() as HttpURLConnection
httpurlconnection.setRequestMethod("GET")
val response = async {
httpurlconnection.connect()
val scanner = java.util.Scanner(httpurlconnection.inputstream)
val sb = StringBuilder()
while(scanner.hasNextLine()){
val string = scanner.nextLine()
println(string)
sb.append(string)
}
sb.toString()
}
println("length of response = ${response.length}")
}
这是因为您当前使用的是runBlocking
。
您需要在协同程序范围内启动它以使其异步,此外,您还可以将操作切换到IO
。
suspend fun download(urlString : String): Result = withContext(Dispatchers.IO){
...
}
并从您的特定协同程序范围中调用它,例如:
MainScope().launch {
val result = download(baseUrl)
}
这里有几个问题:
-
由于
runBlocking
,您当前的整个download()
函数无论如何都会阻塞当前线程。如果在请求的整个时间内阻塞当前线程,则该方法不会是异步的。如果您正在阻塞当前线程以等待它,那么在其中启动单个async
协程是没有意义的 -
您得到的警告是因为您正在协程中使用阻塞API,这通常是不鼓励的,因为它阻塞了协程运行的线程,而不是协程的设计使用方式。
-
当前的代码正在读取主体中的所有行,以从中构建单个
String
,这使得逐行读取有点毫无意义。
对于问题#1,您可以通过将函数声明为suspend
函数而不是使用runBlocking
来使其异步,并且您可以去掉async
。不过,这确实转移了在download()
的调用站点处理异步的负担。您需要在协程中调用它(使用像async
这样的协程构建器(,并且您需要一个协程作用域。
对于问题#2,通常的解决方法是只在withContext(Disptachers.IO) { ... }
块中执行阻塞调用,因此这些调用是在线程池中完成的,该线程池将根据IO操作的需要创建新线程。如果您可以使用异步API,则不必求助于此,因为您只需将这些API映射到协程模型即可。
在这种情况下,HttpUrlConnection
除了是阻塞之外,还是一个不方便的低级API。如果您在JRE 11+上运行,则可以使用内置的JDK11HttpClient
的异步API,并将其调整为协同程序。
暂时忽略问题#3,因为您当前的代码无论如何都在读取全文,所以一个简单的等价物如下:
import kotlinx.coroutines.future.await
import java.net.URI
import java.net.http.*
suspend fun download(urlString : String): String {
val client = HttpClient.newHttpClient()
val request = HttpRequest.newBuilder().uri(URI.create(urlString)).build()
val futureResponse = client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
// await() adapts the async Java future API to the coroutine world.
// It suspends until the response is received (in a non-blocking way)
// and until the full body is read and converted to a String
return futureResponse.await().body()
}
此方法将挂起,直到收到响应为止。您可以使用协程来管理调用此函数的方式。
回到问题3,如果你真的想按行处理,你可以使用一个BodyHandlers来逐行流式传输响应,比如BodyHandlers.lines((:
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.future.await
import kotlinx.coroutines.stream.consumeAsFlow
import java.net.URI
import java.net.http.*
suspend fun downloadLines(urlString : String): Flow<String> {
val client = HttpClient.newHttpClient()
val request = HttpRequest.newBuilder().uri(URI.create(urlString)).build()
val futureResponse = client.sendAsync(request, HttpResponse.BodyHandlers.ofLines())
// await() here only waits for the reponse to arrive,
// it doesn't wait until the full response body is read.
// Instead, the Stream<String> returned by body() provides
// the body lines as they come
return futureResponse.await().body().consumeAsFlow()
}
此方法将挂起,直到调用完成,但可能在读取响应之前返回。收集返回的流是在响应的正文行从服务器流式传输时读取它们的方法。
另一种选择是使用Ktor HTTP客户端,并从响应体中一点一点地读取。查看如何阅读响应,或者更具体地说,查看响应流文档。不过,我不得不承认文件相当简洁。所以你通常必须自己弄清楚细节。