我正在尝试在我的DataBricks环境中创建一个STREAMING LIVE TABLE对象,使用S3 bucket与一堆CSV文件作为源。
我使用的语法是:
CREATE OR REFRESH STREAMING LIVE TABLE t1
COMMENT "test table"
TBLPROPERTIES
(
"myCompanyPipeline.quality" = "bronze"
, 'delta.columnMapping.mode' = 'name'
, 'delta.minReaderVersion' = '2'
, 'delta.minWriterVersion' = '5'
)
AS
SELECT * FROM cloud_files
(
"/input/t1/"
,"csv"
,map
(
"cloudFiles.inferColumnTypes", "true"
, "delimiter", ","
, "header", "true"
)
)
示例源文件内容:
ROW_TS,ROW_KEY,CLASS_ID,EVENT_ID,CREATED_BY,CREATED_ON,UPDATED_BY,UPDATED_ON
31/07/2018 02:29,4c1a985c-0f98-46a6-9703-dd5873febbbb,HFK,XP017,test-user,02/01/2017 23:03,,
17/01/2021 21:40,3be8187e-90de-4d6b-ac32-1001c184d363,HTE,XP083,test-user,02/09/2017 12:01,,
08/11/2019 17:21,05fa881e-6c8d-4242-9db4-9ba486c96fa0,JG8,XP083,test-user,18/05/2018 22:40,,
当我运行相关管道时,我得到以下错误:
org.apache.spark.sql。AnalysisException:不能在Hive metastore中创建列名包含逗号的表
由于某些原因,加载器不能识别逗号作为列分隔符,并试图将整个内容加载到单个列中。
我已经花了好几个小时试图找到一个解决方案。用分号替换逗号(在源文件中以及在分隔符"中)选项)没有帮助。尝试手动上传相同的文件到常规(即非流)Databricks表工作得很好。这个问题只与流表有关。
想法?
不是我所期望的解决方案的类型,但它似乎有效,所以…
比起使用SQL来创建DLT,使用Python脚本更有帮助:
import dlt
@dlt.table
def t1():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/input/t1/")
)
请注意,上面的脚本需要通过DLT管道执行(直接从笔记本运行它会抛出ModuleNotFoundError
异常)