我在AWS S3中有一个CSV文件,该文件正在加载到AWS Glue,即用于对S3中的源数据文件应用转换。它提供PySpark脚本环境。数据看起来有点像这样:
"ID","CNTRY_CD","SUB_ID","PRIME_KEY","DATE"
"123","IND","25635525","11243749772","2017-10-17"
"123","IND","25632349","112322abcd","2017-10-17"
"123","IND","25635234","11243kjsd434","2017-10-17"
"123","IND","25639822","1124374343","2017-10-17"
预期结果应该是这样的:
"123","IND","25632349","112322abcd","2017-10-17"
"123","IND","25635234","11243kjsd434","2017-10-17"
我正在处理名为"PRIME_KEY"的整数类型字段,该字段可能包含导致数据格式错误的字母
现在的要求是,我需要使用SQL查询来确定Integer类型的主键列是否包含任何字母数字字符,而不是仅包含数值。到目前为止,我已经尝试了一些正则表达式的变体来实现这一点,比如下面的一个,但没有成功:
SELECT *
FROM table_name
WHERE column_name IS NOT NULL AND
CAST(column_name AS VARCHAR(100)) LIKE '%[0-9a-z0-9]%'
源脚本:
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# s3 output directory
output_dir = "s3://aws-glue-scripts../.."
# Data Catalog: database and table name
db_name = "sampledb"
glue_tbl_name = "sampleTable"
datasource = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = glue_tbl_name)
datasource_df = datasource.toDF()
datasource_df.registerTempTable("sample_tbl")
invalid_primarykey_values_df = spark.sql("SELECT * FROM sample_tbl WHERE CAST(PRIME_KEY AS STRING) RLIKE '([a-z]+[0-9]+)|([0-9]+[a-z]+)'")
invalid_primarykey_values_df.show()
这个脚本的输出如下:
+---+--------+--------+------------+----------+-----------+---------------+
|ID | CNTRY_CD | SUB_ID | PRIME_KEY | DATE |
+---+--------+--------+------------+----------+-----------+---------------+
|123|IND|25635525|[1124349772,空]|2017-10-17|
|123|IND|25632349|[null,112322ab..|2017-10-17|
|123|IND|25635234|[null,11243kjsd.|2017-10-17|
|123|IND|25639822|[112434343,空]|2017-10-17|
+--------+--------+--------------------+----------+-----------+---------------+
我已经突出显示了我正在处理的字段的值。它看起来与源数据有些不同。
如有任何帮助,我们将不胜感激。感谢
您可以使用RLIKE
SELECT *
FROM table_name
WHERE CAST(PRIME_KEY AS STRING) RLIKE '([0-9]+[a-z]+)'
更通用的字母数字筛选器匹配。
WHERE CAST(PRIME_KEY AS STRING) RLIKE '([a-z]+[0-9]+)|([0-9]+[a-z]+)'
编辑:根据评论
必要的进口和udfs
val spark = SparkSession.builder
.config(conf)
.getOrCreate
import org.apache.spark.sql.functions._
val extract_pkey = udf((x: String) => x.replaceAll("null|\]|\[|,", "").trim)
import spark.implicits._
使用UDF 设置用于测试和清洁的样本数据
val df = Seq(
("123", "IND", "25635525", "[11243749772,null]", "2017-10-17"),
("123", "IND", "25632349", "[null,112322abcd]", "2017-10-17"),
("123", "IND", "25635234", "[null,11243kjsd434]", "2017-10-17"),
("123", "IND", "25639822", "[1124374343,null]", "2017-10-17")
).toDF("ID", "CNTRY_CD", "SUB_ID", "PRIME_KEY", "DATE")
.withColumn("PRIME_KEY", extract_pkey($"PRIME_KEY"))
df.registerTempTable("tbl")
spark.sql("SELECT * FROM tbl WHERE PRIME_KEY RLIKE '([a-z]+[0-9]+)|([0-9]+[a-z]+)'")
.show(false)
+---+--------+--------+------------+----------+
|ID |CNTRY_CD|SUB_ID |PRIME_KEY |DATE |
+---+--------+--------+------------+----------+
|123|IND |25632349|112322abcd |2017-10-17|
|123|IND |25635234|11243kjsd434|2017-10-17|
+---+--------+--------+------------+----------+