Convert a Spark Scala Dataset from one type to another



I have a Dataset with the following case class type:

case class AddressRawData(
  addressId: String,
  customerId: String,
  address: String
)

I want to convert it to:

case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int], // i.e. it is optional
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

using a parsing function:

def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
  unparsedAddress.map(address => {
    val split = address.address.split(", ")
    address.copy(
      number = Some(split(0).toInt),
      road = Some(split(1)),
      city = Some(split(2)),
      country = Some(split(3))
    )
  })
}
I'm new to Scala and Spark. Can someone show me how to do this?

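You're on the right track! There are of course several ways to do this, but since you have already defined the case classes and started writing a parsing function, an elegant solution is to use the Dataset's map function. According to the documentation, its signature is: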

def map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U] 

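where T is the starting type (AddressRawData in your case) and U is the type you want to end up with (AddressData in your case). So the argument to map is a function that turns one AddressRawData into one AddressData, and that can be exactly the addressParser you started writing! The implicit Encoder[U] tells Spark how to store values of type U; for case classes it is supplied by import spark.implicits._.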
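To see the T => U shape without a cluster, here is a minimal sketch that uses a plain Scala Seq as a stand-in for the Dataset: Seq.map applies a function to each element in the same way, just without the Encoder. The function noParse is hypothetical; it only copies the raw fields and leaves the parsed ones empty, to show the type of function that map expects.

```scala
// Plain Scala collections as a stand-in for a Spark Dataset (no SparkSession needed).
case class AddressRawData(addressId: String, customerId: String, address: String)
case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

// Hypothetical placeholder: copies the raw fields and leaves every parsed field as None.
// Its type, AddressRawData => AddressData, is the T => U that map takes.
val noParse: AddressRawData => AddressData =
  raw => AddressData(raw.addressId, raw.customerId, raw.address, None, None, None, None)

val raws: Seq[AddressRawData] =
  Seq(AddressRawData("1", "1", "20, my super road, beautifulCity, someCountry"))

// Seq[T].map(T => U) gives Seq[U], just as Dataset[T].map(T => U) gives Dataset[U].
val converted: Seq[AddressData] = raws.map(noParse)
```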

Now, your current addressParser has this signature:

def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData]

To be able to pass it to map, it needs this signature instead:

def newAddressParser(unparsedAddress: AddressRawData): AddressData
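A sketch of that change, keeping the question's parsing logic as-is: newAddressParser builds a fresh AddressData from one AddressRawData (copy is no longer available, because the input is now a different type). If a Seq-based helper is still useful, for example in tests, it becomes a one-liner on top; addressParserSeq is a hypothetical name. Like the original, this version assumes a well-formed "number, road, city, country" string and throws on anything else.

```scala
case class AddressRawData(addressId: String, customerId: String, address: String)
case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

// Per-record version of the question's parser: one AddressRawData in,
// one AddressData out, which is the shape Dataset.map needs.
// Unguarded, like the original: malformed input throws.
def newAddressParser(unparsedAddress: AddressRawData): AddressData = {
  val split = unparsedAddress.address.split(", ")
  AddressData(
    unparsedAddress.addressId,
    unparsedAddress.customerId,
    unparsedAddress.address,
    number = Some(split(0).toInt),
    road = Some(split(1)),
    city = Some(split(2)),
    country = Some(split(3))
  )
}

// Hypothetical Seq-based helper, now just a map over the per-record function.
def addressParserSeq(unparsed: Seq[AddressRawData]): Seq[AddressData] =
  unparsed.map(newAddressParser)

val parsed = addressParserSeq(
  Seq(AddressRawData("1", "1", "20, my super road, beautifulCity, someCountry"))
)
```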

With all that in mind, we can put it together! Here is an example:

// Assumes `spark` is your SparkSession (spark-shell defines it for you);
// the import brings in the Encoders that Dataset.map and .toDS need for case classes.
import spark.implicits._
import scala.util.Try

// Your case classes (in a compiled application, define them at top level,
// outside the method that uses them, so Spark can derive their Encoders)
case class AddressRawData(addressId: String, customerId: String, address: String)
case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

// Your addressParser function, adapted so it can be passed to Dataset.map:
// one AddressRawData in, one AddressData out. Each Try(...).toOption turns
// a failed conversion or a missing piece into None instead of an exception.
def addressParser(rawAddress: AddressRawData): AddressData = {
  val addressArray = rawAddress.address.split(", ")
  AddressData(
    rawAddress.addressId,
    rawAddress.customerId,
    rawAddress.address,
    Try(addressArray(0).toInt).toOption,
    Try(addressArray(1)).toOption,
    Try(addressArray(2)).toOption,
    Try(addressArray(3)).toOption
  )
}

// Creating a sample dataset
val rawDS = Seq(
  AddressRawData("1", "1", "20, my super road, beautifulCity, someCountry"),
  AddressRawData("1", "1", "badFormat, some road, cityButNoCountry")
).toDS

val parsedDS = rawDS.map(addressParser)
parsedDS.show
+---------+----------+--------------------+------+-------------+----------------+-----------+
|addressId|customerId|             address|number|         road|            city|    country|
+---------+----------+--------------------+------+-------------+----------------+-----------+
|        1|         1|20, my super road...|    20|my super road|   beautifulCity|someCountry|
|        1|         1|badFormat, some r...|  null|    some road|cityButNoCountry|       null|
+---------+----------+--------------------+------+-------------+----------------+-----------+
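Because addressParser is an ordinary Scala function, its logic can be checked without a SparkSession. A minimal sketch, assuming plain Scala (REPL or script): the parser is copied from the Spark example so the snippet runs on its own, and Seq.map is applied to the same two sample records. The resulting values should line up with the parsedDS.show output, with None where the table shows null.

```scala
import scala.util.Try

case class AddressRawData(addressId: String, customerId: String, address: String)
case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

// Same logic as the Spark example's addressParser.
def addressParser(rawAddress: AddressRawData): AddressData = {
  val addressArray = rawAddress.address.split(", ")
  AddressData(
    rawAddress.addressId,
    rawAddress.customerId,
    rawAddress.address,
    Try(addressArray(0).toInt).toOption,
    Try(addressArray(1)).toOption,
    Try(addressArray(2)).toOption,
    Try(addressArray(3)).toOption
  )
}

val samples = Seq(
  AddressRawData("1", "1", "20, my super road, beautifulCity, someCountry"),
  AddressRawData("1", "1", "badFormat, some road, cityButNoCountry")
)

// Seq.map instead of Dataset.map: no cluster needed to check the parsing.
val parsed = samples.map(addressParser)
parsed.foreach(println)
```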

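As you can see, because you had already anticipated that parsing might fail (hence the Option fields), it's easy to use scala.util.Try to attempt to extract each piece of the raw address and add some robustness (the second row contains null values where the address string could not be parsed).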
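Here Try covers two different failures: a NumberFormatException from toInt, and an ArrayIndexOutOfBoundsException when the address has fewer than four pieces. As an alternative (a swapped-in technique, not the one used above), IndexedSeq.lift returns None for a missing index, so Try is only needed around the Int conversion. A sketch, with addressParserLift as a hypothetical name:

```scala
import scala.util.Try

case class AddressRawData(addressId: String, customerId: String, address: String)
case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

// Hypothetical variant of addressParser: lift(i) gives Some(piece) or None,
// so only the String-to-Int conversion still needs Try.
def addressParserLift(rawAddress: AddressRawData): AddressData = {
  val parts = rawAddress.address.split(", ").toIndexedSeq
  AddressData(
    rawAddress.addressId,
    rawAddress.customerId,
    rawAddress.address,
    parts.lift(0).flatMap(s => Try(s.toInt).toOption),
    parts.lift(1),
    parts.lift(2),
    parts.lift(3)
  )
}

val bad = addressParserLift(AddressRawData("1", "1", "badFormat, some road, cityButNoCountry"))
```

It can be passed to rawDS.map exactly like addressParser. On Scala 2.13 or later, s.toIntOption could replace Try(s.toInt).toOption.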

Hope this helps!
