使用SttpBackendStub的单元测试流请求



我有一个请求,它返回我想要测试的源流,如下所示:

/* class */
import sttp.client._
class HttpClient()
(implicit sttpBackend: SttpBackend[Future, Source[ByteString, Any], Nothing]) {
/** Start a download stream for large files */
def downloadStream(url: String): Future[Response[Either[String, Source[ByteString, _]]]] = {
request
.get(uri"url")
.response(asStream[Source[ByteString, Any]])
.send()
}
}
/* test */
import org.scalatest.FlatSpec
import org.scalatest.Matchers
class HttpClientSpec extends FlatSpec with Matchers {
implicit val as = ActorSystem()
implicit val mat = ActorMaterializer()
implicit val ex = as
"downloadStream" should "get a file stream from url" in {
val url = s"http://example.com/api/v1/file/Test.txt"
val body = "abcdefghijklmnopqrstuzwxyz"
val expectedStream = Source.single[ByteString](body.getBytes)
implicit val stubBackend = SttpBackendStub.asynchronousFuture
.whenRequestMatches(req => req.method.method == "GET" && req.uri == uri"$url")
.thenRespond(
Response(Right(expectedStream), StatusCode.Ok)
)
val httpTest = new HttpClient()
val response = Await.result(httpTest.downloadStream(url), 5.seconds)
response.body.right.get should be(expectedStream)
}
}

但是它失败了,因为SttpBackendStub.asynchronousFuture不处理流响应。

type mismatch;
[error]  found   : sttp.client.testing.SttpBackendStub[scala.concurrent.Future,Nothing]
[error]  required: sttp.client.SttpBackend[scala.concurrent.Future,akka.stream.scaladsl.Source[akka.util.ByteString,Any],Nothing]

如何使用SttpBackendStub而不使用类似scalamock的方法?

只需创建自己的Stub类,即可extends预期的响应类型:

class StreamHttpStub
extends SttpBackendStub[Future, Source[ByteString, Any]](new FutureMonad(), PartialFunction.empty, None) {
}
implicit val stubBackend = new StreamHttpStub()
.whenRequestMatches(req => req.method.method == "GET" && req.uri == uri"$url")
.thenRespond(
Response(Right(expectedStream), StatusCode.Ok)
)

相关内容

  • 没有找到相关文章

最新更新