Efficient way to optimize Scala code that reads a large file which does not fit in memory



The problem statement is as follows:

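We have a large log file that stores users' interactions with an application. Entries in the log file follow the schema {userId, timestamp, actionType}, where actionType is one of two possible values: [open, close].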

Constraints:

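  1. The log file is too large to fit in memory on a single machine. Also assume that the aggregated data does not fit in memory.
  2. The code must be able to run on a single machine.
  3. Do not use an out-of-the-box implementation of MapReduce or a third-party database; do not assume we have Hadoop, Spark, or any other distributed computing framework.
  4. There can be multiple entries of each actionType for each user, and entries may be missing from the log file. So a user may be missing a close record between two open records, or vice versa.
  5. Timestamps are in strictly ascending order.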

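For this problem, we need to implement one or more classes that compute the average time each user spends between an open and a close. Keep in mind that some users have missing entries, so we must choose how to handle them in the calculation. The code should follow a consistent policy for making that choice.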
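To make "a consistent policy" concrete, here is a minimal sketch of one possible choice, which is not the timeout-capping policy used by the implementations in the question: pair each close with the user's most recent unmatched open and discard any orphan open or orphan close. The function name `average_session_times` is hypothetical, and each line is assumed to be `userId,timestamp,action`:

```python
def average_session_times(lines) -> dict:
    """Average open->close time per user, under one explicit policy:

    * a close is paired with the user's most recent unmatched open;
    * an open followed by another open is discarded (its close is missing);
    * a close with no unmatched open is discarded (its open is missing).
    """
    pending_open = {}  # userId -> timestamp of the unmatched open, if any
    totals = {}        # userId -> [total time, number of pairs]
    for line in lines:
        fields = [f.strip() for f in line.split(",")]
        if len(fields) != 3:
            continue  # skip malformed lines
        user, ts, action = fields[0], int(fields[1]), fields[2]
        totals.setdefault(user, [0, 0])  # every user appears in the output
        if action == "open":
            pending_open[user] = ts  # a newer open replaces an unmatched one
        elif action == "close" and user in pending_open:
            start = pending_open.pop(user)
            totals[user][0] += ts - start
            totals[user][1] += 1
    return {u: (t / n if n else 0.0) for u, (t, n) in totals.items()}
```

On the sample log below, this policy gives 3160.0 for user 1 (the open at 1435461234 is discarded), 4924.0 for user 2, 5486.0 for user 3 and 4777.0 for user 4. Note that it still keeps one entry per user in memory, so for constraint 1 it would be applied per partition rather than to the whole file.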

For all users in the log file, the desired output of the solution is [{userId, timeSpent},....].

Sample log file (comma-separated text file)

1,1435456566,open 
2,1435457643,open 
3,1435458912,open 
1,1435459567,close 
4,1435460345,open 
1,1435461234,open 
2,1435462567,close 
1,1435463456,open 
3,1435464398,close 
4,1435465122,close 
1,1435466775,close

Approach

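Below is the code I wrote in Python and Scala. It does not seem efficient enough for the given scenario, and I would like feedback from the developer community on this forum on how to better optimize it.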

Scala implementation

import java.io.FileInputStream
import java.util.{LinkedList, Scanner}
import scala.collection.mutable

object UserMetrics extends App {
  if (args.length == 0) {
    println("Please provide input data file name for processing")
    sys.exit(1) // without this, args(0) below throws ArrayIndexOutOfBoundsException
  }
  val userMetrics = new UserMetrics()
  // optional second argument: timeout used to cap gaps caused by missing entries
  userMetrics.readInputFile(args(0), if (args.length == 1) 600000 else args(1).toInt)
}

// Running state per user: last event seen, accumulated time and number of counted intervals
case class UserInfo(userId: Int, prevTimeStamp: Long, prevStatus: String, timeSpent: Long, occurence: Int)

class UserMetrics {
  val usermap = mutable.Map[Int, LinkedList[UserInfo]]()

  def readInputFile(stArr: String, timeOut: Int): Unit = {
    var inputStream: FileInputStream = null
    var sc: Scanner = null
    try {
      inputStream = new FileInputStream(stArr)
      sc = new Scanner(inputStream, "UTF-8")
      while (sc.hasNextLine()) {
        val line: String = sc.nextLine()
        processInput(line, timeOut)
      }

      for ((key, userLs) <- usermap) {
        val userInfo: UserInfo = userLs.get(0)
        val timespent = if (userInfo.occurence > 0) userInfo.timeSpent / userInfo.occurence else 0L
        println("{" + key + "," + timespent + "}")
      }
      if (sc.ioException() != null) {
        throw sc.ioException()
      }
    } finally {
      if (inputStream != null) {
        inputStream.close()
      }
      if (sc != null) {
        sc.close()
      }
    }
  }

  def processInput(line: String, timeOut: Int): Unit = {
    val strSp = line.split(",").map(_.trim) // trim so that "open " still matches "open"
    val userId = strSp(0).toInt
    val curTimeStamp = strSp(1).toLong
    val status = strSp(2)
    val lsUserInfo = usermap.getOrElse(userId, new LinkedList[UserInfo]())
    if (lsUserInfo.size() > 0) {
      val lastUserInfo: UserInfo = lsUserInfo.get(lsUserInfo.size() - 1)
      val prevTimeStamp = lastUserInfo.prevTimeStamp
      val prevStatus = lastUserInfo.prevStatus
      // gap to the previous event, capped at timeOut when an entry is missing
      val timeSelector = math.min(curTimeStamp - prevTimeStamp, timeOut.toLong)

      if (prevStatus == "open") {
        lsUserInfo.remove()
        if (status == prevStatus) {
          // open -> open: the close is missing, count the capped gap
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent + timeSelector, lastUserInfo.occurence + 1))
        } else {
          // open -> close: a complete session, count the full gap
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent + curTimeStamp - prevTimeStamp, lastUserInfo.occurence + 1))
        }
      } else if (prevStatus == "close") {
        lsUserInfo.remove()
        if (status == prevStatus) {
          // close -> close: the open is missing, count the capped gap
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent + timeSelector, lastUserInfo.occurence + 1))
        } else {
          // close -> open: a new session starts, nothing to count yet
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent, lastUserInfo.occurence))
        }
      }
    } else {
      lsUserInfo.add(UserInfo(userId, curTimeStamp, status, 0L, 0))
    }
    usermap.put(userId, lsUserInfo)
  }
}
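Reading the Scala code, its policy for missing entries appears to be: open then close adds the full gap; open then open and close then close add the gap capped at timeOut; close then open adds nothing. Since only the last UserInfo per user is ever read, the per-user LinkedList could likely be replaced by one small record. As an illustration only, here is a Python transcription of that policy with a tuple per user; the name `scala_policy_averages` is hypothetical, and lines whose action is not open or close are skipped:

```python
def scala_policy_averages(lines, time_out: int = 600000) -> dict:
    """Per-user averages under the policy read from the Scala code:

    open -> close   adds the full gap and counts one interval
    open -> open    adds min(gap, time_out) and counts one interval (close missing)
    close -> close  adds min(gap, time_out) and counts one interval (open missing)
    close -> open   adds nothing
    """
    state = {}  # userId -> (prev timestamp, prev status, time spent, intervals)
    for line in lines:
        fields = [f.strip() for f in line.split(",")]
        if len(fields) != 3 or fields[2] not in ("open", "close"):
            continue  # skip malformed lines
        user, ts, status = int(fields[0]), int(fields[1]), fields[2]
        if user not in state:
            state[user] = (ts, status, 0, 0)
            continue
        prev_ts, prev_status, spent, count = state[user]
        gap = ts - prev_ts
        if prev_status == "open" and status == "close":
            spent, count = spent + gap, count + 1
        elif prev_status == status:  # an entry is missing: count the capped gap
            spent, count = spent + min(gap, time_out), count + 1
        state[user] = (ts, status, spent, count)
    # integer division, like the Long division in the Scala code
    return {u: (s // c if c else 0) for u, (_, _, s, c) in state.items()}
```

With the default timeout, user 1 in the sample accumulates 3001 + 2222 + 3319 = 8542 over 3 intervals, so the result is 2847; users 2, 3 and 4 get 4924, 5486 and 4777.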

Python implementation

import sys

def fileBlockStream(fp, number_of_blocks, block):
    # A generator that splits a file into blocks and iterates over the lines of one of the blocks.
    # fp must be opened in binary mode so that seek()/tell() work on byte offsets.
    assert 0 <= block < number_of_blocks  # validate the block index
    assert 0 < number_of_blocks

    fp.seek(0, 2)  # seek to end of file to compute block size
    file_size = fp.tell()

    ini = file_size * block // number_of_blocks  # compute start & end byte offsets of the block
    end = file_size * (1 + block) // number_of_blocks

    if ini <= 0:
        fp.seek(0)
    else:
        fp.seek(ini - 1)
        fp.readline()  # skip the partial line; it belongs to the previous block

    while fp.tell() < end:
        yield fp.readline().decode("utf-8")  # iterate over the lines of this block

def computeResultDS(chunk, avgTimeSpentDict, defaultTimeOut):
    # Each dictionary value is [no. of pair events, total time spent (close tm - open tm),
    #                           last open tm, last close tm, next expected event (0 - open; 1 - close)]
    countPos, totTmPos, openTmPos, closeTmPos, nextEventPos = 0, 1, 2, 3, 4
    for rows in chunk.splitlines():
        fields = [f.strip() for f in rows.split(",")]  # split once per line
        if len(fields) != 3:
            continue
        userKeyID = fields[0]
        try:
            curTimeStamp = int(fields[1])
        except ValueError:
            print("Invalid Timestamp for ID:" + str(userKeyID))
            continue
        curEvent = fields[2]
        state = avgTimeSpentDict.get(userKeyID)
        if state is not None and state[nextEventPos] == 1 and curEvent == "close":
            # Existing userID with the expected close event
            state[totTmPos] += curTimeStamp - state[openTmPos]
            state[countPos] += 1
            state[closeTmPos] = curTimeStamp
            state[nextEventPos] = 0  # change next expected event to open
        elif state is not None and state[nextEventPos] == 0 and curEvent == "open":
            state[openTmPos] = curTimeStamp
            state[nextEventPos] = 1  # change next expected event to close
        elif state is not None and state[nextEventPos] == 1 and curEvent == "open":
            # Close is missing: count the capped gap since the last open
            curTotalTime, closeTime = missingHandler(defaultTimeOut, state[openTmPos], curTimeStamp)
            state[totTmPos] += curTotalTime
            state[closeTmPos] = closeTime
            state[openTmPos] = curTimeStamp
            state[countPos] += 1
        elif state is not None and state[nextEventPos] == 0 and curEvent == "close":
            # Open is missing: count the capped gap since the last close
            curTotalTime, openTime = missingHandler(defaultTimeOut, state[closeTmPos], curTimeStamp)
            state[totTmPos] += curTotalTime
            state[openTmPos] = openTime
            state[countPos] += 1
        elif curEvent == "open":
            # Initialize userid with open event
            avgTimeSpentDict[userKeyID] = [0, 0, curTimeStamp, 0, 1]
        elif curEvent == "close":
            # No open event for this user yet: initialize through the missing handler
            totaltime, OpenTime = missingHandler(defaultTimeOut, 0, curTimeStamp)
            avgTimeSpentDict[userKeyID] = [1, totaltime, OpenTime, curTimeStamp, 0]

def missingHandler(defaultTimeOut, curTimeVal, lastTimeVal):
    # Cap the gap at defaultTimeOut when the matching event is missing
    if lastTimeVal - curTimeVal > defaultTimeOut:
        return defaultTimeOut, curTimeVal
    else:
        return lastTimeVal - curTimeVal, curTimeVal

def computeAvg(avgTimeSpentDict, defaultTimeOut):
    resDict = {}
    for k, v in avgTimeSpentDict.items():  # iteritems() exists only in Python 2
        resDict[k] = 0 if v[0] == 0 else v[1] / v[0]
    return resDict

if __name__ == "__main__":
    avgTimeSpentDict = {}
    if len(sys.argv) < 2:
        print("Please provide input data file name for processing")
        sys.exit(1)

    number_of_chunks = 4 if len(sys.argv) < 3 else int(sys.argv[2])
    defaultTimeOut = 60000 if len(sys.argv) < 4 else int(sys.argv[3])
    with open(sys.argv[1], "rb") as fileObj:  # binary mode, so seek() uses byte offsets; closed automatically
        for chunk_number in range(number_of_chunks):
            for chunk in fileBlockStream(fileObj, number_of_chunks, chunk_number):
                computeResultDS(chunk, avgTimeSpentDict, defaultTimeOut)
    print(computeAvg(avgTimeSpentDict, defaultTimeOut))
    avgTimeSpentDict.clear()  # release the dictionary
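The block boundaries in `fileBlockStream` work because each block owns exactly the lines that start at a byte offset in [ini, end): seeking to ini - 1 and discarding one readline() skips the line that began in the previous block. A small self-check of that rule, using a hypothetical helper `block_lines` that mirrors the generator on an in-memory binary file. Note that, because all blocks are read one after another into the same avgTimeSpentDict, the split by itself does not appear to reduce peak memory or add parallelism.

```python
import io


def block_lines(fp, number_of_blocks: int, block: int) -> list:
    """Same boundary rule as fileBlockStream, on a binary file object:
    a block owns every line that starts at a byte offset in [ini, end)."""
    fp.seek(0, io.SEEK_END)
    file_size = fp.tell()
    ini = file_size * block // number_of_blocks
    end = file_size * (block + 1) // number_of_blocks
    if ini <= 0:
        fp.seek(0)
    else:
        fp.seek(ini - 1)
        fp.readline()  # finish the line that started in the previous block
    out = []
    while fp.tell() < end:
        out.append(fp.readline())
    return out


data = b"1,1435456566,open\n2,1435457643,open\n1,1435459567,close\n"
for n in range(1, 6):
    fp = io.BytesIO(data)
    pieces = [line for b in range(n) for line in block_lines(fp, n, b)]
    # every line is assigned to exactly one block, in file order
    assert b"".join(pieces) == data
```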

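Both programs above produce the desired output, but in this particular scenario efficiency is what matters most. If you have any suggestions for improving the existing implementations, please let me know.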

Thanks in advance!

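What you are after is iterators. I won't rewrite your code, but the trick here is probably to use an Iterator. Fortunately, Scala provides decent out-of-the-box tools for this job.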

import scala.io.Source

object ReadBigFiles {
  def read(fileName: String): Unit = {
    val source = Source.fromFile(fileName)
    try {
      val lines: Iterator[String] = source.getLines()
      // now you get iterator semantics for the file line traversal:
      // you can only go through the lines once, but you don't incur a penalty on heap usage
      lines.foreach { line => /* process each line here */ }
    } finally {
      source.close() // release the file handle
    }
  }
}
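For the Python version in the question, the analogue of `Source.getLines()` is simply iterating over the file object, which reads one line at a time. A minimal sketch with hypothetical names `parse_events` and `read_events`, assuming `userId,timestamp,action` lines and skipping anything malformed:

```python
from typing import Iterable, Iterator, Tuple

Event = Tuple[str, int, str]


def parse_events(lines: Iterable[str]) -> Iterator[Event]:
    """Lazily turn lines into (userId, timestamp, action), skipping malformed ones."""
    for line in lines:
        fields = [x.strip() for x in line.split(",")]
        if len(fields) == 3 and fields[1].isdigit():
            yield fields[0], int(fields[1]), fields[2]


def read_events(path: str) -> Iterator[Event]:
    """Iterating over a file object reads it line by line, much like
    Source.getLines() in Scala, so memory use does not grow with the file size."""
    with open(path, encoding="utf-8") as f:
        yield from parse_events(f)
```

As with the Scala iterator, the generator can be consumed only once.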

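For your use case, it seems you need a lastUser, so you are dealing with groups of 2 entries. I think you have two options: either use iterator.sliding(2), which yields each consecutive pair, or simply add recursion to the mix, using an Option.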

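import scala.annotation.tailrec

// User is a hypothetical record type for one parsed log line
case class User(userId: Int, timestamp: Long, action: String)

def parseUser(line: String): User = {
  val Array(id, ts, action) = line.split(",").map(_.trim)
  User(id.toInt, ts.toLong, action)
}

// @tailrec makes the compiler check the recursion compiles to a loop, so a huge file cannot overflow the stack
@tailrec
def navigate(source: Iterator[String], last: Option[User], acc: Map[Int, Int]): Map[Int, Int] = {
  if (source.hasNext) {
    val current = parseUser(source.next())
    last match {
      case Some(existing) => // compare existing with current here and update acc
        navigate(source, Some(current), acc)
      case None => navigate(source, Some(current), acc)
    }
  } else {
    acc // exit recursion, return result
  }
}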
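One caveat with either option: a `sliding(2)` over the raw lines pairs neighbouring lines, which usually belong to different users (in the sample, the first two lines are users 1 and 2). A sketch of pairing consecutive events per user instead, keeping only the previous event of each user; it is written in Python and `per_user_pairs` is a hypothetical name:

```python
def per_user_pairs(events):
    """Yield (userId, previous_event, current_event) for consecutive events of the same user.

    events is an iterable of (userId, timestamp, action) tuples in file order.
    Only the last event per user is kept, instead of the whole file.
    """
    last = {}  # userId -> (timestamp, action) of that user's previous event
    for user, ts, action in events:
        if user in last:
            yield user, last[user], (ts, action)
        last[user] = (ts, action)
```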

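This way you avoid writing all the code for reading the file yourself. If you need to count occurrences, just build a Map in the recursion and increment the count at each step according to your business logic.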
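In Python, which does not eliminate tail calls, the same idea is usually written as a loop over the iterator that updates a counter map. A minimal sketch, assuming (userId, timestamp, action) tuples; `count_actions` is a hypothetical name and the "business logic" here is simply counting every event:

```python
from collections import Counter


def count_actions(events) -> Counter:
    """Count occurrences of (userId, action) in a single pass over an event iterator."""
    counts = Counter()
    for user, _ts, action in events:
        counts[(user, action)] += 1  # the business rule decides what to increment
    return counts
```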

from queue import LifoQueue, Queue

def averageTime() -> None:
    logs = {}
    records = Queue()
    with open("log.txt") as fp:
        for line in fp:  # iterate lazily instead of readlines()
            fields = [f.strip() for f in line.split(",")]
            if len(fields) != 3:
                continue
            userId, timestamp, action = fields[0], int(fields[1]), fields[2]
            if userId not in logs:
                logs[userId] = LifoQueue()
            logs[userId].put((timestamp, action))
    for k in logs:
        somme = 0
        count = 0
        while not logs[k].empty():
            l = logs[k].get()
            # close adds its timestamp and open subtracts it, so each pair contributes close - open
            somme = (somme + l[0]) if l[1] == "close" else (somme - l[0])
            count = count + 1
        records.put([k, somme, count // 2])
    while not records.empty():
        record = records.get()
        if record[2] > 0:  # avoid division by zero for users with a single record
            print(f"UserId={record[0]} Avg={record[1] / record[2]}")
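The answer above stores every record in a queue before summing. Under the same assumption it already makes (every open is followed by its own close, with no missing entries), the sum can be kept incrementally, so each user needs only two numbers. A minimal sketch; `average_by_running_sums` is a hypothetical name and events are (userId, timestamp, action) tuples:

```python
def average_by_running_sums(events) -> dict:
    """Per-user average of (close - open), assuming every open is followed by its own close.

    sum(close timestamps) - sum(open timestamps) equals the total time spent,
    so only [running total, number of closes] is kept per user.
    """
    sums = {}  # userId -> [sum(close) - sum(open), number of closes]
    for user, ts, action in events:
        acc = sums.setdefault(user, [0, 0])
        if action == "open":
            acc[0] -= ts
        elif action == "close":
            acc[0] += ts
            acc[1] += 1
    return {u: total / n for u, (total, n) in sums.items() if n > 0}
```

If a user's log ends with an open that was never closed, the total for that user is wrong, so this shortcut fits only logs without missing entries.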
