我对spark非常陌生,但我确信有一种很好的方法可以比我现在更快地完成我想要的事情。
本质上,我有一个S3 bucket,里面有很多twitter数据的JSON。我想浏览所有这些文件,从JSON中获取文本,对文本进行情绪分析(目前使用Stanford NLP),然后将Tweet+sentiment上传到数据库(现在我使用的是dynamo,但这不是成败攸关的问题)
我目前拥有的代码是
/**
* Per thread:
* 1. Download a file
* 2. Do sentiment on the file -> output Map<String, List<Float>>
* 3. Upload to Dynamo: (a) sentiment (b) number of tweets (c) timestamp
*
*/
List<String> keys = s3Connection.getKeys();
ThreadPoolExecutor threads = new ThreadPoolExecutor(40, 40, 10000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
threads.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
for (String key : keys) {
threads.submit(new Thread(() -> {
try {
S3Object s3Object = s3Connection.getObject(key);
Map<String, List<Float>> listOfTweetsWithSentiment = tweetSentimentService.getTweetsFromJsonFile(s3Object.getObjectContent());
List<AggregatedTweets> aggregatedTweets = tweetSentimentService.createAggregatedTweetsFromMap(listOfTweetsWithSentiment, key);
for (AggregatedTweets aggregatedTweet : aggregatedTweets) {
System.out.println(aggregatedTweet);
tweetDao.putItem(aggregatedTweet);
}
} catch (Exception e) {
System.out.println(e.getMessage());
}
}));
}
这很好用。通过在特定的日期范围上运行此代码(即getKeys只获取特定日期范围的密钥),并在不同的EC2上旋转该过程的许多实例,每个实例都在不同的日期范围内运行,我能够将该过程加快到仅约2小时。
然而,必须有一种更快的方法来实现这一点,使用一个好的ole map reduce,但我甚至不知道如何开始研究这一点。有可能在我的地图中进行情绪分析,然后根据时间戳进行减少吗?
此外,我曾考虑使用AWS Glue,但我认为在那里使用斯坦福NLP库没有好的方法。
如有任何帮助,我们将不胜感激。
是的,您可以使用Apache Spark来完成。有很多方法可以设计你的应用程序、配置基础设施等。我提出了一个简单的设计:
-
您在AWS上,因此使用Spark创建一个EMR集群。包含Zeppelin进行交互式调试会很有用。
-
Spark使用了几个数据抽象。你的朋友是RDD和数据集(阅读关于它们的文档)。将数据读取到数据集的代码可能相同:
SparkSession ss = SparkSession.builder().getOrCreate(); Dataset<Row> dataset = ss.read("s3a://your_bucket/your_path");
-
现在您有了
Dataset<Row>
。这对于类似SQL的操作非常有用。对于您的分析,您需要将其转换为Spark RDD:JavaRDD<Tweet> analyticRdd = dataset.toJavaRDD().map(row -> { return TweetsFactory.tweetFromRow(row); });
-
因此,有了
analyticRdd
,您就可以对员工进行分析。只是不要忘记让所有使用数据的服务都可序列化。