Spark流媒体24X7,带有updateStateByKey问题



我正在运行一个全天候的spark流,并使用updateStateByKey是否可以全天候运行火花流?如果是,updateStateByKey不会变大,如何处理?当我们全天候运行时,我们是否必须定期重置/删除updateStateByKey?如果不是,如何以及何时重置?还是Spark以分布式方式处理?如何动态增加内存/存储。

当updateStateByKey增长时,我得到以下错误

Array out of bound exception
Exception while deleting local spark dir: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141026101251-cfb4
java.io.IOException: Failed to delete: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141026101251-cfb4

如何处理。。请给我指一下有没有文件?我完全陷入困境,任何帮助都非常感激。。感谢您抽出时间

在Java中使用Optional.abst(),在Scala中使用None来移除键。工作示例可在http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/.

用None更新密钥将从spark中删除它。如果您想将密钥保留一段时间,可以在密钥上附加一个过期时间,并每批检查一次。

例如,以下是按分钟计数记录的代码:

val counts = lines.map(line => (currentMinute, 1))
val countsWithState = counts updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
  if (values.isEmpty) { // every key will be iterated, even if there's no record in this batch
    println("values is empty")
    None // this will remove the key from spark
  } else {
    println("values size " + values.size)
    Some(state.sum + values.sum)
  }
}

pyspark:updateStateByKey(self,updateFunc,numPartitions=None,initialRDD=None)

返回一个新的"state"DStream,其中每个键的状态通过应用键的先前状态的给定函数和键的新值

@param updateFunc:状态更新函数。如果此函数返回None,则相应的状态键值对将被消除

updateFunc方法return None,状态键值对remove;

相关内容

  • 没有找到相关文章

最新更新