我们拥有的常见火花处理流是这样的:
加载:
rdd = sqlContext.parquetFile("mydata/")
rdd = rdd.map(lambda row: (row.id,(some stuff)))
rdd = rdd.filter(....)
rdd = rdd.partitionBy(rdd.getNumPatitions())
通过id
处理(这就是为什么我们在上面执行partitionBy
!!)
rdd.reduceByKey(....)
rdd.join(...)
但是,SPARK 1.3更改了sqlContext.parquetFile
以返回DataFrame
而不是RDD
,并且它不再具有partitionBy
,getNumPartitions
和reduceByKey
方法。
我们现在用partitionBy
做什么?
我们可以用
之类的替换加载代码rdd = sqlContext.parquetFile("mydata/").rdd
rdd = rdd.map(lambda row: (row.id,(some stuff)))
rdd = rdd.filter(....)
rdd = rdd.partitionBy(rdd.getNumPatitions())
df = rdd.map(lambda ...: Row(...)).toDF(???)
并使用groupBy
代替reduceByKey
。
这是正确的方式吗?
ps。是的,我知道partitionBy
是groupBy
等人所需的不是。但是,如果没有先前的partitionBy
,每个 join
,groupBy
& c可能必须进行跨节点操作。我正在寻找一种方法保证,所有需要按密钥分组的操作都将运行 local 。
看来,由于版本 1.6 ,repartition(self, numPartitions, *cols)
做我需要的事情:
.. versionchanged:: 1.6
添加了可选参数以指定分区列。 如果指定了分区列,也可以将
numPartitions
可选。
因为DataFrame
为我们提供了 table 和 linter RDD
的抽象,这是操纵DataFrame
的最方便方法是使用这些抽象与特定的表操作方法 dataframe可以使我们。
在数据框架上,我们可以:
- 使用
select()
udf()
as()
转换表模式 -
filter()
或where()
的过滤器排 - 通过
groupBy()
和agg()
射击聚合 - 或使用
sample()
join()
union()
的其他分析工作 - 使用
saveAsTable()
saveAsParquet()
insertIntoJDBC()
坚持您的结果
请参阅Spark SQL和DataFrame指南以获取更多详细信息。
因此,一个常见的工作看起来像:
val people = sqlContext.parquetFile("...")
val department = sqlContext.parquetFile("...")
people.filter("age > 30")
.join(department, people("deptId") === department("id"))
.groupBy(department("name"), "gender")
.agg(avg(people("salary")), max(people("age")))
,对于您的特定要求,这可能看起来像:
val t = sqlContext.parquetFile()
t.filter().select().groupBy().agg()