如何在向SQL Server写入Spark Dataframe时指定列数据类型



我正在为SQL Server编写一个Spark数据框架,如下面的代码摘录所示。

url = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname,jdbcPort,jdbcDatabase)
mydf = spark.sql(f"SELECT * FROM {source['db']}.{source['table']}")
mydf.write
.format("jdbc")
.mode("overwrite")
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
.option("url", url)
.option("dbtable", f"{destination['db']}.dbo.{destination['table']}")
.option("user", properties['user'])
.option("password", properties['password'])
.option("batchSize", 50000)
.save()

但是,字符串字段默认存储为nvarchar,而我想为它们指定一个varchar值。事先对数据框执行强制类型转换以转换列数据类型没有帮助。

如有任何建议,不胜感激。

您应该实现并注册JdbcDialect。

https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/sql/jdbc/JdbcDialect.html

  1. 重写getJDBCType()方法并提供所需的类型。

    public CustomDialect extends JdbcDialect {
    @Override
    public boolean canHandle(String url) {
    return true;
    }
    @Override
    public Option<JdbcType> getJDBCType(DataType dt) {
    if (dt.equals(DataTypes.StringType)) {
    return new Some<>(new JdbcType("VARCHAR(1000)", Types.VARCHAR));
    }
    return super.getJDBCType(dt);
    }
    @Override
    public String quoteIdentifier(String colName) {
    return colName;
    } 
    

    }

  2. 注册方言

JdbcDialects。registerDialect(new CustomDialect());

相关内容

  • 没有找到相关文章

最新更新