计算 Spark RDD 或数据帧中列之间的分隔符,并将行移动到单独的 RDD 或数据帧



我是Spark编程的新手,我正在将多个TSV.gz文件加载到RDD或数据帧中。我想计算加载后列之间的选项卡数量,并根据以下条件将数据行移动到单独的 RDD 或数据帧。

总列数 = 996

If the number of tab counts = 995 -> move to another RDD or DF
If the number of tab counts < 995 -> move to another RDD or DF
If the number of tab counts > 995 -> move to another RDD or DF

我尝试了以下内容,但返回布尔值

val textFile = sc.textFile("/abc/*.tsv.gz")
textFile.map(line=>line.split("t"))
val file1 = textFile.filter(line => line.contains("t").count() > 995)
val file2 = textFile.filter(line => line.contains("t").count() < 995)

如果可以实现相同的目标,请告诉我

谢谢。!

首先,

从Spark 2.0开始,强烈建议坚持使用SparkSQL,除非你需要对RDD进行一些低级访问。这不是您的情况,因此请在学习时暂时忘记RDD。

您正在尝试实现的目标可以通过多种方式完成。假设 TSV 具有标题行,或者您可以为该列指定名称。利用 CSV 格式阅读器,只需将t用作分隔符:

val all = spark.read
  .format("csv")
  .option("header", "true")
  .option("delimiter", "t")
  .option("inferSchema", "true")
  .load("file.tsv")

接下来假设left是列 994 的名称,center是列 995 的名称,right是列 996 的名称。

val left = all.filter(col("center").isNull)
val center = all.filter(col("center").isNotNull && col("right").isNull)
val right = all.filter(col("right").isNotNull)

如果文件是 csv,请始终使用数据帧。您可以使用 df.columns.length 来给出 csv 文件中的列数。下面是包含 8 列的 csv 文件的示例代码。您可以对 996 列进行相应的修改。

emp1的内容.csv

7369     "SMITH"     "CLERK"     7902    "17-Dec-80"     800     20  10
7499     "ALLEN"     "SALESMAN"  7698    "20-Feb-81"     1600    300     30

火花代码

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql._
object StackOverflow {
  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession.builder().appName("Testing..").master("local[*]").getOrCreate()
    import spark.implicits._
    val emp1 = spark.read.format("csv").option("delimiter","t").load("in/emp1.csv")
    emp1.show(false)
    val col_len = emp1.columns.length
    if(col_len == 8) {
      val df1 = emp1.toDF("empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno")
      df1.show(false)
    }
    if(col_len== 7) {
      val df2 = emp1.toDF("empno", "ename", "job", "mgr", "hiredate", "sal", "comm")
      df2.show(false)
    }
  }
}

相关内容

  • 没有找到相关文章

最新更新