如何在 Scala 中编写资源,同时仍然使用 scala-arm 正确关闭它们?



我有一个类,它获取一个本地文件,对其进行转换,并将其存储在GCS中:

import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }
import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._

class GcsService(gcsStorage: Storage) {
def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build
if (destination.unzipGzip) {
for (input ← managed(new ZipInputStream(Files.newInputStream(localPath)));
output ← managed(new GZIPOutputStream(Channels.newOutputStream(gcsStorage.writer(blobInfo))))) {
ByteStreams.copy(input, output)
}
} else if (destination.decompressBzip2) {
for (input <- managed(new BZip2CompressorInputStream(Files.newInputStream(localPath)));
output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
ByteStreams.copy(input, output)
}
} else {
for (input <- managed(Files.newInputStream(localPath));
output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
IOUtils.copy(input, output)
}
}
}
}
case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)

我正在尝试删除一些代码重复,特别是创建fileInputStreamgcsOutputStream。但是我不能简单地在方法的顶部提取这些变量,因为它会在 scala-armmanaged块之外创建资源:

import java.io.{ InputStream, OutputStream }
import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }
import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._

class GcsService(gcsStorage: Storage) {
def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build
// FIXME: creates a resource outside of the ARM block
val fileInputStream = Files.newInputStream(localPath)
val gcsOutputStream = Channels.newOutputStream(gcsStorage.writer(blobInfo))
if (destination.unzipGzip) {
unzipGzip(fileInputStream, gcsOutputStream)
} else if (destination.decompressBzip2) {
decompressBzip2(fileInputStream, gcsOutputStream)
} else {
copy(fileInputStream, gcsOutputStream)
}
}
private def unzipGzip(inputStream: InputStream, outputStream: OutputStream): Unit = {
for (input ← managed(new ZipInputStream(inputStream));
output ← managed(new GZIPOutputStream(outputStream))) {
ByteStreams.copy(input, output)
}
}
private def decompressBzip2(inputStream: InputStream, outputStream: OutputStream): Unit = {
for (input <- managed(new BZip2CompressorInputStream(inputStream));
output <- managed(outputStream)) {
ByteStreams.copy(input, output)
}
}
private def copy(inputStream: InputStream, outputStream: OutputStream): Unit = {
for (input <- managed(inputStream);
output <- managed(outputStream)) {
IOUtils.copy(input, output)
}
}
}
case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)

如您所见,代码更清晰,更易于测试,但由于资源未被"托管",因此无法正确处理资源。例如,如果在创建gcsOutputStream时引发异常,则不会关闭fileInputStream

我可能可以使用谷歌番石榴源和水槽来解决这个问题,但我想知道在 Scala 中是否有更好的方法来做到这一点,而无需引入番石榴。理想情况下,使用标准库或scala臂功能,甚至可能Cats

  • 我是否应该将fileInputStreamgcsOutputStream定义为不带任何内容并返回流的函数?似乎代码会更加冗长,到处都是() => InputStream() => OutputStream
  • 我应该使用多个 scala-arm "托管" 进行推导(一个用于定义fileInputStreamgcsOutputStream,另一个用于每个子函数(?如果我这样做,"内部"输入流将被关闭两次不是问题吗?
  • 有没有一种干净和"缩放"的方法,我没有看到?

你可以像这样重构它:

首先,声明托管资源:

val fileInputStream: ManagedResource[InputStream] = managed(Files.newInputStream(localPath))
val gcsOutputStream: ManagedResource[OutputStream] = managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))

它不会打开这些资源,它只是声明您希望管理这些资源。

然后你可以使用map将它们包装在所需的装饰器中(如ZipInputStream(:

if (destination.unzipGzip) {
for (input ← fileInputStream.map(s => new ZipInputStream(s));
output ← gcsOutputStream.map(s => new GZIPOutputStream(s))) {
ByteStreams.copy(input, output)
}
} else if (destination.decompressBzip2) {
for (input <- fileInputStream.map(s => new BZip2CompressorInputStream(s));
output <- gcsOutputStream) {
ByteStreams.copy(input, output)
}
} else {
for (input <- fileInputStream;
output <- gcsOutputStream) {
IOUtils.copy(input, output)
}
}

当然ManagedResource[A]只是值,所以你甚至可以把它作为参数传递给方法:

private def unzipGzip(inputStream: Managed[InputStream], outputStream: Managed[OutputStream]): Unit = {
for (input ← inputStream.map(s => new ZipInputStream(s));
output ← outputStream.map(s => new GZIPOutputStream(s))) {
ByteStreams.copy(input, output)
}
}

相关内容

  • 没有找到相关文章

最新更新