用火花检查日志



我是新手Spark,我正在尝试开发一个带有一些日志的CSV文件的Python脚本:

userId,timestamp,ip,event
13,2016-12-29 16:53:44,86.20.90.121,login
43,2016-12-29 16:53:44,106.9.38.79,login
66,2016-12-29 16:53:44,204.102.78.108,logoff
101,2016-12-29 16:53:44,14.139.102.226,login
91,2016-12-29 16:53:44,23.195.2.174,logoff

并检查用户是否有一些奇怪的行为,例如,他是否连续进行了两个"登录"而不进行"登录"。我已经加载了CSV作为火花数据框架,我想比较单个用户的日志行,并通过时间戳订购,并检查两个连续的事件是否为相同的类型(login -login,login,logoff -logoff)。我正在以"地图"的方式寻找这样做,但是目前,我尚不知道如何使用比较连续行的降低功能。我编写的代码作品,但性能非常糟糕。

sc = SparkContext("local","Data Check")
sqlContext = SQLContext(sc)
LOG_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/flume/events/*"
RESULTS_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/spark/script_results/prova/bad_users.csv"
N_USERS = 10*1000
dataFrame = sqlContext.read.format("com.databricks.spark.csv").load(LOG_FILE_PATH)
dataFrame = dataFrame.selectExpr("C0 as userID","C1 as timestamp","C2 as ip","C3 as event")
wrongUsers = []
for i in range(0,N_USERS):
    userDataFrame = dataFrame.where(dataFrame['userId'] == i)
    userDataFrame = userDataFrame.sort('timestamp')
    prevEvent = ''
    for row in userDataFrame.rdd.collect():
        currEvent = row[3]
        if(prevEvent == currEvent):
            wrongUsers.append(row[0])
        prevEvent = currEvent
badUsers = sqlContext.createDataFrame(wrongUsers)
badUsers.write.format("com.databricks.spark.csv").save(RESULTS_FILE_PATH)

首先(无关但仍然相关),请确保每个用户的条目数量不大,因为for row in userDataFrame.rdd.collect():中的collect很危险。

第二,您无需离开DataFrame区域即可使用经典的Python,只需坚持火花。

现在,您的问题。基本上,这是"我想从上一行中知道的东西":属于Window函数的概念,并且确切地将lag函数确切化。这是有关Spark中的窗口函数的两篇有趣的文章:一篇来自Python中的Databricks,一篇来自Python中的代码,一篇来自Scala中的示例(我认为更容易理解)。

我在Scala中有一个解决方案,但我认为您将其从Python中翻译出来:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag
import sqlContext.implicits._
val LOG_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/flume/events/*"
val RESULTS_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/spark/script_results/prova/bad_users.csv"
val data = sqlContext
  .read
  .format("com.databricks.spark.csv")
  .option("inferSchema", "true")
  .option("header", "true") // use the header from your csv
  .load(LOG_FILE_PATH)
val wSpec = Window.partitionBy("userId").orderBy("timestamp")
val badUsers = data
  .withColumn("previousEvent", lag($"event", 1).over(wSpec))
  .filter($"previousEvent" === $"event")
  .select("userId")
  .distinct
badUsers.write.format("com.databricks.spark.csv").save(RESULTS_FILE_PATH)

基本上,您只需从上一行中检索值,并将其与当前行上的值进行比较,如果它是错误的行为,并且保留userId。对于每个userId的"线路"中的第一行,上一个值将为null:与当前值进行比较时,布尔表达式将为false,因此这里没有问题。

相关内容

  • 没有找到相关文章

最新更新