Spark DataTables:在哪里分配



我们拥有的常见火花处理流是这样的:

加载:

rdd = sqlContext.parquetFile("mydata/")
rdd = rdd.map(lambda row: (row.id,(some stuff)))
rdd = rdd.filter(....)
rdd = rdd.partitionBy(rdd.getNumPatitions())

通过id处理(这就是为什么我们在上面执行partitionBy !!)

rdd.reduceByKey(....)
rdd.join(...)

但是,SPARK 1.3更改了sqlContext.parquetFile以返回DataFrame而不是RDD,并且它不再具有partitionBygetNumPartitionsreduceByKey方法。

我们现在用partitionBy做什么?

我们可以用

之类的替换加载代码
rdd = sqlContext.parquetFile("mydata/").rdd
rdd = rdd.map(lambda row: (row.id,(some stuff)))
rdd = rdd.filter(....)
rdd = rdd.partitionBy(rdd.getNumPatitions())
df = rdd.map(lambda ...: Row(...)).toDF(???)

并使用groupBy代替reduceByKey

这是正确的方式吗?

ps。是的,我知道partitionBygroupBy等人所需的不是。但是,如果没有先前的partitionBy每个 joingroupBy& c可能必须进行跨节点操作。我正在寻找一种方法保证,所有需要按密钥分组的操作都将运行 local

看来,由于版本 1.6 repartition(self, numPartitions, *cols)做我需要的事情:

.. versionchanged:: 1.6

添加了可选参数以指定分区列。 如果指定了分区列,也可以将numPartitions可选。

因为DataFrame为我们提供了 table linter RDD的抽象,这是操纵DataFrame的最方便方法是使用这些抽象与特定的表操作方法 dataframe可以使我们。

在数据框架上,我们可以:

  • 使用select() udf() as()
  • 转换表模式
  • filter()where()
  • 的过滤器排
  • 通过groupBy()agg()
  • 射击聚合
  • 或使用sample() join() union()
  • 的其他分析工作
  • 使用saveAsTable() saveAsParquet() insertIntoJDBC()
  • 坚持您的结果

请参阅Spark SQL和DataFrame指南以获取更多详细信息。

因此,一个常见的工作看起来像:

val people = sqlContext.parquetFile("...")
val department = sqlContext.parquetFile("...")
people.filter("age > 30")
  .join(department, people("deptId") === department("id"))
  .groupBy(department("name"), "gender")
  .agg(avg(people("salary")), max(people("age")))

,对于您的特定要求,这可能看起来像:

val t = sqlContext.parquetFile()
t.filter().select().groupBy().agg()

相关内容

  • 没有找到相关文章

最新更新