>我正在尝试通过Spark 2.0中的函数访问HashMap,但是如果我并行化列表,它会失败。如果我不这样做,它就可以工作,如果我不使用案例类,它就会起作用。
以下是我正在尝试执行的操作的一些示例代码:
case class TestData(val s: String)
def testKey(testData: TestData) {
println(f"Current Map: $myMap")
println(f"Key sent into function: $testData")
println("Key isn't found in Map:")
println(myMap(testData)) // fails here
}
val myList = sc.parallelize(List(TestData("foo")))
val myMap = Map(TestData("foo") -> "bar")
myList.collect.foreach(testKey) // collect to see println
以下是确切的输出:
Current Map: Map(TestData(foo) -> bar)
Key sent into function: TestData(foo)
Key isn't found in Map:
java.util.NoSuchElementException: key not found: TestData(foo)
上面的代码类似于我正在尝试做的事情,除了 case 类更复杂并且 HashMap 将列表作为值。同样在上面的示例中,我使用"collect",以便输出打印语句。样本在没有收集的情况下仍然给出相同的错误,但没有打印。
哈希代码已经匹配,但我尝试覆盖大小写类的等于和哈希代码,同样的问题。
这是使用Databricks,所以我相信我无法访问REPL或Spark-submit。
感谢评论指出类似的问题,该问题涉及 Spark 问题,这让我为我的情况找到了这个解决方案:
case class TestData(val s: String) {
override def equals(obj: Any) = obj.isInstanceOf[TestData] && obj.asInstanceOf[TestData].s == this.s
}
覆盖等于以包含 isInstanceOf 可解决此问题。这可能不是最好的解决方案,但它绝对是最简单的解决方法。
你的逻辑是循环的和错误的。您正在将相同的RDD传递给Map并使用TestData调用。更新它以使其按顺序排列,如下所示:
case class TestData(val s: String)
def testKey(testData: TestData) {
val myMap = Map(testData -> "bar")
println(f"Current Map: $myMap")
println(f"Key sent into function: $testData")
println("Key isn't found in Map:")
println(myMap(testData)) // fails here
}
val myList = sc.parallelize(List(TestData("foo")))
myList.collect.foreach(testKey)
它的输出是:
Current Map: Map(TestData(foo) -> bar)
Key sent into function: TestData(foo)
Key isn't found in Map:
bar
我希望这就是你所期待的...