我是sparksql/scala的新手,我正在努力处理一些看似简单的任务。
我正在尝试从Scala字符串数组构建一些动态SQL。我正在尝试在数据框架中重新类型一些列,但是在运行时我不知道需要重新输入哪个列,直到我可以在DataFrame中看到一组列。所以我想这样做:
val cols = df.columns
val typedCols = cols.map( c => getTypedColumn(c) )
df.select( ...) or df.selectExpr(...) // how to invoke this with vals from my string array??
TypedCols最终将成为具有这样的值的字符串:
["a", "cast(b as int) b", "c"]
我需要先创建一个大逗号界的字符串吗?
因此,假设这将起作用,我会调用该精选语句,它将我的数据框架转换为带有所需类型的新数据框架。但是数据框中的某些记录将有错误,并且将失败尝试重新打入。
如何获得所有通过打字的好记录,然后将所有不良记录放在某种错误桶中?在尝试dataFrame选择?
您可以使用variadic参数:
val df = Seq(("a", "1", "c"), ("foo", "bar", "baz")).toDF("a", "b", "c")
val typedCols = Array("a", "cast(b as int) b", "c")
df.selectExpr(typedCols: _*).show
+---+----+---+
| a| b| c|
+---+----+---+
| a| 1| c|
|foo|null|baz|
+---+----+---+
但我个人更喜欢列:
val typedCols = Array($"a", $"b" cast "int", $"c")
df.select(typedCols: _*).show
如何获得所有通过打字的好记录,然后将所有不良记录放在某种错误桶中?
未能cast
的数据是NULL
。要查找良好的记录,请使用na.drop
:
val result = df.selectExpr(typedCols: _*)
val good = result.na.drop()
查找不良检查是否是NULL
import org.apache.spark.sql.functions.col
val bad = result.where(result.columns.map(col(_).isNull).reduce(_ || _))
获取无与伦比的数据:
如果
typedCols
是Seq[Column]
,您可以df.where(typedCols.map(_.isNull).reduce(_ || _))
如果
typedCols
是Seq[String]
,则可以:import org.apache.spark.sql.functions.expr df.where(typedCols.map(expr(_).isNull).reduce(_ || _))
答案很好,但缺少另一个简单的解决方案:
val columns = Array("name", "age")
val df2 = df.select(columns.map(col): _*)