I need to do some calculations on historical data in Spark, but my case is slightly different from the examples all over the internet. I have a dataset with 3 columns: enter_date, exit_date, client_id. I need to calculate the count of online clients for each hourly interval.
For example, consider the following data:
enter_date | exit_date | client_id
2017-03-01 12:30:00 | 2017-03-01 13:30:00 | 1
2017-03-01 12:45:00 | 2017-03-01 14:10:00 | 2
2017-03-01 13:00:00 | 2017-03-01 15:20:00 | 3
As a result I have to get the following:
time_interval | count
2017-03-01 12:00:00 | 2
2017-03-01 13:00:00 | 3
2017-03-01 14:00:00 | 2
2017-03-01 15:00:00 | 1
As you can see, the calculation must be performed based not only on enter_date, but on both the enter_date and exit_date columns.
So, there are two main questions:
- Is Spark capable of this kind of calculation?
- If so, how?
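To clarify what I mean by "online during an interval": a client should count towards every hourly bucket it overlaps, e.g. client 2 (12:45 to 14:10) belongs to the 12:00, 13:00 and 14:00 buckets. A rough plain-Scala sketch of that expansion for a single row (just to illustrate the idea; hourBuckets is a hypothetical helper, not Spark code):
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

// all hourly buckets between enter and exit, both truncated to the full hour
def hourBuckets(enter: String, exit: String): Seq[String] = {
  val start = LocalDateTime.parse(enter, fmt).withMinute(0).withSecond(0)
  val end   = LocalDateTime.parse(exit, fmt).withMinute(0).withSecond(0)
  Iterator.iterate(start)(_.plusHours(1)).takeWhile(!_.isAfter(end)).map(d => fmt.format(d)).toList
}

hourBuckets("2017-03-01 12:45:00", "2017-03-01 14:10:00")
// -> List(2017-03-01 12:00:00, 2017-03-01 13:00:00, 2017-03-01 14:00:00)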
It can be implemented like this in Scala; I guess Python would be similar:
// case class describing one row of the input data
case class Client(enter_date: String, exit_date: String, client_id: Int)

val clientList = List(
  Client("2017-03-01 12:30:00", "2017-03-01 13:30:00", 1),
  Client("2017-03-01 12:45:00", "2017-03-01 14:10:00", 2),
  Client("2017-03-01 13:00:00", "2017-03-01 15:20:00", 3)
)

import sqlContext.implicits._ // required for .toDF
val clientDF = sparkContext.parallelize(clientList).toDF

val timeFunctions = new TimeFunctions()
// on Spark 1.x DataFrame.flatMap returns an RDD, so reduceByKey/sortByKey are available
val result = clientDF.flatMap(
  // return the list of hourly buckets between "enter_date" and "exit_date"
  row => timeFunctions.getDiapason(row.getAs[String]("enter_date"), row.getAs[String]("exit_date"))
).map(time => (time, 1)).reduceByKey(_ + _).sortByKey(ascending = true)
result.foreach(println(_))
The result is:
(2017-03-01 12:00:00,2)
(2017-03-01 13:00:00,3)
(2017-03-01 14:00:00,2)
(2017-03-01 15:00:00,1)
TimeFunctions can be implemented like this:
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import scala.collection.mutable.ArrayBuffer

// Serializable because an instance is used inside the flatMap closure
class TimeFunctions extends Serializable {
  val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // returns every hourly bucket from "from" to "to", both truncated to the full hour
  def getDiapason(from: String, to: String): Seq[String] = {
    var fromDate = LocalDateTime.parse(from, formatter).withSecond(0).withMinute(0)
    val result = ArrayBuffer(formatter.format(fromDate))
    val toDate = LocalDateTime.parse(to, formatter).withSecond(0).withMinute(0)
    while (toDate.compareTo(fromDate) > 0) {
      fromDate = fromDate.plusHours(1)
      result += formatter.format(fromDate)
    }
    result
  }
}
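A quick sanity check of getDiapason on its own (for example in the spark-shell) shows the buckets a single client contributes to:
val tf = new TimeFunctions()
// client 2 is online from 12:45 to 14:10, so it falls into the 12:00, 13:00 and 14:00 buckets
tf.getDiapason("2017-03-01 12:45:00", "2017-03-01 14:10:00")
// -> ArrayBuffer(2017-03-01 12:00:00, 2017-03-01 13:00:00, 2017-03-01 14:00:00)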
You can also do this with Spark SQL, but then you have to use another dataset that contains the intervals. I used a separate CSV file, but in theory you can provide it however you like.
My setup
Apache Spark in Java
- spark-core_2.10
- spark-sql_2.10
Needed files:
timeinterval.csv
time_interval
01.03.2017 11:00:00
01.03.2017 12:00:00
01.03.2017 13:00:00
01.03.2017 14:00:00
01.03.2017 15:00:00
01.03.2017 16:00:00
test.csv
enter_date | exit_date | client_id
2017-03-01 12:30:00 | 2017-03-01 13:30:00 | 1
2017-03-01 12:45:00 | 2017-03-01 14:10:00 | 2
2017-03-01 13:00:00 | 2017-03-01 15:20:00 | 3
How I did it
I did this in Java, but since I use SQL, the conversion should be very simple.
Dataset<Row> rowsTest = spark.read()
        .option("header", "true")
        .option("delimiter", ";")
        .option("quoteMode", "NONE")
        .csv("C:/Temp/stackoverflow/test.csv");

Dataset<Row> rowsTimeInterval = spark.read()
        .option("header", "true")
        .option("delimiter", ";")
        .option("quoteMode", "NONE")
        .csv("C:/Temp/stackoverflow/timeinterval.csv");

rowsTest.createOrReplaceTempView("test");
rowsTimeInterval.createOrReplaceTempView("timeinterval");

// note: the patterns passed to unix_timestamp must match the date format used in the CSV files
String sql = "SELECT timeinterval.time_interval,(" +
        "SELECT COUNT(test.client_id) FROM timeinterval AS sub" +
        " INNER JOIN test ON " +
        " ((unix_timestamp(sub.time_interval,'dd.MM.yyyy HH:mm:ss') + 60*60) > unix_timestamp(test.enter_date,'dd.MM.yyyy HH:mm:ss'))" +
        " AND" +
        " (sub.time_interval < test.exit_date)" +
        " WHERE timeinterval.time_interval = sub.time_interval" +
        ") AS RowCount" +
        " FROM timeinterval";

Dataset<Row> result = spark.sql(sql);
result.show();
Here is the raw SQL statement:
SELECT timeinterval.time_interval, (
    SELECT COUNT(test.client_id)
    FROM timeinterval AS sub
    INNER JOIN test ON
        ((unix_timestamp(sub.time_interval, 'dd.MM.yyyy HH:mm:ss') + 60*60) > unix_timestamp(test.enter_date, 'dd.MM.yyyy HH:mm:ss'))
        AND
        (sub.time_interval < test.exit_date)
    WHERE
        timeinterval.time_interval = sub.time_interval
) AS RowCount
FROM timeinterval
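For comparison, the same interval-overlap join can be sketched with the DataFrame API instead of a correlated subquery. This is only a rough Scala sketch under a few assumptions: it reuses the two DataFrames read above (here called rowsTimeInterval and rowsTest), parses the test columns with the yyyy-MM-dd format shown in test.csv, and compares exit_date as a timestamp rather than as a string:
import org.apache.spark.sql.functions.{col, count, unix_timestamp}

// interval start in epoch seconds; the interval ends one hour (3600 s) later
val intervalStart = unix_timestamp(col("time_interval"), "dd.MM.yyyy HH:mm:ss")
val enter = unix_timestamp(col("enter_date"), "yyyy-MM-dd HH:mm:ss")
val exit  = unix_timestamp(col("exit_date"),  "yyyy-MM-dd HH:mm:ss")

// keep every interval (left outer join) and count only clients whose
// [enter_date, exit_date] range overlaps [interval, interval + 1h)
val counted = rowsTimeInterval
  .join(rowsTest, (intervalStart + 3600) > enter && exit > intervalStart, "left_outer")
  .groupBy("time_interval")
  .agg(count(col("client_id")).as("RowCount"))
  .orderBy("time_interval")

counted.show()
Since count(client_id) ignores the null rows produced by the outer join, intervals with no clients still show up with RowCount 0.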
Since I use the unix_timestamp function (see https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html#unix_timestamp%28%29), you need Spark version 1.5.0 or higher.
Result:
+-------------------+--------+
|      time_interval|RowCount|
+-------------------+--------+
|01.03.2017 11:00:00| 0|
|01.03.2017 12:00:00| 2|
|01.03.2017 13:00:00| 3|
|01.03.2017 14:00:00| 2|
|01.03.2017 15:00:00| 1|
|01.03.2017 16:00:00| 0|
+-------------------+--------+