I want to merge two columns in an Apache Spark Dataset. I tried the following, but it did not work. Can anyone suggest a solution?
Dataset<Row> df1 = spark.read().json("src/test/resources/DataSets/DataSet.json");
Dataset<Row> df1Map = df1.select(functions.array("beginTime", "endTime"));
df1Map.show();
The input DataSet.json looks like this:
{"IPsrc":"abc", "IPdst":"def", "beginTime": 1, "endTime":1}
{"IPsrc":"def", "IPdst":"abc", "beginTime": 2, "endTime":2}
{"IPsrc":"abc", "IPdst":"def", "beginTime": 3, "endTime":3}
{"IPsrc":"def", "IPdst":"abc", "beginTime": 4, "endTime":4}
Note that I am doing this in Java 8.
The above results in the following error:
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'beginTime': was expecting ('true', 'false' or 'null')
at [Source: beginTime; line: 1, column: 19]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2462)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1621)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:689)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3776)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3721)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
at org.json4s.jackson.JsonMethods$class.parse(JsonMethods.scala:20)
at org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:50)
at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:104)
at org.apache.spark.sql.types.DataType.fromJson(DataType.scala)
The initial table output from df1.show() is as follows:
+-----+-----+---------+-------+
|IPdst|IPsrc|beginTime|endTime|
+-----+-----+---------+-------+
| def| abc| 1| 1|
| abc| def| 2| 2|
| def| abc| 3| 3|
| abc| def| 4| 4|
+-----+-----+---------+-------+
The schema of df1 is as follows:
root
|-- IPdst: string (nullable = true)
|-- IPsrc: string (nullable = true)
|-- beginTime: long (nullable = true)
|-- endTime: long (nullable = true)
If you want to concatenate the two columns into a single string column, you can do it like this:
Dataset<Row> df1Map = df1.select(functions.concat(df1.col("beginTime"), df1.col("endTime")));
df1Map.show();
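If you want a separator between the two values, Spark's concat_ws is another option. A minimal sketch (the " - " separator and the alias begin_end_time are just illustrative choices):
// concat_ws joins the columns with the given separator and, unlike concat, skips nulls
Dataset<Row> df1Ws = df1.select(
        functions.concat_ws(" - ", df1.col("beginTime"), df1.col("endTime"))
                .alias("begin_end_time"));
df1Ws.show();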
Edit: I tried this and it works. Judging by your stack trace (the failure is inside DataType.fromJson, i.e., while parsing a schema string as JSON), the error probably originates somewhere other than the snippet shown. Here is my code:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

SparkSession spark = SparkSession
.builder()
.master("local")
.appName("SO")
.getOrCreate();
Dataset<Row> df1 = spark.read().json("src/main/resources/json/Dataset.json");
df1.printSchema();
df1.show();
Dataset<Row> df1Map = df1.select(functions.array("beginTime", "endTime"));
df1Map.show();
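With the sample data above, df1Map.show() prints a single array-typed column (named something like array(beginTime, endTime) by default) whose rows are [1, 1], [2, 2], and so on, so the array approach itself is fine.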
You can try this:
Dataset<Row> df1Map = df1.withColumn("begin_end_time", concat(col("beginTime"), lit(" - "), col("endTime")));
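Note that this version uses the unqualified helpers, so it needs a static import:
import static org.apache.spark.sql.functions.*;
With the sample data, the new begin_end_time column holds strings such as 1 - 1 and 2 - 2; call df1Map.show() to verify.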