我有两个文本文件,我正在创建数据框架。两个文件都有相同的列,除了一个列。
当我板条架模式并加入两个时,我会遇到错误,例如
java.lang.arayindexoutofboundsexception
基本上我的模式具有列,我的一个文本文件只有5列。
没有如何将一些无效的值附加到已经创建的架构,然后加入?
这是我的代码
val schema = StructType(Array(
StructField("TimeStamp", StringType),
StructField("Id", StringType),
StructField("Name", StringType),
StructField("Val", StringType),
StructField("Age", StringType),
StructField("Dept", StringType)))
val textRdd1 = sc.textFile("s3://test/Text1.txt")
val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split(",", -1)))
var df1 = sqlContext.createDataFrame(rowRdd1, schema)
val textRdd2 = sc.textFile("s3://test/Text2.txt")
val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split(",", -1)))
var df2 = sqlContext.createDataFrame(rowRdd2, schema)
val df3 = df1.join(df2)
时间戳列中不存在第一个文本文件...
为什么不从架构中排除第一个dataframe的架构?
val df1 = sqlContext.createDataFrame(rowRdd1, new StructType(schema.tail.toArray))
如评论中所述,您不需要模式是相似的。您还可以指定您加入条件,然后选择以加入的列。
将时间戳列添加到第一个数据框架
import spark.sql.functions._
import org.apache.spark.sql.types.DataType
val df1Final = df1.withColumn("TimeStamp", lit(null).cast(Long))
然后继续加入
您可以在没有此字段的情况下创建一个新的架构,并使用此架构。德米特里(Dmitri)建议的是使用原始架构并删除您不需要保存的字段来编写第二个模式定义。
将两个文件加载到数据集中后,您可以执行公共字段中的联接基础并删除重复列,我想这就是您想要的,这样做:
df3 = df1.join(df2, (df1("Id") === df2("Id")) && (df1("Name") === df2("Name")) && (df1("Val") === df2("Val")) && (df1("Age") === df2("Age")) && (df1("Dept") === df2("Dept")))
.drop(df2("Id"))
.drop(df2("Name"))
.drop(df2("Val"))
.drop(df2("Age"))
.drop(df2("Dept"))