如何在将RasterFrameLayer转换为Raster时修复NPE



在训练机器学习模型后,我正试图将RasterFrames中预测的RasterFrameLayer转换为GeoTiff文件。当使用来自光栅帧的演示数据Elkton VA时,它工作得很好
但是,当使用一个带有ndvi标记的裁剪sentinel 2a tif(从-1000标准化到1000(时,它在toRaster步骤中失败,出现NullPointedException
感觉这是由于ROI之外没有数据值。测试数据在这里,geojson和log。

Geotrellis版本:3.3.0
Rasterframes版本:0.9.0


import geotrellis.proj4.LatLng
import geotrellis.raster._
import geotrellis.raster.io.geotiff.{MultibandGeoTiff, SinglebandGeoTiff}
import geotrellis.raster.io.geotiff.reader.GeoTiffReader
import geotrellis.raster.render.{ColorRamps, Png}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql._
import org.locationtech.rasterframes._
import org.locationtech.rasterframes.ml.{NoDataFilter, TileExploder}
object ClassificiationRaster extends App {
def readTiff(name: String) =  GeoTiffReader.readSingleband(getClass.getResource(s"/$name").getPath)
def readMtbTiff(name: String): MultibandGeoTiff =  GeoTiffReader.readMultiband(getClass.getResource(s"/$name").getPath)
implicit val spark = SparkSession.builder()
.master("local[*]")
.appName(getClass.getName)
.withKryoSerialization
.getOrCreate()
.withRasterFrames
import spark.implicits._
val filenamePattern = "xiangfuqu_202003_mask_%s.tif"
val bandNumbers = "ndvi".split(",").toSeq
val bandColNames = bandNumbers.map(b ⇒ s"band_$b").toArray
val tileSize = 256
val joinedRF: RasterFrameLayer = bandNumbers
.map { b ⇒ (b, filenamePattern.format(b)) }
.map { case (b, f) ⇒ (b, readTiff(f)) }
.map { case (b, t) ⇒ t.projectedRaster.toLayer(tileSize, tileSize, s"band_$b") }
.reduce(_ spatialJoin _)
.withCRS()
.withExtent()
val tlm = joinedRF.tileLayerMetadata.left.get
//  println(tlm.totalDimensions.cols)
//  println(tlm.totalDimensions.rows)
joinedRF.printSchema()
val targetCol = "label"
val geojsonPath = "/Users/ethan/work/data/L2a10m4326/zds/test.geojson"
spark.sparkContext.addFile(geojsonPath)
import org.locationtech.rasterframes.datasource.geojson._
val jsonDF: DataFrame = spark.read.geojson.load(geojsonPath)
val label_df: DataFrame = jsonDF
.select($"CLASS_ID", st_reproject($"geometry",LatLng,LatLng).alias("geometry"))
.hint("broadcast")
val df_joined = joinedRF.join(label_df, st_intersects(st_geometry($"extent"), $"geometry"))
.withColumn("dims",rf_dimensions($"band_ndvi"))
val df_labeled: DataFrame = df_joined.withColumn(
"label",
rf_rasterize($"geometry", st_geometry($"extent"), $"CLASS_ID", $"dims.cols", $"dims.rows")
)
df_labeled.printSchema()
val tmp = df_labeled.filter(rf_tile_sum($"label") > 0).cache()
val exploder = new TileExploder()
val noDataFilter = new NoDataFilter().setInputCols(bandColNames :+ targetCol)
val assembler = new VectorAssembler()
.setInputCols(bandColNames)
.setOutputCol("features")
val classifier = new DecisionTreeClassifier()
.setLabelCol(targetCol)
.setFeaturesCol(assembler.getOutputCol)
val pipeline = new Pipeline()
.setStages(Array(exploder, noDataFilter, assembler, classifier))
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol(targetCol)
.setPredictionCol("prediction")
.setMetricName("f1")
val paramGrid = new ParamGridBuilder()
//.addGrid(classifier.maxDepth, Array(1, 2, 3, 4))
.build()
val trainer = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(4)
val model = trainer.fit(tmp)
val metrics = model.getEstimatorParamMaps
.map(_.toSeq.map(p ⇒ s"${p.param.name} = ${p.value}"))
.map(_.mkString(", "))
.zip(model.avgMetrics)
metrics.toSeq.toDF("params", "metric").show(false)
val scored = model.bestModel.transform(joinedRF)
scored.groupBy($"prediction" as "class").count().show
scored.show(20)

val retiled: DataFrame = scored.groupBy($"crs", $"extent").agg(
rf_assemble_tile(
$"column_index", $"row_index", $"prediction",
tlm.tileCols, tlm.tileRows, IntConstantNoDataCellType
)
)
val rf: RasterFrameLayer = retiled.toLayer(tlm)
val raster: ProjectedRaster[Tile] = rf.toRaster($"prediction", 5848, 4189)
SinglebandGeoTiff(raster.tile,tlm.extent, tlm.crs).write("/Users/ethan/project/IdeaProjects/learn/spark_ml_learn.git/src/main/resources/easy_b1.tif")
val clusterColors = ColorRamp(
ColorRamps.Viridis.toColorMap((0 until 1).toArray).colors
)
//  val pngBytes = retiled.select(rf_render_png($"prediction", clusterColors)).first  //It can output the png.
//  retiled.tile.renderPng(clusterColors).write("/Users/ethan/project/IdeaProjects/learn/spark_ml_learn.git/src/main/resources/classified2.png")
//  Png(pngBytes).write("/Users/ethan/project/IdeaProjects/learn/spark_ml_learn.git/src/main/resources/classified2.png")
spark.stop()
}

我怀疑toLayer扩展方法的工作方式存在错误。我将跟进RasterFrames项目的错误报告。我怀疑这需要付出更多的努力。

下面是一个可能的解决方法,它的级别稍低。在这种情况下,它会导致写出25个不重叠的GeoTiff。

import geotrellis.store.hadoop.{SerializableConfiguration, _}
import geotrellis.spark.Implicits._
import org.apache.hadoop.fs.Path
// Need this to write local files from spark
val hconf = SerializableConfiguration(spark.sparkContext.hadoopConfiguration)
ContextRDD(
rf.toTileLayerRDD($"prediction")
.left.get
.filter{
case (_: SpatialKey, null) ⇒ false  // remove any null Tiles
case _ ⇒ true
},
tlm)
.regrid(1024)  //Regrid the Tiles so that they are 1024 x 1024
.toGeoTiffs()
.foreach{ case (sk: SpatialKey, gt: SinglebandGeoTiff) ⇒
val path = new Path(new Path("file:///tmp/output"), s"${sk.col}_${sk.row}.tif")
gt.write(path, hconf.value)
}

最新更新