This is what my input data looks like,
20170101,2024270,1000,1000,1000,1000,1000,1000,1000,2000,2000
20170101,2024333,1000,1000,1000,1000,1000,1000,1000,2000,2000
20170101,2023709,1000,1000,1000,1000,1000,1000,1000,2000,2000
20170201,1234709,1000,1000,1000,1000,1000,1000,1000,2000,2000
I want to transform this into a key-value RDD, where the key is an integer and the value is a JSON object, the goal being to write it to Elasticsearch:
(
2024270, {
"metrics": {
"date" : 20170201,
"style_id" : 1234709,
"revenue" : 1000,
"list_count" : 1000,
"pdp_count" : 1000,
"add_to_cart_count" : 1000
}
}
)
In Python I can do the same with the following code,
def format_metrics(line):
    tokens = line.split('^')  # source file is '^'-delimited (the sample above is shown comma-separated)
    try:
        return (tokens[1], {
            'metrics': {
                'date': tokens[0],
                'mrp': float(tokens[2]),
                'revenue': float(tokens[3]),
                'quantity': int(tokens[4]),
                'product_discount': float(tokens[5]),
                'coupon_discount': float(tokens[6]),
                'total_discount': float(tokens[7]),
                'list_count': int(tokens[8]),
                'add_to_cart_count': int(tokens[9]),
                'pdp_count': int(tokens[10])
            }
        }) if len(tokens) > 1 else ('', dict())
    except (IndexError, ValueError):
        # malformed line: emit an empty placeholder record
        return ('', dict())

metrics_rdd = sc.textFile('s3://myntra/scm-inbound/fifa/poc/size_curve_date_range_old/*').map(format_metrics)
I can't figure out how to achieve the same in Scala (I'm new to Scala). I managed to get the output below, but can't wrap the JSON inside a "metrics" block; any pointers would be really helpful:
ordersDF.withColumn("key", $"style_id")
.withColumn("json", to_json(struct($"date", $"style_id", $"mrp")))
.select("key", "json")
.show(false)
// Exiting paste mode, now interpreting.
+-------+-------------------------------------------------+
|key |json |
+-------+-------------------------------------------------+
|2024270|{"date":20170101,"style_id":2024270,"mrp":1000.0}|
|2024333|{"date":20170101,"style_id":2024333,"mrp":1000.0}|
|2023709|{"date":20170101,"style_id":2023709,"mrp":1000.0}|
|1234709|{"date":20170201,"style_id":1234709,"mrp":1000.0}|
+-------+-------------------------------------------------+
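While experimenting further, it looks like struct columns can be nested, so aliasing an inner struct as "metrics" before the single to_json call should produce the wrapper I'm after; a minimal sketch over the three columns above (untested against the full schema):

ordersDF
  .withColumn("key", $"style_id")
  // the inner struct is aliased "metrics", so to_json emits {"metrics":{...}}
  .withColumn("json", to_json(struct(struct($"date", $"style_id", $"mrp").as("metrics"))))
  .select("key", "json")
  .show(false)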
I tried what @philantrovert suggested and it worked:
scala> val ordersDF = spark.read.schema(revenue_schema).format("csv").load("s3://myntra/scm-inbound/fifa/pocs/smallMetrics.csv")
ordersDF: org.apache.spark.sql.DataFrame = [date: int, style_id: int ... 9 more fields]
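For reference, revenue_schema is along these lines (a sketch only; types inferred from the getInt/getFloat accessors used further down):

import org.apache.spark.sql.types._

// eleven fields, matching the Python token mapping above
val revenue_schema = StructType(Seq(
  StructField("date", IntegerType),
  StructField("style_id", IntegerType),
  StructField("mrp", FloatType),
  StructField("revenue", FloatType),
  StructField("quantity", IntegerType),
  StructField("product_discount", FloatType),
  StructField("coupon_discount", FloatType),
  StructField("total_discount", FloatType),
  StructField("list_count", IntegerType),
  StructField("add_to_cart_count", IntegerType),
  StructField("pdp_count", IntegerType)
))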
scala> :paste
// Entering paste mode (ctrl-D to finish)
ordersDF.withColumn("key", $"style_id")
.withColumn("metrics", to_json(struct($"date", $"style_id", $"mrp")))
.select("key", "metrics")
.toJSON
.show(false)
// Exiting paste mode, now interpreting.
+-----------------------------------------------------------------------------------+
|value                                                                              |
+-----------------------------------------------------------------------------------+
|{"key":2024270,"metrics":"{\"date\":20170101,\"style_id\":2024270,\"mrp\":1000.0}"}|
|{"key":2024333,"metrics":"{\"date\":20170101,\"style_id\":2024333,\"mrp\":1000.0}"}|
|{"key":2023709,"metrics":"{\"date\":20170101,\"style_id\":2023709,\"mrp\":1000.0}"}|
|{"key":1234709,"metrics":"{\"date\":20170201,\"style_id\":1234709,\"mrp\":1000.0}"}|
+-----------------------------------------------------------------------------------+
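One caveat with that output: the "metrics" column was already a JSON string when toJSON ran, so it ends up embedded as a quoted string rather than a nested object. Keeping it as a plain struct and letting toJSON do the single serialization pass should keep the nesting; a sketch:

ordersDF
  .select($"style_id".as("key"), struct($"date", $"style_id", $"mrp").as("metrics"))
  .toJSON
  .show(false)
// expected shape: {"key":2024270,"metrics":{"date":20170101,"style_id":2024270,"mrp":1000.0}}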
I also tried another way using the JSON4S library, and that works as well,
import org.apache.spark.sql.Row
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

def convertRowToJSON(row: Row) = {
  // column order: date, style_id, mrp, revenue, quantity, product_discount,
  // coupon_discount, total_discount, list_count, add_to_cart_count, pdp_count
  val json =
    ("metrics" ->
      ("date" -> row(0).toString) ~
      ("style_id" -> row.getInt(1)) ~
      ("mrp" -> row.getFloat(2)) ~
      ("revenue" -> row.getFloat(3)) ~
      ("quantity" -> row.getInt(4)) ~
      ("product_discount" -> row.getFloat(5)) ~
      ("coupon_discount" -> row.getFloat(6)) ~
      ("total_discount" -> row.getFloat(7)) ~
      ("list_count" -> row.getInt(8)) ~
      ("add_to_cart_count" -> row.getInt(9)) ~
      ("pdp_count" -> row.getInt(10))
    )
  (row.getInt(1), compact(render(json)))
}
scala> val ordersDF = spark.read.schema(revenue_schema).format("csv").load("s3://myntra/scm-inbound/fifa/pocs/smallMetrics.csv").map(convertRowToJSON)
ordersDF: org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]
scala> ordersDF.show(false)
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_1     |_2                                                                                                                                                                                                                                      |
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2024270|{"metrics":{"date":"20170101","style_id":2024270,"mrp":1000.0,"revenue":1000.0,"quantity":1000,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":1000,"add_to_cart_count":2000,"pdp_count":2000}}|
|2024333|{"metrics":{"date":"20170101","style_id":2024333,"mrp":1000.0,"revenue":1000.0,"quantity":1000,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":1000,"add_to_cart_count":2000,"pdp_count":2000}}|
|2023709|{"metrics":{"date":"20170101","style_id":2023709,"mrp":1000.0,"revenue":1000.0,"quantity":1000,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":1000,"add_to_cart_count":2000,"pdp_count":2000}}|
|1234709|{"metrics":{"date":"20170201","style_id":1234709,"mrp":1000.0,"revenue":1000.0,"quantity":1000,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":1000,"add_to_cart_count":2000,"pdp_count":2000}}|
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
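Finally, for the Elasticsearch write itself: if I'm reading the elasticsearch-hadoop (elasticsearch-spark) docs right, a (key, json) pair RDD like the one above can be written with the key used as the document id; a sketch, assuming the connector is on the classpath and es.nodes is configured (the index name is a placeholder):

import org.elasticsearch.spark._

// (style_id, json) pairs from convertRowToJSON; the key becomes the document _id
val metricsRDD = ordersDF.rdd
metricsRDD.saveToEsWithMeta(
  "metrics/docs",                    // placeholder index/type
  Map("es.input.json" -> "true")     // values are pre-serialized JSON strings
)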