This is what my code used to look like:
val finalEmails: Dataset[Models.Mail] = HtmlBuilder.main(dataEmails, created_date_string)
val newCampaigns: DataFrame = finalEmails.groupBy("usecase")
  .agg(count("*").alias("selected"), first("subject").as("subject"), first("html").as("html"))
  .withColumn("id_account", lit(id_account_saveCassandra))
newCampaigns.as[Models.AggMails].foreach(aggmails => {
  HtmlBuilder.saveToCassandra(aggmails.id_account, aggmails.selected, aggmails.subject,
    aggmails.usecase, aggmails.html, created_date_string.toLong)
})
HtmlBuilder.scala:
object HtmlBuilder {
  def main(emailsToSend: Dataset[Models.EmailToSend], created_date_string: String): Dataset[Models.Mail] = {
    val builtHtmls: Dataset[Models.Mail] = emailsToSend.as[Models.EmailToSend]
      .filter(emailtosend => { getFilteredMail(emailtosend) })
      .map(emailtosend => { getFormattedMail(emailtosend) })
    builtHtmls
  }
  def saveToCassandra(id_account: String, selected: Long, subject: String, useCase: String, html: String, created_date: Long) = {
    import com.datastax.spark.connector._
    val campaign: List[Models.Campaign] = List(new Models.Campaign(id_account, true, created_date, html, selected, "created", subject, useCase))
    val rdd: RDD[Models.Campaign] = Spark.session.sparkContext.parallelize(campaign)
    Try(rdd.saveToCassandra("kiliba", "campaigns", SomeColumns("id_account", "automatic_campaign", "created_date", "html", "selected", "status", "subject", "usecase"))) match {
      case Failure(exception) => println("ERROR CAMPAIGNS : " + exception); System.exit(0)
      case Success(value) => println("CAMPAIGN SAVED")
    }
  }
  // some other functions...
}
object Models {
  case class EmailToSend(
    id_customer: String,
    usecase: String,
    customer: Customer,
    godson_customer: Customer,
    sponsor_customer: Customer,
    products: Seq[Models.Product],
    cart_link: String
  )
  case class Mail(
    val id_account: String,
    val smtp: String,
    val to: String,
    val sender: String,
    val subject: String,
    val alias: String,
    val html: String,
    val text: String,
    val usecase: String,
    val created_date: String,
    val id_customer: String
  )
  case class AggMails(
    val selected: Long,
    val subject: String,
    val usecase: String,
    val id_account: String,
    val html: String
  )
  case class Campaign(
    id_account: String,
    automatic_campaign: Boolean,
    created_date: Long,
    html: String,
    selected: Long,
    status: String,
    subject: String,
    usecase: String
  )
}
In HtmlBuilder.main it processes some raw data (dataEmails) and formats it into emails (finalEmails), which are then grouped into campaigns (newCampaigns). These campaigns are then stored in our database with HtmlBuilder.saveToCassandra.
This works.
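However, I need to handle dataEmails as an array instead of a Dataset, because the map and filter functions in HtmlBuilder.main must be called only once: I added some code that must not run more than once, such as API calls that create some data. A Dataset's transformations are re-run every time the Dataset is executed, so they could fire more than once.
I found a simple solution: convert dataEmails to an array when passing it to HtmlBuilder.main, then convert the result back to a Dataset when returning from HtmlBuilder.main: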
// I simply added a .collect()
val finalEmails : Dataset[Models.Mail] = HtmlBuilder.main(dataEmails.collect(), created_date_string)
HtmlBuilder.scala:
object HtmlBuilder {
  // type of emailsToSend is now array
  def main(emailsToSend: Array[Models.EmailToSend], created_date_string: String): Dataset[Models.Mail] = {
    // apply a filter and a map on the array, hence only once, on HtmlBuilder.main call, instead of
    // every time the dataset is executed
    val builtHtmls: Array[Models.Mail] = emailsToSend
      .filter(emailtosend => { getFilteredMail(emailtosend) })
      .map(emailtosend => { getFormattedMail(emailtosend) })
    // convert the array back to a dataset
    builtHtmls.toSeq.toDF().as[Models.Mail]
  }
}
This looks fine to me; however, HtmlBuilder.saveToCassandra now hangs forever, and I don't understand why.
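I have gone through every log, every piece of code and every thread I could find online, but found nothing; there is not even an error in the logs. I logged the data processed at every stage, and it always looks exactly the same in v1 and v2.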
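I also noticed what looks like a concurrency problem: in v2 the logs inside HtmlBuilder.saveToCassandra are interleaved in a confusing way, whereas in v1 they come out one campaign at a time, in order, which is what I expected in v2 as well. I added some log lines in saveToCassandra to show this.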
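One likely explanation for the interleaving: Dataset.foreach is an action whose closure runs inside executor tasks, one task per partition, and those tasks run concurrently, so log lines from different campaigns can mix. SparkContext (which saveToCassandra uses through parallelize) is also meant to be used only on the driver, not from inside such a task. A minimal sketch of forcing strict one-by-one processing, assuming the aggregated result is small enough to fit on the driver, is to collect() it and loop over it with plain Scala. The AggMails case class mirrors Models.AggMails; SequentialSaveSketch, saveAll and the sample rows are hypothetical, and the println stands in for HtmlBuilder.saveToCassandra.

```scala
import org.apache.spark.sql.SparkSession

// Mirrors Models.AggMails from the question (same field names and types).
case class AggMails(selected: Long, subject: String, usecase: String, id_account: String, html: String)

object SequentialSaveSketch {
  // Applies `save` to each row strictly in order, on the calling (driver) thread.
  def saveAll(rows: Seq[AggMails])(save: AggMails => Unit): Unit =
    rows.foreach(save)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("sequential-save-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows standing in for newCampaigns.as[Models.AggMails].
    val aggMails = Seq(
      AggMails(1L, "subject A", "birthday", "acc-1", "<html/>"),
      AggMails(1L, "subject B", "welcome", "acc-1", "<html/>")
    ).toDS()

    // collect() returns an Array on the driver; from here on this is ordinary
    // sequential Scala, not a distributed foreach running in executor tasks.
    saveAll(aggMails.collect().toSeq) { row =>
      // Placeholder for HtmlBuilder.saveToCassandra(...), which calls
      // SparkContext.parallelize and therefore belongs on the driver.
      println(s"saving ${row.usecase}")
    }

    spark.stop()
  }
}
```

This trades parallelism for ordering, so it only makes sense for a handful of aggregated rows such as one row per usecase.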
v1 Spark logs:
aggmails
fff1
fff3
fff4
(fake_5db9a3a917774f0011efadf1_20-01-2022_08:59:03,1,Christine, c’est notre anniversaire ❤️,anniversaryregistration,1642669261240)
fff42
List(Campaign(fake_5db9a3a917774f0011efadf1_20-01-2022_08:59:03,true,1642669261240,html,1,created,Christine, c’est notre anniversaire ❤️,anniversaryregistration))
fff43
40653
fff44
fff5
fff6
CAMPAIGN SAVED
fff7
fff2
aggmails
fff1
fff3
fff4
(fake_5db9a3a917774f0011efadf1_20-01-2022_08:59:03,1,Claire, joyeux anniversaire 🎂,birthday,1642669261240)
fff42
List(Campaign(fake_5db9a3a917774f0011efadf1_20-01-2022_08:59:03,true,1642669261240,html,1,created,Claire, joyeux anniversaire 🎂,birthday))
fff43
41245
fff44
fff5
fff6
CAMPAIGN SAVED
fff7
fff2
aggmails
fff1
fff3
fff4
(fake_5db9a3a917774f0011efadf1_20-01-2022_08:59:03,1,J-4 avant le Black Friday chez nat & nin,blackfriday1,1642669261240)
fff42
List(Campaign(fake_5db9a3a917774f0011efadf1_20-01-2022_08:59:03,true,1642669261240,html,1,created,J-4 avant le Black Friday chez nat & nin,blackfriday1))
fff43
30341
fff44
fff5
fff6
CAMPAIGN SAVED
fff7
fff2
And so on: each campaign is processed one after the other, with no overlap.
v2:
aggmails
fff1
aggmails
fff1
fff3
fff4
fff3
fff4
aggmails
fff1
fff3
fff4
(fake_5db9a3a917774f0011efadf1_20-01-2022_09:11:01,1,Christine, c’est notre anniversaire ❤️,anniversaryregistration,1642669978118)
fff42
(fake_5db9a3a917774f0011efadf1_20-01-2022_09:11:01,1,Polinard-Rawdon, bienvenue chez nous,welcome,1642669978118)
fff42
(fake_5db9a3a917774f0011efadf1_20-01-2022_09:11:01,1,Aude, ces produits n'attendent que vous !,postvisits,1642669978118)
fff42
List(Campaign(fake_5db9a3a917774f0011efadf1_20-01-2022_09:11:01,true,1642669978118,html,1,created,Polinard-Rawdon, bienvenue chez nous,welcome))
List(Campaign(fake_5db9a3a917774f0011efadf1_20-01-2022_09:11:01,true,1642669978118,html,1,created,Christine, c’est notre anniversaire ❤️,anniversaryregistration))
fff43
List(Campaign(fake_5db9a3a917774f0011efadf1_20-01-2022_09:11:01,true,1642669978118,html,1,created,Aude, ces produits n'attendent que vous !,postvisits))
fff43
44204
fff44
fff5
40569
fff44
fff5
fff43
40319
fff44
aggmails
fff1
fff3
fff4
(fake_5db9a3a917774f0011efadf1_20-01-2022_09:11:01,1,J-2 avant le Black Friday chez nat & nin,blackfriday2,1642669978118)
fff42
fff6
fff6
List(Campaign(fake_5db9a3a917774f0011efadf1_20-01-2022_09:11:01,true,1642669978118,html,1,created,J-2 avant le Black Friday chez nat & nin,blackfriday2))
fff43
30285
fff44
fff5
fff5
fff6
fff6
Everything is jumbled together. What is going on?
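P.S. I tried to reduce my code to the minimum, but unfortunately it is part of a program that is too complex for me to build a simple reproducible example; this is the best I can do.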
You really don't need to do this:
newCampaigns.as[Models.AggMails].foreach(aggmails => {
  HtmlBuilder.saveToCassandra(aggmails.id_account,
    aggmails.selected, aggmails.subject,
    aggmails.usecase, aggmails.html, created_date_string.toLong)
})
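The Spark Cassandra Connector can write a DataFrame/Dataset directly to a table without going through the RDD API, and above all without calling it from inside foreach. Just do the following (it's important to use mode("append") so you don't delete data; overwrite will truncate the table):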
newCampaigns.as[Models.AggMails].write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "table_name", "keyspace" -> "ks_name"))
  .mode("append")
  .save()
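The snippet above writes only the columns that Models.AggMails has (selected, subject, usecase, id_account, html), whereas the question's saveToCassandra also fills automatic_campaign, created_date and status. A minimal sketch of writing complete rows with the same DataFrame writer, assuming the keyspace kiliba and table campaigns from the question's code and a SparkSession already configured for the connector (for example spark.cassandra.connection.host). The names CampaignWriterSketch, toCampaignColumns and saveCampaigns are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.lit

object CampaignWriterSketch {
  // Adds the constant columns the question's saveToCassandra sets per row
  // and keeps exactly the columns listed in its SomeColumns(...).
  def toCampaignColumns(aggMails: DataFrame, createdDate: Long): DataFrame =
    aggMails
      .withColumn("automatic_campaign", lit(true))
      .withColumn("created_date", lit(createdDate))
      .withColumn("status", lit("created"))
      .select("id_account", "automatic_campaign", "created_date", "html",
        "selected", "status", "subject", "usecase")

  // One distributed write started from the driver; Append never truncates the table.
  def saveCampaigns(aggMails: DataFrame, createdDate: Long): Unit =
    toCampaignColumns(aggMails, createdDate).write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "kiliba", "table" -> "campaigns"))
      .mode(SaveMode.Append)
      .save()
}
```

Called as CampaignWriterSketch.saveCampaigns(newCampaigns, created_date_string.toLong), this would replace the whole foreach block, so no SparkContext call ever happens inside an executor task.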