I have a dataframe:
+-----+--------+---------+
|  usn|log_type|item_code|
+-----+--------+---------+
|    0|      11|    I0938|
|  916|      19|    I0009|
|  916|      51|    I1097|
|  916|      19|    C0723|
|  916|      19|    I0010|
|  916|      19|    I0010|
|12331|      19|    C0117|
|12331|      19|    C0117|
|12331|      19|    I0009|
|12331|      19|    I0009|
|12331|      19|    I0010|
|12838|      19|    I1067|
|12838|      19|    I1067|
|12838|      19|    C1083|
|12838|      11|    B0250|
|12838|      19|    C1346|
+-----+--------+---------+
I want the distinct item_code values, with an index for each item_code:
+---------+------+
|item_code| numId|
+---------+------+
|    I0938|     0|
|    I0009|     1|
|    I1097|     2|
|    C0723|     3|
|    I0010|     4|
|    C0117|     5|
|    I1067|     6|
|    C1083|     7|
|    B0250|     8|
|    C1346|     9|
+---------+------+
I'm not using monotonically_increasing_id, because it returns a bigint.
Using monotonically_increasing_id only guarantees that the numbers increase; it guarantees neither a starting value nor consecutive numbering. If you want to be sure to get 0, 1, 2, 3, ..., you can use the RDD function zipWithIndex().
Since I'm more familiar with Spark in Scala than in Python, the examples here are in Scala, but they should be easy to translate.
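For contrast, here is a minimal self-contained sketch of the monotonically_increasing_id behavior; the repartition(3) is only there to force multiple partitions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// The upper bits of each id encode the partition, so with more than
// one partition the ids jump, e.g. 0, 8589934592, 17179869184, ...
// rather than 0, 1, 2, ...
val withGaps = Seq("I0938", "I0009", "I1097").toDF("item_code")
  .repartition(3)
  .withColumn("numId", monotonically_increasing_id())

The zipWithIndex() version, which does give consecutive ids starting at 0, looks like this: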
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// The item_code column from the question, duplicates included
val df = Seq("I0938", "I0009", "I1097", "C0723", "I0010", "I0010",
             "C0117", "C0117", "I0009", "I0009", "I0010", "I1067",
             "I1067", "C1083", "B0250", "C1346")
  .toDF("item_code")

// Deduplicate, then pair each distinct code with a consecutive 0-based index
val df2 = df.distinct.rdd
  .map { case Row(item: String) => item }
  .zipWithIndex()
  .toDF("item_code", "numId")
This gives you the requested result (note that distinct involves a shuffle, so the codes are not indexed in their input order):
+---------+-----+
|item_code|numId|
+---------+-----+
|    I0010|    0|
|    I1067|    1|
|    C0117|    2|
|    I0009|    3|
|    I1097|    4|
|    C1083|    5|
|    I0938|    6|
|    C0723|    7|
|    B0250|    8|
|    C1346|    9|
+---------+-----+
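One caveat: zipWithIndex() also produces a Long, so the numId above is still a bigint column. If you specifically need an integer type, you can cast it afterwards; a minimal sketch, assuming the df2 from the example above:

// The indices are consecutive and start at 0, so the cast is safe
// as long as there are fewer than Int.MaxValue distinct codes
val df3 = df2.withColumn("numId", $"numId".cast("int"))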