Apache Spark-工作节点中的数据分组和执行



我们将获得实时机器数据作为JSON,我们从RabbitMQ获取了这些数据。以下是JSON的样本,

{"DeviceId":"MAC-1001","DeviceType":"Sim-1","TimeStamp":"05-12-2017 10:25:35","data":{"Rate":10,"speed":2493,"Mode":1,"EMode":2,"Run":1}}
{"DeviceId":"MAC-1001","DeviceType":"Sim-1","TimeStamp":"05-12-2017 10:25:36","data":{"Rate":10,"speed":2493,"Mode":1,"EMode":2,"Run":1}}
{"DeviceId":"MAC-1002","DeviceType":"Sim-1","TimeStamp":"05-12-2017 10:25:37","data":{"Rate":10,"speed":2493,"Mode":1,"EMode":2,"Run":1}}
{"DeviceId":"MAC-1002","DeviceType":"Sim-1","TimeStamp":"05-12-2017 10:25:38","data":{"Rate":10,"speed":2493,"Mode":1,"EMode":2,"Run":1}}

数据在" x"分钟的持续时间内窗口,然后是我们想要实现的

  1. 按DeviceID分组数据,这是完成的,但不确定我们是否可以获取数据集

  2. 我们想使用ForeChPartition循环遍历上述分组数据并执行每个设备的聚合逻辑,以便在Worker节点内执行代码。

如果我的思考过程错了,请纠正我。

我们的较早代码正在收集数据,循环通过RDD,将它们转换为数据集并使用Spark SQLContext API在数据集上应用集合逻辑。

进行负载测试时,我们看到90%的处理发生在主节点中,过了一会儿,CPU使用率提高到100%,并且该过程被轰炸了。

因此,我们现在正在尝试重新设计整个过程,以最大程度地执行Worker节点中的逻辑。

以下是到目前为止的代码,它实际上在Worker节点中起作用,但是我们尚未获得一个用于汇总逻辑的数据集

public static void main(String[] args) {
		
		try {
			
			mconf = new SparkConf();
			mconf.setAppName("OnPrem");
			mconf.setMaster("local[*]");
			
			JavaSparkContext sc = new JavaSparkContext(mconf);
			  
			jssc = new JavaStreamingContext(sc, Durations.seconds(60));
			SparkSession spksess = SparkSession.builder().appName("Onprem").getOrCreate();
			//spksess.sparkContext().setLogLevel("ERROR");
			
			Map<String, String> rabbitMqConParams = new HashMap<String, String>();
			rabbitMqConParams.put("hosts", "localhost");
			rabbitMqConParams.put("userName", "guest");
			rabbitMqConParams.put("password", "guest");
			rabbitMqConParams.put("vHost", "/");
			rabbitMqConParams.put("durable", "true");
			
			List<JavaRabbitMQDistributedKey> distributedKeys = new LinkedList<JavaRabbitMQDistributedKey>();
			distributedKeys.add(new JavaRabbitMQDistributedKey(QUEUE_NAME, new ExchangeAndRouting(EXCHANGE_NAME, "fanout", ""), rabbitMqConParams));
			
			Function<Delivery, String> messageHandler = new Function<Delivery, String>() {
				public String call(Delivery message) {
					return new String(message.getBody());
				}
			};
			
			JavaInputDStream<String> messages = RabbitMQUtils.createJavaDistributedStream(jssc, String.class, distributedKeys, rabbitMqConParams, messageHandler);
			
			JavaDStream<String> machineDataRDD = messages.window(Durations.minutes(2),Durations.seconds(60)); //every 60 seconds one RDD is Created
			machineDataRDD.print();
			
			JavaPairDStream<String, String> pairedData = machineDataRDD.mapToPair(s -> new Tuple2<String, String>(getMap(s).get("DeviceId").toString(), s)); 
			
			JavaPairDStream<String, Iterable<String>> groupedData = pairedData.groupByKey();	
			
			groupedData.foreachRDD(new VoidFunction<JavaPairRDD<String,Iterable<String>>>(){
				@Override
				public void call(JavaPairRDD<String, Iterable<String>> data) throws Exception {
					
					data.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Iterable<String>>>>(){
						@Override
						public void call(Iterator<Tuple2<String, Iterable<String>>> data) throws Exception {
						 
							 while(data.hasNext()){
								 LOGGER.error("Machine Data == >>"+data.next());
							 }
						}
						
					});
					 
				}
			
			});
			jssc.start();
			jssc.awaitTermination();
			
		}
		catch (Exception e) 
		{
			e.printStackTrace();
		}

以下分组代码为我们提供了一个设备的字符串,理想情况下,我们想获得一个数据集

JavaPairDStream<String, String> pairedData = machineDataRDD.mapToPair(s -> new Tuple2<String, String>(getMap(s).get("DeviceId").toString(), s));
JavaPairDStream<String, Iterable<String>> groupedData = pairedData.groupByKey();

对我来说重要的是使用foreachPartition进行循环,以便将代码执行推向工人节点。

浏览了更多代码示例和指南SQLContext之后,SparkSession不会序列化并在Worker节点上可用,因此我们将更改不尝试使用ForeChpartition loop构建数据集的策略。

相关内容

  • 没有找到相关文章

最新更新