When I try to write a Dataset from Spark to Teradata, I get the following error whenever the Dataset contains string data:
2018-01-02 15:49:05 [pool-2-thread-2] ERROR c.i.i.t.spark2.algo.JDBCTableWriter:115 - Error in JDBC operation:
java.sql.SQLException: [Teradata Database] [TeraJDBC 15.00.00.20] [Error 3706] [SQLState 42000] Syntax error: Data Type "TEXT" does not match a Defined Type name.
at com.teradata.jdbc.jdbc_4.util.ErrorFactory.makeDatabaseSQLException(ErrorFactory.java:308)
at com.teradata.jdbc.jdbc_4.statemachine.ReceiveInitSubState.action(ReceiveInitSubState.java:109)
at com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.subStateMachine(StatementReceiveState.java:307)
at com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.action(StatementReceiveState.java:196)
at com.teradata.jdbc.jdbc_4.statemachine.StatementController.runBody(StatementController.java:123)
at com.teradata.jdbc.jdbc_4.statemachine.StatementController.run(StatementController.java:114)
at com.teradata.jdbc.jdbc_4.TDStatement.executeStatement(TDStatement.java:385)
at com.teradata.jdbc.jdbc_4.TDStatement.doNonPrepExecuteUpdate(TDStatement.java:602)
at com.teradata.jdbc.jdbc_4.TDStatement.executeUpdate(TDStatement.java:1109)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:805)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:90)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
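How can I make sure the data is written to Teradata correctly?
I am reading a CSV file from HDFS into a Dataset and then trying to write it to Teradata with DataFrameWriter. This is the code I am using: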
ds.write().mode("append")
.jdbc(url, tableName, props);
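I am using Spark 2.2.0, and Teradata is 15.00.00.07. I ran into a similar problem when writing to Netezza, whereas with DB2 the write succeeds but the string values are replaced with . Are any options required when writing to these databases?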
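I was able to solve this by implementing a custom JdbcDialect for Teradata. The same approach can be used to solve similar problems with other data sources (Netezza, DB2, Hive, etc.).
To do this, extend the JdbcDialect class and register it: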
import java.sql.Types;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcType;
import org.apache.spark.sql.types.DataType;

import scala.Option;

public class TDDialect extends JdbcDialect {
    private static final long serialVersionUID = 1L;

    // Keys are the values returned by Spark's DataType.simpleString();
    // values are the Teradata column types used in CREATE TABLE.
    private static final Map<String, Option<JdbcType>> dataTypeMap =
            new HashMap<String, Option<JdbcType>>();

    private static void put(String sparkType, String teradataType, int sqlType) {
        dataTypeMap.put(sparkType, Option.apply(JdbcType.apply(teradataType, sqlType)));
    }

    static {
        put("int", "INTEGER", Types.INTEGER);
        // LongType, ShortType and ByteType report "bigint", "smallint" and
        // "tinyint" from simpleString(), so those keys are mapped as well.
        // The original "long", "short" and "byte" keys are kept alongside them.
        put("bigint", "BIGINT", Types.BIGINT);
        put("long", "BIGINT", Types.BIGINT);
        put("smallint", "SMALLINT", Types.SMALLINT);
        put("short", "SMALLINT", Types.SMALLINT);
        put("tinyint", "BYTEINT", Types.TINYINT);
        put("byte", "BYTEINT", Types.TINYINT);
        put("double", "DOUBLE PRECISION", Types.DOUBLE);
        put("float", "FLOAT", Types.FLOAT);
        put("binary", "BLOB", Types.BLOB);
        put("timestamp", "TIMESTAMP", Types.TIMESTAMP);
        put("date", "DATE", Types.DATE);
        // Teradata has no TEXT type, so strings are created as VARCHAR(255).
        put("string", "VARCHAR(255)", Types.VARCHAR);
        put("text", "VARCHAR(255)", Types.VARCHAR);
        put("boolean", "CHAR(1)", Types.CHAR);
    }

    // Apply this dialect only to Teradata JDBC URLs.
    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:teradata");
    }

    // Return the Teradata type for a Spark type, or an empty Option so that
    // Spark falls back to its default mapping for types not listed above.
    @Override
    public Option<JdbcType> getJDBCType(DataType dt) {
        Option<JdbcType> option = dataTypeMap.get(dt.simpleString().toLowerCase());
        if (option == null) {
            option = Option.empty();
        }
        return option;
    }
}
You can now register it with the following snippet before calling any action on Spark:
JdbcDialects.registerDialect(new TDDialect());
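To illustrate why registering the dialect fixes the error, here is a rough plain-Python model of the lookup. It is not Spark code: `COMMON_TYPES`, `TERADATA_OVERRIDES` and `column_type` are hypothetical names, and the fallback table is only a small subset of what I understand Spark's generic mapping to be (a string column becomes TEXT, which Teradata rejects with Error 3706).

```python
# Plain-Python model of the type lookup; illustrative only, not Spark's API.

# Assumed subset of Spark's generic fallback mapping (Spark type -> SQL type).
COMMON_TYPES = {
    "string": "TEXT",
    "int": "INTEGER",
    "bigint": "BIGINT",
    "double": "DOUBLE PRECISION",
}

# What a Teradata dialect like the one above supplies instead.
TERADATA_OVERRIDES = {
    "string": "VARCHAR(255)",
}


def column_type(url, spark_type):
    """Return the column type that would end up in CREATE TABLE."""
    # Mirrors canHandle(): the dialect only applies to Teradata URLs.
    overrides = TERADATA_OVERRIDES if url.startswith("jdbc:teradata") else {}
    # Mirrors getJDBCType(): a dialect hit wins, otherwise fall back.
    if spark_type in overrides:
        return overrides[spark_type]
    if spark_type in COMMON_TYPES:
        return COMMON_TYPES[spark_type]
    raise ValueError("no JDBC type for " + spark_type)


if __name__ == "__main__":
    print(column_type("jdbc:teradata://host/db", "string"))
    print(column_type("jdbc:other://host/db", "string"))
```

With the dialect in place, the string column is created as VARCHAR(255); without it, the generic TEXT type is sent to the database. Types the dialect does not cover still go through the fallback.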
For some data sources (Hive, for example), you may also need to override another method to avoid NumberFormatExceptions or similar exceptions:
@Override
public String quoteIdentifier(String colName) {
    // Return the column name unquoted.
    return colName;
}
Hope this helps anyone facing a similar problem.
It worked for me. Could you try it once and let me know?
Points to note:
***Your Hive table must use Text as its storage format; it should not be ORC.
Create the schema in Teradata before writing to it from your pyspark notebook.***
df = spark.sql("select * from dbname.tableName")
properties = {
"driver": "com.teradata.jdbc.TeraDriver",
"user": "xxxx",
"password": "xxxxx"
}
df.write.jdbc(url='provide_url',table='dbName.tableName', properties=properties)
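If creating the table by hand is inconvenient, one possible alternative is the `createTableColumnTypes` writer option, available since Spark 2.2.0, which overrides the column types Spark uses when it creates the table. The sketch below is an assumption-laden example: `build_column_types` and `write_to_teradata` are hypothetical helper names, VARCHAR(255) is an arbitrary length, and I have not verified it against Teradata.

```python
def build_column_types(columns, string_type="VARCHAR(255)"):
    """Build a createTableColumnTypes value for every string column.

    `columns` is a list of (name, spark_type) pairs, as returned by df.dtypes.
    """
    return ", ".join(
        "{} {}".format(name, string_type)
        for name, dtype in columns
        if dtype == "string"
    )


def write_to_teradata(df, url, table, properties):
    """Append df to a JDBC table, overriding the type of string columns."""
    column_types = build_column_types(df.dtypes)
    writer = df.write.mode("append")
    if column_types:
        # Only takes effect when Spark itself creates the table.
        writer = writer.option("createTableColumnTypes", column_types)
    writer.jdbc(url=url, table=table, properties=properties)


if __name__ == "__main__":
    # Example schema; in a real job this would come from df.dtypes.
    print(build_column_types([("id", "int"), ("name", "string")]))
```

It would be called as `write_to_teradata(df, 'provide_url', 'dbName.tableName', properties)` with the same `df` and `properties` as above. The option has no effect on a table that already exists.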