I use the SQL below to create a session_id for a dataset: whenever a user is inactive for more than 30 minutes (30 * 60 seconds), a new session_id is assigned. I am new to Spark SQL and am trying to replicate the same procedure with a Spark SQL context, but I keep running into errors.
The session_id follows the naming convention:
USERID_1,
USERID_2,
USERID_3, ...
SQL (date is in seconds):
CREATE TABLE tablename_with_session_id AS
SELECT *,
       userid || '_' || SUM(new_session) OVER (PARTITION BY userid ORDER BY date ASC, new_session DESC ROWS UNBOUNDED PRECEDING) AS session_id
FROM (
    SELECT *,
           CASE
               WHEN (date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60) THEN 1
               WHEN ROW_NUMBER() OVER (PARTITION BY userid ORDER BY date) = 1 THEN 1
               ELSE 0
           END AS new_session
    FROM tablename
)
ORDER BY date;
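For example, for a single user with events at date = 100, 900 and 3000 (seconds), the gaps are 800 s and 2100 s, so the first two rows fall into one session and the third row starts a new one: the expected session_ids are USERID_1, USERID_1, USERID_2.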
I tried to run the same SQL from Spark (Scala) using:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val tableSessionID = sqlContext.sql("""SELECT *, CONCAT(userid, '_', SUM(new_session)) OVER (PARTITION BY userid ORDER BY date asc, new_session desc rows unbounded preceding) AS new_session_id
  FROM (SELECT *, CASE WHEN (date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60) THEN 1 WHEN row_number() over (partition by userid order by date) = 1 THEN 1 ELSE 0 END as new_session FROM clickstream)
  ORDER BY date""")
This gives an error suggesting that the Spark SQL expression SUM(new_session) has to be wrapped in a window function.
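From the error it sounds as if the OVER clause has to attach to the aggregate SUM rather than to CONCAT, i.e. something along these lines (just a sketch, I have not verified that it works):
val tableSessionID = sqlContext.sql("""SELECT *, CONCAT(userid, '_', SUM(new_session) OVER (PARTITION BY userid ORDER BY date ASC, new_session DESC ROWS UNBOUNDED PRECEDING)) AS session_id
  FROM (SELECT *, CASE WHEN (date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60) THEN 1 WHEN ROW_NUMBER() OVER (PARTITION BY userid ORDER BY date) = 1 THEN 1 ELSE 0 END AS new_session FROM clickstream)
  ORDER BY date""")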
I also tried splitting the work across multiple DataFrames:
val temp1 = sqlContext.sql("SELECT *, CASE WHEN (date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60) THEN 1 WHEN row_number() over (partition by userid order by date) = 1 THEN 1 ELSE 0 END as new_session FROM clickstream")
temp1.registerTempTable("clickstream_temp1")
val temp2 = sqlContext.sql("SELECT * , SUM(new_session) OVER (PARTITION BY userid ORDER BY date asc, new_session desc rows unbounded preceding) AS s_id FROM clickstream_temp1")
temp2.registerTempTable("clickstream_temp2")
val temp3 = sqlContext.sql("SELECT * , CONCAT(userid,'_',s_id) OVER (PARTITION BY userid ORDER BY date asc, new_session desc rows unbounded preceding) AS new_session_id FROM clickstream_temp2")
It returns an error only on the last statement ('val temp3 = ...'): concat(userid, '_', s_id) cannot be used inside a window function.
What is the workaround here? Is there an alternative?
Thanks.
To use CONCAT together with Spark window functions, you need a user-defined aggregate function (UDAF). You cannot use the CONCAT function directly with a window function.
//Extend UserDefinedAggregateFunction to write custom aggregate function
//You can also specify any constructor arguments. For instance you can have
//CustomConcat(arg1: Int, arg2: String)
class CustomConcat() extends org.apache.spark.sql.expressions.UserDefinedAggregateFunction {
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.Row
// Input Data Type Schema
def inputSchema: StructType = StructType(Array(StructField("description", StringType)))
// Intermediate Schema
def bufferSchema = StructType(Array(StructField("groupConcat", StringType)))
// Returned Data Type.
def dataType: DataType = StringType
// Whether the function always returns the same output for the same input
def deterministic = true
// Called once per group to initialize the aggregation buffer
def initialize(buffer: MutableAggregationBuffer) = { buffer(0) = " " }
// Iterate over each entry of a group
def update(buffer: MutableAggregationBuffer, input: Row) = { buffer(0) = buffer.getString(0) + input.getString(0) }
// Merge two partial aggregates
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { buffer1(0) = buffer1.getString(0) + buffer2.getString(0) }
// Called after all the entries are exhausted.
def evaluate(buffer: Row) = {buffer.getString(0)}
}
val newdescription = new CustomConcat()
val newdesc1 = newdescription($"description").over(windowspec)
You can use newdesc1 as an aggregate function for concatenation inside a window function. For more information you can have a look at the Databricks documentation on UDAFs. I hope this answers your question.
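For completeness, here is a rough sketch of how the windowspec used above might be defined for the clickstream example (clickstreamDF is a placeholder name for the DataFrame behind the clickstream table, and this assumes a Spark version that supports UDAFs over windows):
import org.apache.spark.sql.expressions.Window
import sqlContext.implicits._
// one window per user, ordered by event time
val windowspec = Window.partitionBy("userid").orderBy("date")
val newdescription = new CustomConcat()
// running concatenation of description values within each user's window
val result = clickstreamDF.withColumn("session_desc", newdescription($"description").over(windowspec))
result.show()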