Scalding单元测试-如何编写本地文件



我在一个使用特定API来跟踪数据集元数据的地方工作。当从普通写入转换为这些特殊写入时,Key/Value、TSV/CSV、Thrift。。。数据集。我想比较一下二进制文件是相同的前转换和后转换到特殊的API。

鉴于我无法为包含元数据的写入提供特定的api,我只想问如何在TypedPipe上为.write方法编写单元测试?

implicit val timeZone: TimeZone = DateOps.UTC
implicit val dateParser: DateParser = DateParser.default
implicit def flowDef: FlowDef = new FlowDef()
implicit def mode: Mode = Local(true)
val fileStrPath = root + "/test"
println("writing data to " + fileStrPath)
TypedPipe
.from(Seq[Long](1, 2, 3, 4, 5))
// .map((x: Long) => { println(x.toString); System.out.flush(); x })
.write(TypedTsv[Long](fileStrPath))
.forceToDisk

上面的内容似乎没有向本地(OSX(磁盘写入任何内容。

所以我想知道我是否需要使用像这样的MiniDFSCluster

def setUpTempFolder: String = {
val tempFolder = new TemporaryFolder
tempFolder.create()
tempFolder.getRoot.getAbsolutePath
}
val root: String = setUpTempFolder
println(s"root = $root")
val tempDir = Files.createTempDirectory(setUpTempFolder).toFile
val hdfsCluster: MiniDFSCluster = {
val configuration = new Configuration()
configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath)
configuration.set("io.compression.codecs", classOf[LzopCodec].getName)
new MiniDFSCluster.Builder(configuration)
.manageNameDfsDirs(true)
.manageDataDfsDirs(true)
.format(true)
.build()
}
hdfsCluster.waitClusterUp()
val fs: DistributedFileSystem = hdfsCluster.getFileSystem
val rootPath = new Path(root)
fs.mkdirs(rootPath)

然而,我尝试让这个MiniCluster工作也没有成功——不知何故,我需要将MiniCluster与Scalding写入链接起来。

注意:用于单元测试的ScaldingJobTest框架不起作用,因为在包含元数据的写入API进行写入之前,写入的实际数据有时会被封装在双射编解码器中,或者使用事例类包装器进行设置。

有什么想法可以用单独的Scalding或MiniCluster编写本地文件(不使用Scalding REPL(吗?(如果稍后使用,我需要一个如何读取文件的提示。(

正在回答。。。这里有一个如何使用迷你集群来精确读取和写入HDFS的示例。我将能够与我不同的写作进行交叉阅读并进行检查。这是在烫伤的TypedParquet型的测试中

HadoopPlatformJobTest是使用MiniCluster的JobTest的扩展。在链接中放弃了一些细节,大部分代码是这样的:

"TypedParquetTuple";应该{"读写正确";在{导入com.twitter.scalding.parquet.tuple.TestValues_def toMap[T](i:Iterable[T](:Map[T,Int]=i.groupBy(identity(.mapValues(_.size(

HadoopPlatformJobTest(new WriteToTypedParquetTupleJob(_), cluster)
.arg("output", "output1")
.sink[SampleClassB](TypedParquet[SampleClassB](Seq("output1"))) {
toMap(_) shouldBe toMap(values)
}
.run()
HadoopPlatformJobTest(new ReadWithFilterPredicateJob(_), cluster)
.arg("input", "output1")
.arg("output", "output2")
.sink[Boolean]("output2")(toMap(_) shouldBe toMap(values.filter(_.string == "B1").map(_.a.bool)))
.run()
}
}

最新更新