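I have a relatively large dataset of records for jobs that were submitted to a queue. [Some of] these records include the start and end time of the job run, as well as the resources the job consumed (here nnodes, the number of nodes).
What I am able to do is create a new row for each day the job ran. What I would also like to do is get how many hours on each of those days the job was using these resources.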
+-------+-------------------+-------------------+------+----------+
| job_id| start| end|nnodes| job's day|
+-------+-------------------+-------------------+------+----------+
|2747673|2015-01-04 23:44:03|2015-01-05 00:13:18| 1676|2015-01-04|
|2747673|2015-01-04 23:44:03|2015-01-05 00:13:18| 1676|2015-01-05|
+-------+-------------------+-------------------+------+----------+
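This seems like it should be a straightforward task, but what I need is the number of days each job ran. If the answer is one, it is a simple subtraction, but for any larger number it is not so simple. I was wondering whether there is a typical solution. As should be assumed, not all months are the same length.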
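One way to sidestep the month-length concern is to let a calendar library do the counting. Below is a minimal sketch in plain Scala with java.time (no Spark involved). The helper name `jobDays` is made up for this illustration and is not part of the original post; it lists every calendar date a job touches, so the number of days is simply the length of the result.

```scala
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

// Same timestamp layout as the start and end columns in the table above.
val tsFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

// Hypothetical helper: every calendar date on which the job ran, inclusive.
def jobDays(start: String, end: String): Seq[LocalDate] = {
  val first = LocalDateTime.parse(start, tsFormat).toLocalDate
  val last  = LocalDateTime.parse(end, tsFormat).toLocalDate
  // Counting on calendar dates means month lengths and leap years are handled by java.time.
  val span = ChronoUnit.DAYS.between(first, last)
  (0L to span).map(offset => first.plusDays(offset))
}

val days = jobDays("2015-01-04 23:44:03", "2015-01-05 00:13:18")
println(days.mkString(", ")) // 2015-01-04, 2015-01-05
```

Note that under this sketch a job ending exactly at midnight would still list the following date, with zero seconds spent on it.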
P.S. I want to stress that I need the number of hours or minutes for each record. So the first rows should be:
+-------+-------------------+-------------------+------+----------+--------+
| job_id| start| end|nnodes| job's day| minutes|
+-------+-------------------+-------------------+------+----------+--------+
|2747673|2015-01-04 23:44:03|2015-01-05 00:13:18| 1676|2015-01-04| 46|
|2747673|2015-01-04 23:44:03|2015-01-05 00:13:18| 1676|2015-01-05| 13|
+-------+-------------------+-------------------+------+----------+--------+
and so on.
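This can be done simply by converting the dates and times with unix_timestamp
and subtracting one from the other. That gives you the difference in seconds; divide by 60 to get minutes, or by 3600 to get hours.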
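The subtraction can be checked outside Spark first. The following is a rough sketch in plain Scala, assuming the timestamps are interpreted as UTC (Spark's unix_timestamp uses the session time zone, so values could differ around daylight-saving changes). `epochSeconds` is a name invented for this illustration.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

// Hypothetical helper: seconds since the epoch, treating the string as UTC (an assumption).
def epochSeconds(ts: String): Long =
  LocalDateTime.parse(ts, fmt).toEpochSecond(ZoneOffset.UTC)

// Total run time of the job from the question's table.
val seconds = epochSeconds("2015-01-05 00:13:18") - epochSeconds("2015-01-04 23:44:03")
val minutes = seconds / 60.0   // seconds to minutes
val hours   = seconds / 3600.0 // seconds to hours

println(s"$seconds s, $minutes min, $hours h") // 1755 s and 29.25 min for this job
```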
I've added the logic I think you need to work out how much time was used on each day.
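// Assumes a SparkSession named spark is in scope (as in spark-shell); toDF needs its implicits.
import spark.implicits._
// Sample data: one row per day the job touched.
val df = Seq(
  (2747673, "2015-01-04 23:44:03", "2015-01-05 00:00:18", 1676, "2015-01-04"),
  (2747673, "2015-01-04 23:44:03", "2015-01-05 00:00:18", 1676, "2015-01-05")
).toDF("job_id", "start", "end", "nnodes", "job's day")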
+-------+-------------------+-------------------+------+----------+
| job_id| start| end|nnodes| job's day|
+-------+-------------------+-------------------+------+----------+
|2747673|2015-01-04 23:44:03|2015-01-05 00:00:18| 1676|2015-01-04|
|2747673|2015-01-04 23:44:03|2015-01-05 00:00:18| 1676|2015-01-05|
+-------+-------------------+-------------------+------+----------+
import org.apache.spark.sql.functions._
// Epoch seconds for the job's start and end, and for the midnights that bound "job's day".
val startTs    = unix_timestamp(col("start"), "yyyy-MM-dd HH:mm:ss")
val endTs      = unix_timestamp(col("end"), "yyyy-MM-dd HH:mm:ss")
val dayStartTs = unix_timestamp(col("job's day"), "yyyy-MM-dd")
val dayEndTs   = unix_timestamp(date_add(col("job's day"), 1), "yyyy-MM-dd")
// Seconds the job ran within "job's day".
val timeUsage = when(startTs < dayStartTs && endTs > dayEndTs, lit(86400)) // job covers the whole day
  .when(startTs < dayStartTs, endTs - dayStartTs)                          // started earlier, ends on this day
  .when(endTs > dayEndTs, dayEndTs - startTs)                              // starts on this day, ends later
  .otherwise(endTs - startTs)                                              // starts and ends on this day
df.withColumn("difference_in_seconds", timeUsage).show
+-------+-------------------+-------------------+------+----------+---------------------+
| job_id| start| end|nnodes| job's day|difference_in_seconds|
+-------+-------------------+-------------------+------+----------+---------------------+
|2747673|2015-01-04 23:44:03|2015-01-05 00:00:18| 1676|2015-01-04| 957|
|2747673|2015-01-04 23:44:03|2015-01-05 00:00:18| 1676|2015-01-05| 18|
+-------+-------------------+-------------------+------+----------+---------------------+
df.withColumn("difference_in_minutes", timeUsage/60).show
+-------+-------------------+-------------------+------+----------+---------------------+
| job_id| start| end|nnodes| job's day|difference_in_minutes|
+-------+-------------------+-------------------+------+----------+---------------------+
|2747673|2015-01-04 23:44:03|2015-01-05 00:00:18| 1676|2015-01-04| 15.95|
|2747673|2015-01-04 23:44:03|2015-01-05 00:00:18| 1676|2015-01-05| 0.3|
+-------+-------------------+-------------------+------+----------+---------------------+
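The four `when` branches amount to clipping the job's interval to the day's interval. As an alternative way to see that (this is not the answer's code, and it is plain Scala rather than Spark), here is a minimal sketch in which `secondsOnDay` is a hypothetical helper and timestamps are assumed to be UTC.

```scala
import java.time.{LocalDate, LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

val stamp = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

// Hypothetical helper: seconds of the job's run that fall on the given calendar day.
def secondsOnDay(start: String, end: String, day: String): Long = {
  val startTs  = LocalDateTime.parse(start, stamp).toEpochSecond(ZoneOffset.UTC)
  val endTs    = LocalDateTime.parse(end, stamp).toEpochSecond(ZoneOffset.UTC)
  val date     = LocalDate.parse(day) // ISO yyyy-MM-dd, as in the "job's day" column
  val dayStart = date.atStartOfDay().toEpochSecond(ZoneOffset.UTC)
  val dayEnd   = date.plusDays(1).atStartOfDay().toEpochSecond(ZoneOffset.UTC)
  // Clip the job to the day; a job that never touches the day yields 0.
  math.max(0L, math.min(endTs, dayEnd) - math.max(startTs, dayStart))
}

println(secondsOnDay("2015-01-04 23:44:03", "2015-01-05 00:00:18", "2015-01-04")) // 957
println(secondsOnDay("2015-01-04 23:44:03", "2015-01-05 00:00:18", "2015-01-05")) // 18
```

In Spark, the same clipping could likely be expressed with `least` and `greatest` from org.apache.spark.sql.functions instead of the chained `when` calls, which may be easier to read once there are many cases.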