我是Spark编程的新手,我正在将多个TSV.gz文件加载到RDD或数据帧中。我想计算加载后列之间的选项卡数量,并根据以下条件将数据行移动到单独的 RDD 或数据帧。
总列数 = 996
If the number of tab counts = 995 -> move to another RDD or DF
If the number of tab counts < 995 -> move to another RDD or DF
If the number of tab counts > 995 -> move to another RDD or DF
我尝试了以下内容,但返回布尔值
val textFile = sc.textFile("/abc/*.tsv.gz")
textFile.map(line=>line.split("t"))
val file1 = textFile.filter(line => line.contains("t").count() > 995)
val file2 = textFile.filter(line => line.contains("t").count() < 995)
如果可以实现相同的目标,请告诉我
谢谢。!
从Spark 2.0开始,强烈建议坚持使用SparkSQL,除非你需要对RDD进行一些低级访问。这不是您的情况,因此请在学习时暂时忘记RDD。
您正在尝试实现的目标可以通过多种方式完成。假设 TSV 具有标题行,或者您可以为该列指定名称。利用 CSV 格式阅读器,只需将t
用作分隔符:
val all = spark.read
.format("csv")
.option("header", "true")
.option("delimiter", "t")
.option("inferSchema", "true")
.load("file.tsv")
接下来假设left
是列 994 的名称,center
是列 995 的名称,right
是列 996 的名称。
val left = all.filter(col("center").isNull)
val center = all.filter(col("center").isNotNull && col("right").isNull)
val right = all.filter(col("right").isNotNull)
如果文件是 csv,请始终使用数据帧。您可以使用 df.columns.length 来给出 csv 文件中的列数。下面是包含 8 列的 csv 文件的示例代码。您可以对 996 列进行相应的修改。
emp1的内容.csv
7369 "SMITH" "CLERK" 7902 "17-Dec-80" 800 20 10
7499 "ALLEN" "SALESMAN" 7698 "20-Feb-81" 1600 300 30
火花代码
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql._
object StackOverflow {
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession.builder().appName("Testing..").master("local[*]").getOrCreate()
import spark.implicits._
val emp1 = spark.read.format("csv").option("delimiter","t").load("in/emp1.csv")
emp1.show(false)
val col_len = emp1.columns.length
if(col_len == 8) {
val df1 = emp1.toDF("empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno")
df1.show(false)
}
if(col_len== 7) {
val df2 = emp1.toDF("empno", "ename", "job", "mgr", "hiredate", "sal", "comm")
df2.show(false)
}
}
}