How to convert a Seq to an RDD when using a Spark streaming context



I am using TestSuiteBase to create some tests with spark-streaming (using a Spark streaming context ssc). I then create dummy data as output: Seq[Seq[(Double, Double)]]. Finally, I want to apply some function to output, but that function accepts an RDD[(Double, Double)], not a Seq[Seq[(Double, Double)]].

To solve this I am considering val rdd: RDD[(Double, Double)] = sc.parallelize(output.flatten), but how and where do I get the Spark context sc from ssc? Alternatively, is there a way to create the dummy data directly as an RDD, without going through a Seq?
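For reference, StreamingContext exposes its underlying SparkContext through the sparkContext accessor, so the conversion can be sketched as follows. This is a minimal standalone sketch, assuming a local master; the output value here is hand-made dummy data standing in for the result of runStreams:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SeqToRddSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("seq-to-rdd")
    val ssc = new StreamingContext(conf, Seconds(1))

    // dummy per-batch (prediction, label) pairs, standing in for runStreams output
    val output: Seq[Seq[(Double, Double)]] =
      Seq(Seq((1.0, 1.2), (2.0, 1.9)), Seq((3.0, 3.1)))

    // flatten the per-batch Seqs and parallelize on the underlying SparkContext
    val rdd: RDD[(Double, Double)] = ssc.sparkContext.parallelize(output.flatten)
    println(rdd.count())  // 3 elements after flattening

    ssc.stop()  // also stops the underlying SparkContext by default
  }
}
```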

class StreamingTestLR  extends SparkFunSuite
                       with TestSuiteBase {
  // use longer wait time to ensure job completion
  override def maxWaitTimeMillis: Int = 20000
  var ssc: StreamingContext = _
  override def afterFunction() {
    super.afterFunction()
    if (ssc != null) {
      ssc.stop()
    }
  }
//...
val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)
// THE PROBLEM IS HERE!!!
// val metrics = new SomeFuncThatAcceptsRDD(rdd)
}

Update

  // Test whether prediction accuracy increases when using hyper-parameter optimization
  // in order to learn Y = 10*X1 + 10*X2 on streaming data
  test("Test 1") {
    // create model initialized with zero weights
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(0.0, 0.0))
      .setStepSize(0.2)
      .setNumIterations(25)
    // generate sequence of simulated data for testing
    val numBatches = 10
    val nPoints = 100
    val testInput = (0 until numBatches).map { i =>
      LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), nPoints, 42 * (i + 1))
    }
    // setupStreams takes the batched input and an operation DStream[U] => DStream[V];
    // the predictions it produces are what runStreams collects below
    withStreamingContext(setupStreams(testInput, (inputDStream: DStream[LabeledPoint]) => {
      model.trainOn(inputDStream)
      model.predictOnValues(inputDStream.map(x => (x.label, x.features)))
    })) { ssc =>
      val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)

      val rdd: RDD[(Double, Double)] = ssc.sparkContext.parallelize(output.flatten)
      // Instantiate metrics object
      val metrics = new RegressionMetrics(rdd)
      // Squared error
      println(s"MSE = ${metrics.meanSquaredError}")
      println(s"RMSE = ${metrics.rootMeanSquaredError}")
      // R-squared
      println(s"R-squared = ${metrics.r2}")
      // Mean absolute error
      println(s"MAE = ${metrics.meanAbsoluteError}")
      // Explained variance
      println(s"Explained variance = ${metrics.explainedVariance}")
    }
  }
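As for the second option from the question (creating the dummy data directly as an RDD, with no streaming involved): LinearDataGenerator can produce the labeled points locally, and a plain SparkContext can parallelize them. A hedged sketch, assuming a local master; pairing each label with itself as a stand-in "prediction" is purely illustrative, so the metrics are trivially perfect:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.util.LinearDataGenerator

object DirectRddDummyData {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("direct-rdd"))

    // generate labeled points locally, then build (prediction, label) tuples;
    // using the label as its own prediction is only a placeholder
    val points = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42)
    val rdd = sc.parallelize(points.map(p => (p.label, p.label)))

    val metrics = new RegressionMetrics(rdd)
    println(s"MSE = ${metrics.meanSquaredError}")  // 0.0, since prediction == label

    sc.stop()
  }
}
```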

Try this:

 class MyTestSuite extends TestSuiteBase with BeforeAndAfter {
   test("my test") {
     withTestServer(new TestServer()) { testServer =>
       // Start the server
       testServer.start()
       // Set up the streaming context and input streams
       withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
         val rdd = ssc.sparkContext.parallelize(output.flatten)
         // your code here
         testServer.stop()
         ssc.stop()
       }
     }
   }
 }

For more details, see: https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala

Latest update