We receive real-time machine data as JSON, which we consume from RabbitMQ. Here is a sample of the JSON:
{"DeviceId":"MAC-1001","DeviceType":"Sim-1","TimeStamp":"05-12-2017 10:25:35","data":{"Rate":10,"speed":2493,"Mode":1,"EMode":2,"Run":1}}
{"DeviceId":"MAC-1001","DeviceType":"Sim-1","TimeStamp":"05-12-2017 10:25:36","data":{"Rate":10,"speed":2493,"Mode":1,"EMode":2,"Run":1}}
{"DeviceId":"MAC-1002","DeviceType":"Sim-1","TimeStamp":"05-12-2017 10:25:37","data":{"Rate":10,"speed":2493,"Mode":1,"EMode":2,"Run":1}}
{"DeviceId":"MAC-1002","DeviceType":"Sim-1","TimeStamp":"05-12-2017 10:25:38","data":{"Rate":10,"speed":2493,"Mode":1,"EMode":2,"Run":1}}
The data is windowed over a duration of "x" minutes, and this is what we want to achieve:
- Group the data by DeviceId. This part is done, but we are not sure whether we can get hold of a Dataset for each group.
- Loop over the grouped data with foreachPartition and run the per-device aggregation logic, so that the code executes inside the worker nodes.
Please correct me if my thinking is wrong.
Our earlier code collected the data, looped through the RDDs, converted them into Datasets, and applied the aggregation logic on those Datasets using the Spark SQLContext API.
When we ran load tests we saw that 90% of the processing happened on the master node; after a while the CPU usage climbed to 100% and the process crashed.
So we are now trying to redesign the whole flow so that as much of the logic as possible is executed on the worker nodes.
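For context, the earlier driver-heavy pattern looked roughly like the sketch below; the variable names reuse the stream and session from the code further down, and the aggregation shown is only illustrative. The rdd.collect() is the crux: it pulls every windowed record back to the driver, so the Dataset creation and the SQL aggregation are all driven from the master node.

// Rough reconstruction of the earlier, driver-heavy pattern (illustrative only).
machineDataRDD.foreachRDD(rdd -> {
    List<String> rows = rdd.collect();                          // pulls all windowed records to the driver
    Dataset<String> lines = spksess.createDataset(rows, Encoders.STRING());
    Dataset<Row> parsed = spksess.read().json(lines);           // parse the JSON strings (Spark 2.2+ overload)
    parsed.groupBy("DeviceId")
          .agg(functions.avg("data.speed"))                     // example per-device aggregation
          .show();
});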
Below is the code we have so far. It does execute on the worker nodes, but we have not yet managed to get a Dataset per device to run the aggregation logic on:
public static void main(String[] args) {
    try {
        mconf = new SparkConf();
        mconf.setAppName("OnPrem");
        mconf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(mconf);
        jssc = new JavaStreamingContext(sc, Durations.seconds(60));
        SparkSession spksess = SparkSession.builder().appName("Onprem").getOrCreate();
        //spksess.sparkContext().setLogLevel("ERROR");

        // RabbitMQ connection parameters for the distributed receiver
        Map<String, String> rabbitMqConParams = new HashMap<String, String>();
        rabbitMqConParams.put("hosts", "localhost");
        rabbitMqConParams.put("userName", "guest");
        rabbitMqConParams.put("password", "guest");
        rabbitMqConParams.put("vHost", "/");
        rabbitMqConParams.put("durable", "true");

        List<JavaRabbitMQDistributedKey> distributedKeys = new LinkedList<JavaRabbitMQDistributedKey>();
        distributedKeys.add(new JavaRabbitMQDistributedKey(QUEUE_NAME, new ExchangeAndRouting(EXCHANGE_NAME, "fanout", ""), rabbitMqConParams));

        // Each RabbitMQ delivery is turned into its raw JSON string
        Function<Delivery, String> messageHandler = new Function<Delivery, String>() {
            public String call(Delivery message) {
                return new String(message.getBody());
            }
        };

        JavaInputDStream<String> messages = RabbitMQUtils.createJavaDistributedStream(jssc, String.class, distributedKeys, rabbitMqConParams, messageHandler);

        // 2-minute window sliding every 60 seconds: one RDD is created per slide
        JavaDStream<String> machineDataRDD = messages.window(Durations.minutes(2), Durations.seconds(60));
        machineDataRDD.print();

        // Key each JSON record by its DeviceId and group the window per device
        JavaPairDStream<String, String> pairedData = machineDataRDD.mapToPair(s -> new Tuple2<String, String>(getMap(s).get("DeviceId").toString(), s));
        JavaPairDStream<String, Iterable<String>> groupedData = pairedData.groupByKey();

        groupedData.foreachRDD(new VoidFunction<JavaPairRDD<String, Iterable<String>>>() {
            @Override
            public void call(JavaPairRDD<String, Iterable<String>> data) throws Exception {
                // foreachPartition runs on the worker nodes, one call per partition
                data.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Iterable<String>>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, Iterable<String>>> data) throws Exception {
                        while (data.hasNext()) {
                            LOGGER.error("Machine Data == >>" + data.next());
                        }
                    }
                });
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }
    catch (Exception e) {
        e.printStackTrace();
    }
}
The grouping code below gives us, per device, only an Iterable of raw JSON strings; ideally we would like to end up with a Dataset:
JavaPairDStream<String, String> pairedData = machineDataRDD.mapToPair(s -> new Tuple2<String, String>(getMap(s).get("DeviceId").toString(), s));
JavaPairDStream<String, Iterable<String>> groupedData = pairedData.groupByKey();
The important part for us is looping with foreachPartition, so that the code execution is pushed down to the worker nodes.
After going through more code samples and guides, it appears that SQLContext and SparkSession are not serialized and are not available on the worker nodes, so we are going to change our strategy and stop trying to build Datasets inside the foreachPartition loop.
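For reference, a minimal sketch of what the per-device aggregation could look like with plain Java inside foreachPartition, without needing a SparkSession on the workers; the getMap helper and the averaged "speed" field are just assumptions:

// Minimal sketch: per-device aggregation in plain Java inside foreachPartition,
// so no SparkSession/SQLContext is required on the worker nodes.
groupedData.foreachRDD(rdd ->
    rdd.foreachPartition(partition -> {
        while (partition.hasNext()) {
            Tuple2<String, Iterable<String>> device = partition.next();
            long count = 0;
            double speedSum = 0;
            for (String json : device._2) {
                Map<String, Object> record = getMap(json);                           // same JSON helper as above
                Map<String, Object> data = (Map<String, Object>) record.get("data");
                speedSum += ((Number) data.get("speed")).doubleValue();
                count++;
            }
            double avgSpeed = count > 0 ? speedSum / count : 0;
            LOGGER.error("Device " + device._1 + " avg speed over window = " + avgSpeed);
        }
    })
);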