我正在为SQL Server编写一个Spark数据框架,如下面的代码摘录所示。
url = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname,jdbcPort,jdbcDatabase)
mydf = spark.sql(f"SELECT * FROM {source['db']}.{source['table']}")
mydf.write
.format("jdbc")
.mode("overwrite")
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
.option("url", url)
.option("dbtable", f"{destination['db']}.dbo.{destination['table']}")
.option("user", properties['user'])
.option("password", properties['password'])
.option("batchSize", 50000)
.save()
但是,字符串字段默认存储为nvarchar,而我想为它们指定一个varchar值。事先对数据框执行强制类型转换以转换列数据类型没有帮助。
如有任何建议,不胜感激。
您应该实现并注册JdbcDialect。
https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/sql/jdbc/JdbcDialect.html重写getJDBCType()方法并提供所需的类型。
public CustomDialect extends JdbcDialect { @Override public boolean canHandle(String url) { return true; } @Override public Option<JdbcType> getJDBCType(DataType dt) { if (dt.equals(DataTypes.StringType)) { return new Some<>(new JdbcType("VARCHAR(1000)", Types.VARCHAR)); } return super.getJDBCType(dt); } @Override public String quoteIdentifier(String colName) { return colName; }
}
注册方言
JdbcDialects。registerDialect(new CustomDialect());