Word2Vec on Spark Scala



我正在尝试使用mllib中的Word2Vec,以便随后应用kmeans。我正在使用 scala 2.10.5 和 Spark 1.6.3。这是我的代码(标记化后(:

val word2Vec = new Word2Vec()
  .setMinCount(2)
  .setInputCol("FilteredFeauturesEntities")
  .setOutputCol("Word2VecFeatures")
  .setVectorSize(1000)
val model = word2Vec.fit(CleanedTokenizedDataFrame)
val word2VecDataFrame = model.transform(CleanedTokenizedDataFrame)
word2VecDataFrame.show()

我没有收到特殊错误,但我的工作没有到达终点线。这是日志输出:

18/02/05 15:39:32 INFO TaskSetManager: Finished task 4.0 in stage 4.0 (TID 23) in 3143 ms on dhadlx122.haas.xxxxxx (2/9)
18/02/05 15:39:32 INFO TaskSetManager: Starting task 5.1 in stage 4.0 (TID 28, dhadlx121.haas.xxxxxx, partition 5,NODE_LOCAL, 2329 bytes)
18/02/05 15:39:32 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 20) in 3217 ms on dhadlx121.haas.xxxxxx (3/9)
18/02/05 15:39:32 INFO TaskSetManager: Finished task 1.0 in stage 4.0 (TID 22) in 3309 ms on dhadlx123.haas.xxxxxx (4/9)
18/02/05 15:39:32 INFO TaskSetManager: Finished task 2.0 in stage 4.0 (TID 21) in 3677 ms on dhadlx121.haas.xxxxxx (5/9)
18/02/05 15:39:33 INFO TaskSetManager: Finished task 6.0 in stage 4.0 (TID 25) in 3901 ms on dhadlx126.haas.xxxxxx (6/9)
18/02/05 15:39:33 INFO YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (dhadlx127.haas.xxxxxx:48384) with ID 6
18/02/05 15:39:33 INFO BlockManagerMasterEndpoint: Registering block manager dhadlx127.haas.xxxxxx:37909 with 5.3 GB RAM, BlockManagerId(6, dhadlx127.haas.xxxxxx, 37909)
18/02/05 15:39:33 INFO TaskSetManager: Lost task 5.1 in stage 4.0 (TID 28) on executor dhadlx121.haas.xxxxxx: java.lang.NullPointerException (null) [duplicate 1]
18/02/05 15:39:33 INFO TaskSetManager: Starting task 5.2 in stage 4.0 (TID 29, dhadlx128.haas.xxxxxx, partition 5,RACK_LOCAL, 2329 bytes)
18/02/05 15:39:33 INFO TaskSetManager: Finished task 7.0 in stage 4.0 (TID 27) in 2948 ms on dhadlx125.haas.xxxxxx (7/9)
18/02/05 15:39:34 INFO TaskSetManager: Lost task 5.2 in stage 4.0 (TID 29) on executor dhadlx128.haas.xxxxxx: java.lang.NullPointerException (null) [duplicate 2]
18/02/05 15:39:34 INFO TaskSetManager: Starting task 5.3 in stage 4.0 (TID 30, dhadlx127.haas.xxxxxx, partition 5,RACK_LOCAL, 2329 bytes)
18/02/05 15:39:35 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on dhadlx127.haas.xxxxxx:37909 (size: 26.4 KB, free: 5.3 GB)
18/02/05 15:39:35 INFO TaskSetManager: Finished task 3.0 in stage 4.0 (TID 19) in 6321 ms on dhadlx120.haas.xxxxxx (8/9)
18/02/05 15:39:36 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on dhadlx127.haas.xxxxxx:37909 (size: 58.9 KB, free: 5.3 GB)
18/02/05 15:39:40 INFO TaskSetManager: Lost task 5.3 in stage 4.0 (TID 30) on executor dhadlx127.haas.xxxxxx: java.lang.NullPointerException (null) [duplicate 3]
18/02/05 15:39:40 ERROR TaskSetManager: Task 5 in stage 4.0 failed 4 times; aborting job
18/02/05 15:39:40 INFO YarnScheduler: Removed TaskSet 4.0, whose tasks have all completed, from pool
18/02/05 15:39:40 INFO YarnScheduler: Cancelling stage 4
18/02/05 15:39:40 INFO DAGScheduler: ShuffleMapStage 4 (map at Word2Vec.scala:161) failed in 11.037 s
18/02/05 15:39:40 INFO DAGScheduler: Job 3 failed: collect at Word2Vec.scala:170, took 11.058049 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 4.0 failed 4 times, most recent failure: Lost task 5.3 in stage 4.0 (TID 30, dhadlx127.haas.xxxxxx): java.lang.NullPointerException
    at java.util.regex.Matcher.getTextLength(Matcher.java:1283)
    at java.util.regex.Matcher.reset(Matcher.java:309)
    at java.util.regex.Matcher.<init>(Matcher.java:229)
    at java.util.regex.Pattern.matcher(Pattern.java:1093)
    at scala.util.matching.Regex.replaceAllIn(Regex.scala:385)
    at SemanticAnalysis.App$$anonfun$extractPattern$1$1.apply(App.scala:63)
    at SemanticAnalysis.App$$anonfun$extractPattern$1$1.apply(App.scala:63)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:247)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:933)
    at org.apache.spark.mllib.feature.Word2Vec.learnVocab(Word2Vec.scala:170)
    at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:284)
    at org.apache.spark.ml.feature.Word2Vec.fit(Word2Vec.scala:149)
    at SemanticAnalysis.App$.main(App.scala:126)
    at SemanticAnalysis.App.main(App.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:750)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
    at java.util.regex.Matcher.getTextLength(Matcher.java:1283)
    at java.util.regex.Matcher.reset(Matcher.java:309)
    at java.util.regex.Matcher.<init>(Matcher.java:229)
    at java.util.regex.Pattern.matcher(Pattern.java:1093)
    at scala.util.matching.Regex.replaceAllIn(Regex.scala:385)
    at SemanticAnalysis.App$$anonfun$extractPattern$1$1.apply(App.scala:63)
    at SemanticAnalysis.App$$anonfun$extractPattern$1$1.apply(App.scala:63)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:247)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
18/02/05 15:39:40 INFO SparkContext: Invoking stop() from shutdown hook
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/static/sql,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/json,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
18/02/05 15:39:40 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
18/02/05 15:39:40 INFO SparkUI: Stopped Spark web UI at http://xxx.xx.xx.xxx:xxxx
18/02/05 15:39:40 INFO YarnClientSchedulerBackend: Interrupting monitor thread
18/02/05 15:39:40 INFO YarnClientSchedulerBackend: Shutting down all executors
18/02/05 15:39:40 INFO YarnClientSchedulerBackend: Asking each executor to shut down
18/02/05 15:39:40 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
services=List(),
started=false)
18/02/05 15:39:40 INFO YarnClientSchedulerBackend: Stopped
18/02/05 15:39:40 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/02/05 15:39:40 INFO MemoryStore: MemoryStore cleared
18/02/05 15:39:40 INFO BlockManager: BlockManager stopped
18/02/05 15:39:40 INFO BlockManagerMaster: BlockManagerMaster stopped
18/02/05 15:39:40 INFO SparkContext: Successfully stopped SparkContext
18/02/05 15:39:40 INFO ShutdownHookManager: Shutdown hook called
18/02/05 15:39:40 INFO ShutdownHookManager: Deleting directory /tmp/spark-e769e7c5-4336-45bd-97cd-e0731803f45f
18/02/05 15:39:40 INFO ShutdownHookManager: Deleting directory /tmp/spark-f427cf4c-4236-4e57-a304-6be2a52932f3
18/02/05 15:39:40 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/02/05 15:39:40 INFO ShutdownHookManager: Deleting directory /tmp/spark-f427cf4c-4236-4e57-a304-6be2a52932f3/httpd-0ab9e5ee-930e-4a48-be77-f5a6d2b01250
18/02/05 15:39:40 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
18/02/05 15:39:40 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
18/02/05 15:39:40 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.

此外,相同的代码适用于相同的工作环境中的小示例:

package BIGDATA
/**
* @author ${user.name}
*/
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.mllib.linalg.{VectorUDT, Vectors}

object App {
  def main(args : Array[String]) {
val conf = new SparkConf()
  .setAppName("SEMANTIC ANALYSIS - TEST")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._
println("====================================================")
println("READING DATA")
println("====================================================")

val pattern: scala.util.matching.Regex = "(([\w\.-]+@[\w\.-]+)|((X|A|x|a)\d{6})|(MA\d{7}\w|MA\d{7}|FR\d{8}\w)|(w+\..*(\.com|fr))|([|\[\]!\(\)?,;:@&*#_=\/]*))".r
def extractPattern(pattern: scala.util.matching.Regex) = udf(
  (title: String) => pattern.replaceAllIn(title, "")
)
val df = Seq(
  (8, "Hi I heard about Spark x163021. Now, let’s use trained model by loading it. We need to import KMeansModel in order to use it for loading the model from file."),
  (64, "I wish Java could use case classes. Above is a very naive example in which we use training dataset as input data too. In real world we will train a model, save it and later use it for predicting clusters of input data."),
  (-27, "Logistic regression models are neat. Here is how you can save a trained model and later load it for prediction.")
).toDF("number", "word").select($"number", $"word",
  extractPattern(pattern)($"word").alias("NewWord"))

println("====================================================")
println("FEATURE TRANSFORMERS")
println("====================================================")
val tokenizer = new Tokenizer()
  .setInputCol("NewWord")
  .setOutputCol("FeauturesEntities")
val TokenizedDataFrame = tokenizer.transform(df)
val remover = new StopWordsRemover()
  .setInputCol("FeauturesEntities")
  .setOutputCol("FilteredFeauturesEntities")
val CleanedTokenizedDataFrame = remover.transform(TokenizedDataFrame)
CleanedTokenizedDataFrame.show()

println("====================================================")
println("WORD2VEC : LEARN A MAPPING FROM WORDS TO VECTORS")
println("====================================================")

// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
  .setMinCount(2)
  .setInputCol("FilteredFeauturesEntities")
  .setOutputCol("Word2VecFeatures")
  .setVectorSize(1000)
val model = word2Vec.fit(CleanedTokenizedDataFrame)
val word2VecDataFrame = model.transform(CleanedTokenizedDataFrame)
word2VecDataFrame.show()
  }
}

第一个例子有什么问题?谢谢!

你的代码永远不会达到Word2Vec。它在调用udf失败word因为列包含nulls 。例如

val df = Seq((1, null), (2, "foo bar")).toDF("id", "word")
df.select(extractPattern(pattern)($"word").alias("NewWord")).show

将以同样的方式失败:

java.lang.NullPointerException
    at java.util.regex.Matcher.getTextLength(Matcher.java:1283)
    at java.util.regex.Matcher.reset(Matcher.java:309)
    at java.util.regex.Matcher.<init>(Matcher.java:229)
    at java.util.regex.Pattern.matcher(Pattern.java:1093)

在继续之前,请使用na.drop清理数据,并且通常使用regexp_replace,而不是udf

最新更新