I'm a bit stuck trying to do a map-reduce over chunks of data with Spark. I want to process log files and reduce them down to one (key, dict()) tuple per key.
However, I keep getting an error, and I'm not 100% sure this is the right approach, so I'd be happy to hear any suggestions. In short, I want to map everything to a (key, dict(values)) pair.
Here are my mapper and reducer:
from collections import defaultdict

# Two sample mapped records for the same user id (used to try out the reducer)
a = {u'1058493694': {'age': {u'25': 1}, 'areacode': {'unknown': 1}, 'catg': {'unknown': 1}, 'city': {'unknown': 1}, 'country': {'unknown': 1}, 'ethnicity': {'unknown': 1}, 'gender': {u'u': 1}, 'geo_data': {'unknown': 1}, 'user_ip': {u'149.6.187.*': 1}}}
b = {u'1058493694': {'age': {u'25': 1}, 'areacode': {'unknown': 1}, 'catg': {'unknown': 1}, 'city': {'London': 1}, 'country': {'unknown': 1}, 'ethnicity': {'unknown': 1}, 'gender': {u'Male': 1}, 'geo_data': {'unknown': 1}, 'user_ip': {u'149.6.187.*': 1}}}


def getValueFromJson(json_obj, field):
    # Wrap a single field as a {value: 1} count; fall back to {'unknown': 1}
    # if the field is missing, empty, or unusable.
    try:
        raw = dict(json_obj)
        if field in raw and raw[field]:
            return {raw[field]: 1}
    except Exception:
        return {'unknown': 1}
    return {'unknown': 1}


def mapper(line):
    attr = {}
    user_id = line.get("user_id", "unknown")
    user_attr = ["age", "gender"]
    location_attr = ["city", "state", "post_code", "country", "areacode", "user_ip", "geo_data"]
    # combine both lists
    attr_list = user_attr + location_attr
    for item in attr_list:
        attr[item] = getValueFromJson(line, item)
    return (user_id, attr)


def reducer(a, b):
    # Merge two attribute dicts by summing the per-value counts.
    # Assumes both sides carry the same attribute keys (the mapper always emits all of them).
    results = dict()
    for key in a.keys():
        val = dict()
        for k in set(a[key]) | set(b[key]):
            val[k] = a[key].get(k, 0) + b[key].get(k, 0)
        results[key] = val
    return results
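As a quick local sanity check (assuming I understand correctly that reduceByKey hands the reducer the values emitted by the mapper, i.e. the inner attribute dicts), merging the attributes of the two sample records above gives the kind of result I'm after:

# Hypothetical local test: call the reducer on the attribute dicts of the two
# sample records above, which mimics what reduceByKey would pass in.
merged = reducer(a[u'1058493694'], b[u'1058493694'])
print(merged['age'])     # {u'25': 2}
print(merged['city'])    # {'unknown': 1, 'London': 1}
print(merged['gender'])  # {u'u': 1, u'Male': 1}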
I'm not sure whether I can use the reducer the way it is written. Any pointers on best practices for achieving this would be much appreciated. This is how I run it:
user_pairs = imps_data.map(extractUserData)
user_totals = user_pairs.reduceByKey(joinUserData)
user_totals.take(25)
Then I get the following error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-461-e1b93b972cac> in <module>()
1 user_pairs = imps_data.map(extractUserData)
2 user_totals = user_pairs.reduceByKey(joinUserData)
----> 3 user_totals.take(25)
/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in take(self, num)
1263
1264 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1265 res = self.context.runJob(self, takeUpToNumLeft, p, True)
1266
1267 items += res
/home/ubuntu/databricks/spark/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
895 mappedRDD = rdd.mapPartitions(partitionFunc)
896 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions,
--> 897 allowLocal)
898 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
899
/home/ubuntu/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
/home/ubuntu/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 688.0 failed 4 times, most recent failure: Lost task 6.3 in stage 688.0 (TID 8308, 10.179.246.224): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/ubuntu/databricks/spark/python/pyspark/worker.py", line 111, in main
process()
File "/home/ubuntu/databricks/spark/python/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home/ubuntu/databricks/spark/python/pyspark/rdd.py", line 2318, in pipeline_func
return func(split, prev_func(split, iterator))
File "/home/ubuntu/databricks/spark/python/pyspark/rdd.py", line 2318, in pipeline_func
return func(split, prev_func(split, iterator))
File "/home/ubuntu/databricks/spark/python/pyspark/rdd.py", line 304, in func
return f(iterator)
File "/home/ubuntu/databricks/spark/python/pyspark/rdd.py", line 1746, in combineLocally
merger.mergeValues(iterator)
File "/home/ubuntu/databricks/spark/python/pyspark/shuffle.py", line 266, in mergeValues
for k, v in iterator:
File "<ipython-input-456-90f3cdb37d50>", line 5, in extractUserData
File "/usr/lib/python2.7/json/__init__.py", line 338, in loads
return _default_decoder.decode(s)
File "/usr/lib/python2.7/json/decoder.py", line 366, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib/python2.7/json/decoder.py", line 382, in raw_decode
obj, end = self.scan_once(s, idx)
ValueError: Unterminated string starting at: line 1 column 733 (char 732)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:315)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Thanks a lot, C
Thanks for the suggestions. That was it.
I just added a filter on the RDD that drops lines with invalid JSON:
import json

def isJson(myjson):
    # Return True only if the string parses as valid JSON
    try:
        json.loads(myjson)
    except ValueError:
        return False
    return True

data = sc.textFile(files, 20).filter(lambda line: isJson(line.split("||")[0]))
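One possible refinement, just a sketch I haven't benchmarked: the filter parses the JSON and the mapper then parses it again, so the line could instead be parsed once and bad records dropped with flatMap (this assumes the same "||"-delimited format and the mapper/reducer from the question):

import json

def parseLine(line):
    # Hypothetical helper: parse the JSON part of a '||'-delimited log line once;
    # return an empty list on failure so flatMap silently drops the bad line.
    try:
        obj = json.loads(line.split("||")[0])
    except ValueError:
        return []
    return [mapper(obj)]

user_totals = sc.textFile(files, 20).flatMap(parseLine).reduceByKey(reducer)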