我想将数据集中具有类似2018-08-17T19:58:46.000+0000
的值的时间戳字段映射为类似2018-08-17 19:58:46.000
的格式,即yyyy-MM-dd HH:MM:ss.SSS,并将一些列映射为yyyy-MM-dd。
例如,我有一个数据集DS1,其中列id,lastModif,创建:
+------------------+----------------------------+----------------------------+
|Id |lastModif |created |
+------------------+----------------------------+----------------------------+
|abc1 |2019-01-14T19:51:55.000+0000|2019-02-07T20:37:53.000+0000|
|AQA2 |2019-02-05T19:26:36.000+0000|2019-02-07T20:40:06.000+0000|
+------------------+----------------------------+----------------------------+
从上面的DS1,我需要映射到格式yyyy-MM-dd HH:mm:ss.SSS
的lastModif
列和映射到yyyy-MM-dd
的createdTime
列
我有类似的DS2、DS3,但有不同的列映射
我保存了一个属性文件,它将从中获取映射列作为键,并获取时间戳格式作为值
在代码中,我保留映射列和非映射列的列表,并选择列:
String cols = "Id,created,lastModif";
String[] colArr = cols.split(",");
String mappedCols = "lastModif,created"; //hardcoding as of now.
List<String> mappedColList = Arrays.asList(mappedCols.split(","));
String nonMappedCols = getNonMappingCols(colArr, mappedCols.split(",")).toLowerCase();
List<String> nonMapped = Arrays.asList(nonMappedCols.split(","));
//column-mapping logic
filtered = tempDS.selectExpr(convertListToSeq(nonMapped),unix_timestamp($"lastModif","yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp").as("lastModif"));
filtered.show(false);
public static Seq<String> convertListToSeq(List<String> inputList)
{
return JavaConverters.asScalaIteratorConverter(inputList.iterator()).asScala().toSeq();
}
private static String getNonMappingCols(String[] cols, String[] mapped)
{
String nonMappedCols = "";
List<String> mappedList = Arrays.asList(mapped);
for(int i=0; i<cols.length; i++)
{
if(!mappedList.contains(cols[i])){
nonMappedCols += cols[i]+",";
}
}
nonMappedCols = nonMappedCols.substring(0, nonMappedCols.length()-1);
return nonMappedCols;
}
如何将列映射到所需的时间戳格式
在代码tempDS.selectExpr(convertListToSeq(nonMapped),unix_timestamp($"lastModif","yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp").as("lastModif"));
的行中,$"lastModif"
在Java中是未识别的
其次,这种方式是一种静态方式,即对映射列进行硬编码。如何从List<String> mappedColList
映射列?
-
首先,让我们澄清您的输入数据。您提到,您有时间戳,但您列出的输出格式似乎只是String值,表示以下格式yyyy-MM-dd'TT'HH:MM:ss.SSSZ。您能证实这一结论吗?
-
在您的一条评论中,您回答说函数
unix_timestamp
在您的尝试中返回null
。查看unix_timestamp(列s,字符串p)的文档,我们可以看到它需要其他格式来解析,否则它将返回null
:
参数:
s-日期、时间戳或字符串。如果是字符串,则数据的格式必须可以转换为时间戳,例如yyyy-MM-dd或yyyy-MM-dd HH:MM:ss。SSSS
fmt-当s是字符串时详细说明s格式的日期-时间模式
返回:
时间戳,如果s是无法转换为时间戳记的字符串或fmt是无效格式,则返回null
- 如果您输入的参数真的是String,我建议您使用以下解决方案,使用Spark SQL函数to_timestamp(列s,字符串fmt)和date_format(列dateExpr,字符串格式)
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.to_timestamp;
import static org.apache.spark.sql.functions.date_format;
....
SparkSession spark = SparkSession
.builder()
.appName("datetime-transformation")
.master("local[*]")
.getOrCreate();
SomeDto someDto = SomeDto.builder()
.id("abc1")
.lastModif("2019-01-14T19:51:55.123+02:00")
.created("2019-01-14T19:51:55.123+02:00")
.build();
Dataset<Row> ds = spark.createDataset(Collections.singletonList(someDto), Encoders.bean(SomeDto.class)).toDF();
ds.printSchema();
ds.show(false);
Dataset<Row> dfm = ds
.withColumn("lastModif", to_timestamp(col("lastModif"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))
.withColumn("created", date_format(to_timestamp(col("lastModif"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), "yyyy-MM-dd"));
dfm.printSchema();
dfm.show(false);
输出为:
root
|-- created: string (nullable = true)
|-- id: string (nullable = true)
|-- lastModif: string (nullable = true)
+-----------------------------+----+-----------------------------+
|created |id |lastModif |
+-----------------------------+----+-----------------------------+
|2019-01-14T19:51:55.123+02:00|abc1|2019-01-14T19:51:55.123+02:00|
+-----------------------------+----+-----------------------------+
root
|-- created: string (nullable = true)
|-- id: string (nullable = true)
|-- lastModif: timestamp (nullable = true)
+----------+----+-----------------------+
|created |id |lastModif |
+----------+----+-----------------------+
|2019-01-14|abc1|2019-01-14 19:51:55.123|
+----------+----+-----------------------+
这就是我如何使映射动态化的:
private static Dataset<Row> mapColumns(Properties mappings, String tableNm, String[] colArr, Dataset<Row> tempDS) throws Exception
{
String mappedCols = "lastmodif,createdDate,endDate";
Dataset<Row> filtered = null;
Properties mappingCols = mappings;
List<String> mapped = Arrays.asList(mappedCols.split(","));
List<String> colsList = Arrays.asList(colArr);
ArrayList<String> tempList = new ArrayList<String>();
Iterator itrTmp = colsList.iterator();
while(itrTmp.hasNext()){
tempList.add((String)itrTmp.next());
}
Iterator itr = mapped.iterator();
filtered = tempDS.selectExpr(convertListToSeq(colsList));
while(itr.hasNext()){
String column = itr.next().toString();
String newCol = column+"_mapped";
String propertyKey = tableNm+"-"+column;
String propertyValue = mappingCols.getProperty(propertyKey);
filtered = filtered.selectExpr(convertListToSeq(colsList))
.withColumn(newCol, functions.regexp_replace(functions.substring(filtered.col(column), 0, 23),"T", " ")).alias(newCol)
.drop(filtered.col(column));
tempList.remove(column);
tempList.add(newCol);
colsList = tempList;
}
filtered = filtered.selectExpr(convertListToSeq(colsList));
filtered.show(false);
}
public static Seq<String> convertListToSeq(List<String> inputList)
{
return JavaConverters.asScalaIteratorConverter(inputList.iterator()).asScala().toSeq();
}
但String
到Timestamp
的转换仍然悬而未决。到目前为止,我正在做substring
,但这一逻辑适用于数据类型为yyyy-mm-ddThh:mm:ss.SSSZ
或yyyy-mm-ddThh:mm:ss.SSS+0000
等的所有列,但如果一列的数据类型为yyyy-mm-dd
,则该逻辑将不起作用,并且代码将中断。我在这里提出了这个问题:如何将字符串转换为时间戳。