使用 fs2 将 URL 流式传输到本地文件



使用 fs2(版本 1.0.4(和猫效应IO,我可以将 URL 流式传输到本地文件,

import concurrent.ExecutionContext.Implicits.global
def download(spec: String, filename: String): Stream[IO, Unit] = 
  io.readInputStream((new URL(spec).openConnection.getInputStream), 4096, global, true)
    .through(io.file.writeAll(Paths.get(filename), global))

但是,此代码段在完成时不会返回有关进程的任何信息。最重要的是,除了知道操作是成功还是失败之外,我还想知道如果操作成功,读取了多少字节。我不想检查新的文件大小来获取此信息。另一方面,如果操作失败,我想知道导致失败的原因。

我尝试attempt但无法解决将原始字节写入新文件的后续步骤。请指教。谢谢

你可能想玩observe。我相信有更好的方法可以做到这一点,但这里有一个例子可以帮助你摆脱困境:

编译和运行的原始代码:

import fs2.io
import cats.effect.{IO, ContextShift}
import concurrent.ExecutionContext.Implicits.global
import java.net.URL
import java.nio.file.Paths
object Example1 {
  implicit val contextShift: ContextShift[IO] = IO.contextShift(global)
  def download(spec: String, filename: String): fs2.Stream[IO, Unit] =
    io.readInputStream[IO](IO(new URL(spec).openConnection.getInputStream), 4096, global, closeAfterUse=true)
      .through(io.file.writeAll(Paths.get(filename), global))
  def main(args: Array[String]): Unit = {
    download("https://isitchristmas.com/", "/tmp/christmas.txt")
      .compile.drain.unsafeRunSync()
  }
}

使用观察来计算字节数:

import fs2.io
import cats.effect.{IO, ContextShift}
import concurrent.ExecutionContext.Implicits.global
import java.net.URL
import java.nio.file.Paths
object Example2 {
  implicit val contextShift: ContextShift[IO] = IO.contextShift(global)
  final case class DlResults(bytes: Long)
  def download(spec: String, filename: String): fs2.Stream[IO, DlResults] =
    io.readInputStream[IO](IO(new URL(spec).openConnection.getInputStream), 4096, global, closeAfterUse = true)
      .observe(io.file.writeAll(Paths.get(filename), global))
      .fold(DlResults(0L)) { (r, _) => DlResults(r.bytes + 1) }
  def main(args: Array[String]): Unit = {
    download("https://isitchristmas.com/", "/tmp/christmas.txt")
      .compile
      .fold(()){ (_, r) => println(r)}
      .unsafeRunSync()
  }
}

输出:

> DlResults(42668)

我已经找到了ResourceIO的解决方案,@codenoodle建议。

更新 #1

删除Resource,因为它在与 FS2 一起使用时是多余的,并且使代码复杂化。

import java.io.{
  FileNotFoundException,
  FileOutputStream,
  InputStream,
  OutputStream
}
import java.net.URL
import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.implicits._
import fs2._
import scala.concurrent.ExecutionContext.Implicits.global
object LetStream extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    def write(source: IO[InputStream], target: IO[OutputStream]) =
      io.readInputStream(source, 4096, global)
        .chunks
        .flatMap(Stream.chunk)
        .observe(io.writeOutputStream(target, global))
        .chunks
        .fold(0L)((acc, chunk) => acc + chunk.size)
    write(IO(new FileOutputStream("image.jpg")), 
      IO(new URL("http://localhost:8080/images/helloworld.jpg")
            .openConnection
            .getInputStream))
      .use(_.compile.toList)
      .flatMap(size => 
        IO(println(s"Written ${size.head} bytes")) *> IO(ExitCode.Success))      
      .recover {
        case t: FileNotFoundException =>
          IO(println(s"Not found, ${t.getMessage}")) *> IO(ExitCode.Error)
        case err =>
          IO(println(err.getMessage)) *> IO(ExitCode.Error)
      }
  }
}

相关内容

  • 没有找到相关文章

最新更新