Problem with Spring and Spark Streaming together



I am working on a data analytics project and I am new to Spark. I created a Spring Boot project in which I use several Spark Streaming consumers that consume data from Kafka topics.

All the components (Kafka, Spark, etc.) live in the same Spring Boot application. Below is the Spring Boot main class, where I initialize the Spark Streaming jobs through command line runners.

@SpringBootApplication
@EnableCaching
public class SpringApplication
{
    public static void main(String[] args)
    {
        // Fully qualified because this class shadows Spring Boot's SpringApplication.
        org.springframework.boot.SpringApplication.run(SpringApplication.class, args);
    }

    @Bean
    public EnrichEventSparkConsumerRunner sparkEnrichEventConsumerRunner()
    {
        return new EnrichEventSparkConsumerRunner();
    }

    @Bean
    public RawEventSparkConsumerRunner sparkRawEventConsumerRunner()
    {
        return new RawEventSparkConsumerRunner();
    }
}

public class EnrichEventSparkConsumerRunner implements CommandLineRunner
{
    @Autowired
    JavaStreamingContext javaStreamingContext;

    @Autowired
    EnrichEventSparkConsumer enrichEventSparkConsumer;

    @Override
    public void run(String... args) throws Exception
    {
        JobContextImpl jobContext = new JobContextImpl(javaStreamingContext);
        // start the Enrich Event Spark consumer.
        enrichEventSparkConsumer.startEnrichEventConsumer(jobContext.streamingctx());
    }
}

public class RawEventSparkConsumerRunner implements CommandLineRunner
{
    @Autowired
    JavaStreamingContext javaStreamingContext;

    @Autowired
    RawEventSparkConsumer rawEventSparkConsumer;

    @Override
    public void run(String... args) throws Exception
    {
        JobContextImpl jobContext = new JobContextImpl(javaStreamingContext);
        // start the Raw Event Spark consumer.
        rawEventSparkConsumer.sparkRawEventConsumer(jobContext.streamingctx());
    }
}

RawEventSparkConsumer.java - In this Spark Streaming class we consume data from the Kafka topic, enrich it and save it to Elasticsearch, and we also send the enriched data to the next Kafka topic, which is consumed by EnrichEventSparkConsumer.

@Component
public class RawEventSparkConsumer implements Serializable
{
    private final Logger logger = LoggerFactory.getLogger(RawEventSparkConsumer.class);

    @Autowired
    private ElasticSearchServiceImpl dataModelServiceImpl;
    @Autowired
    private EnrichEventKafkaProducer enrichEventKafkaProducer;
    @Autowired
    private SparkConfiguration sparkConfiguration;
    @Autowired
    private RawAttributesConfig rawAttConfig;

    private static ObjectMapper mapper;

    public void sparkRawEventConsumer(JavaStreamingContext streamingContext)
    {
        mapper = new ObjectMapper();
        Collection<String> topics = Arrays.asList(sparkConfiguration.getRawEventTopic());

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", sparkConfiguration.getBootStrapServers());
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", true);

        JavaInputDStream<ConsumerRecord<String, String>> rawEventRDD = KafkaUtils.createDirectStream(streamingContext,
                LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
        JavaDStream<String> dStream = rawEventRDD.map((x) -> x.value());
        /*
         * JavaDStream<EnrichEventDataModel> enrichEventRdd = dStream.map((raw) -> { BaseDataModel csvDataModel =
         * mapper.readValue(raw, BaseDataModel.class); return new EnrichEventDataModel(csvDataModel); });
         */
        JavaDStream<List<Map<String, Object>>> enrichEventRdd = dStream.map(convertIntoMapList);
        enrichEventRdd.foreachRDD(rdd -> {
            logger.info("Inside rawEventRDD.foreachRDD = = = " + rdd.count());
            sendEnrichEventToKafkaTopic(rdd.collect());
        });

        streamingContext.start();
        try {
            streamingContext.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.info("RawEvent consumer SparkStreaming job started.");
    }
    // cache of account name -> user details already looked up from Elasticsearch
    HashMap<String, UserIndexDto> userMap = new HashMap<>();

    private void sendEnrichEventToKafkaTopic(List<List<Map<String, Object>>> list)
    {
        if (enrichEventKafkaProducer != null && list != null && list.size() > 0)
            try {
                logger.info("sendEnrichEventToKafkaTopic, csv raw log count: " + list.size());
                list.parallelStream().forEach(mapList -> {
                    logger.info("sendEnrichEventToKafkaTopic, mapList size: " + mapList.size());
                    if (Objects.nonNull(mapList) && mapList.size() > 0) {
                        List<Map<String, Object>> enrichedMapList = mapList.parallelStream().map(mapData -> {
                            if (mapData.containsKey(rawAttConfig.getAccountname())) {
                                String accountName = String.valueOf(mapData.get(rawAttConfig.getAccountname()));
                                if (accountName != null) {
                                    accountName = accountName.trim();
                                }
                                if (accountName != null && accountName.length() > 0 && !userMap.containsKey(accountName)) {
                                    accountName = accountName.split("@")[0];
                                    List<User> userList = dataModelServiceImpl.getUser(accountName, "u_employeeId");
                                    if (userList != null && userList.size() > 0) {
                                        User user = userList.get(0);
                                        if (!user.getU_userId().equalsIgnoreCase(accountName)) {
                                            List<User> userList1 = dataModelServiceImpl.getUser(accountName, "u_email");
                                            if (userList1 != null && userList1.size() > 0) {
                                                User user1 = userList1.get(0);
                                                UserIndexDto userdto1 = new UserIndexDto();
                                                userdto1.setUserId(user1.getU_email());
                                                userdto1.setEmpId(user1.getU_employeeId());
                                                userMap.put(userdto1.getUserId(), userdto1);
                                            }
                                        } else {
                                            UserIndexDto userdto = new UserIndexDto();
                                            userdto.setUserId(user.getU_userId());
                                            userdto.setEmpId(user.getU_employeeId());
                                            userMap.put(userdto.getUserId(), userdto);
                                        }
                                        writeToLogsFile("Enriching RawEvent using ElasticIndex for accountName="
                                                + userList.get(0).getU_employeeId(), Constants.INFO);
                                    } else {
                                        List<User> userList1 = dataModelServiceImpl.getUser(accountName, "u_email");
                                        if (userList1 != null && userList1.size() > 0) {
                                            User user1 = userList1.get(0);
                                            UserIndexDto userdto1 = new UserIndexDto();
                                            userdto1.setUserId(user1.getU_email());
                                            userdto1.setEmpId(user1.getU_employeeId());
                                            userMap.put(userdto1.getUserId(), userdto1);
                                        }
                                    }
                                }
                                UserIndexDto userdto = userMap.get(accountName);
                                mapData.put("userId", userdto != null ? userdto.getUserId() : null);
                                mapData.put("empId", userdto != null ? userdto.getEmpId() : null);
                                // writeToLogsFile("Enriching RawEvent using Map for accountName=" + (userdto != null ? userdto.getUserId() : ""),
                                //     Constants.INFO);
                            } else {
                                mapData.put("userId", null);
                                mapData.put("empId", null);
                            }
                            mapData.put("enrichEventId", UUID.randomUUID().toString());
                            saveDataToElasticSearch(mapData);
                            return mapData;
                        }).collect(Collectors.toList());
                        //saveDataToElasticSearch(enrichedMapList);
                        processEnrichEvents(enrichedMapList);
                    }
                });
            } catch (Exception e) {
                writeToLogsFile(e.getMessage(), Constants.ERROR);
            }
    }

    private void saveDataToElasticSearch(Map<String, Object> mapData)
    {
        if (Objects.nonNull(mapData)) {
            dataModelServiceImpl.saveEnrichModel(mapData);
        }
    }

    static Function<String, List<Map<String, Object>>> convertIntoMapList = new Function<String, List<Map<String, Object>>>()
    {
        @Override
        public List<Map<String, Object>> call(String raw) throws Exception
        {
            return mapper.readValue(raw, new TypeReference<List<HashMap<String, Object>>>() {});
        }
    };

    private void processEnrichEvents(List<Map<String, Object>> enrichedMapList)
    {
        try {
            Thread.sleep(3000);
            enrichEventKafkaProducer.sendEnrichEvent(enrichedMapList);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void saveDataToElasticSearch(List<Map<String, Object>> enrichedMapList)
    {
        if (!enrichedMapList.isEmpty())
            dataModelServiceImpl.saveAllEnrichModel(enrichedMapList);
    }

    private void writeToLogsFile(String message, String loglevel)
    {
        if (loglevel.equalsIgnoreCase(Constants.ERROR)) {
            logger.error(message);
        } else if (loglevel.equalsIgnoreCase(Constants.INFO)) {
            logger.info(message);
        }
    }
}

Now I build a jar for this with the mvn clean package command and run that jar with java -jar, which initializes the Spark jobs through the command line runners. It works fine, although we have only used it for a POC so far.

Now we want to run Spark in a cluster environment, and I have some questions related to that. I searched a lot but could not find the right approach. Here are the problems and questions:

  1. Is using Spark Streaming through a command line runner the right way to do this?
  2. From what I have read, to run in a cluster environment we have to submit the Spark Streaming job with the spark-submit command. How can I do that in my current project? The problem is that, as far as I know, spark-submit needs a main method, so what changes do I have to make for spark-submit?
  3. We are using the Elasticsearch API to save data to Elasticsearch. Is that the right approach?
  4. Please look at the line sendEnrichEventToKafkaTopic(rdd.collect()), where we call .collect() on the RDD. I have read that when we call .collect(), everything runs on the driver node, so we should avoid .collect(). What other ways are there?

Given the limited context in the question, here are some guidelines; they may not be the best options:

  1. The way to achieve this is to have a Spark cluster, or Spark as a service, that can act as a server to which applications submit Spark applications (or Spark jobs). You can package your application as a single fat jar and upload it to the cluster.

  2. The Spark application should be as decoupled from the Spring context as possible. If you cannot do that, one approach is to create an ApplicationContext, with the right configuration, in the Spark application's main method, and call context.getBean wherever the Spark code has Spring dependencies. The interaction with the Spark cluster (submitting the Spark application) can go through spark-submit, or through SparkLauncher, which acts as a Spark client. A sketch of such an entry point follows after this list.

  3. Where you store the output of your business logic is application specific. You can save to whatever data store fits your application's requirements.

  4. sendEnrichEventToKafkaTopic(rdd.collect()) can be replaced with something like rdd.toDF.write.format("kafka") to avoid the collect; a foreachPartition based alternative is also sketched below.
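
To make point 2 concrete, here is a minimal sketch of a spark-submit friendly entry point. SparkJobMain, SparkConfig and the package name com.example are assumptions for illustration, not part of the original project; SparkConfig stands for a @Configuration class that defines the JavaStreamingContext, the consumers and their dependencies.

import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

// Hypothetical entry point for spark-submit, e.g.:
//   spark-submit --class com.example.SparkJobMain --master spark://<master-host>:7077 your-app-fat.jar
public class SparkJobMain
{
    public static void main(String[] args)
    {
        // Build a plain Spring context instead of relying on Spring Boot's
        // CommandLineRunner, so that spark-submit can start the driver directly.
        try (AnnotationConfigApplicationContext context =
                     new AnnotationConfigApplicationContext(SparkConfig.class)) {
            JavaStreamingContext streamingContext = context.getBean(JavaStreamingContext.class);
            RawEventSparkConsumer rawEventConsumer = context.getBean(RawEventSparkConsumer.class);

            // Sets up the Kafka direct stream and blocks on awaitTermination().
            rawEventConsumer.sparkRawEventConsumer(streamingContext);
        }
    }
}

The same fat jar can also be submitted programmatically with org.apache.spark.launcher.SparkLauncher, for example new SparkLauncher().setAppResource("your-app-fat.jar").setMainClass("com.example.SparkJobMain").setMaster("spark://<master-host>:7077").launch(), if you prefer a Spark client running inside another JVM.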
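
For point 4, rdd.collect() pulls every record back to the driver before sendEnrichEventToKafkaTopic runs. A common alternative that also avoids collect() is to publish from the executors with foreachPartition, creating one Kafka producer per partition. The sketch below only covers the publishing step; the broker address, the topic name "enrich-event-topic" and the per-partition ObjectMapper are assumptions.

// Assumes org.apache.kafka.clients.producer.KafkaProducer / ProducerRecord,
// org.apache.kafka.common.serialization.StringSerializer and java.util.Properties are imported.
enrichEventRdd.foreachRDD(rdd -> {
    rdd.foreachPartition(partition -> {
        // Created on the executor: neither KafkaProducer nor the driver-side
        // ObjectMapper field should be captured from the driver closure.
        ObjectMapper partitionMapper = new ObjectMapper();
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9092"); // assumption: use the configured brokers
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            while (partition.hasNext()) {
                // "enrich-event-topic" is a placeholder for the real enrich-event topic name.
                producer.send(new ProducerRecord<>("enrich-event-topic",
                        partitionMapper.writeValueAsString(partition.next())));
            }
        }
    });
});

Note that the enrichment itself (the user lookups against Elasticsearch) would then also have to run on the executors, so the services it uses must be available there, for example by creating them per partition or by turning the lookups into a broadcast or a join.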
