Getting a SQLException when writing a Dataset with string columns to Teradata



When trying to write a Dataset from Spark to Teradata, I get the following error whenever the Dataset contains string data:

2018-01-02 15:49:05 [pool-2-thread-2] ERROR c.i.i.t.spark2.algo.JDBCTableWriter:115 - Error in JDBC operation:
java.sql.SQLException: [Teradata Database] [TeraJDBC 15.00.00.20] [Error 3706] [SQLState 42000] Syntax error: Data Type "TEXT" does not match a Defined Type name.
      at com.teradata.jdbc.jdbc_4.util.ErrorFactory.makeDatabaseSQLException(ErrorFactory.java:308)
    at com.teradata.jdbc.jdbc_4.statemachine.ReceiveInitSubState.action(ReceiveInitSubState.java:109)
    at com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.subStateMachine(StatementReceiveState.java:307)
    at com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.action(StatementReceiveState.java:196)
    at com.teradata.jdbc.jdbc_4.statemachine.StatementController.runBody(StatementController.java:123)
    at com.teradata.jdbc.jdbc_4.statemachine.StatementController.run(StatementController.java:114)
    at com.teradata.jdbc.jdbc_4.TDStatement.executeStatement(TDStatement.java:385)
    at com.teradata.jdbc.jdbc_4.TDStatement.doNonPrepExecuteUpdate(TDStatement.java:602)
    at com.teradata.jdbc.jdbc_4.TDStatement.executeUpdate(TDStatement.java:1109)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:805)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:90)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)

How can I make sure the data gets written to Teradata correctly?

I am reading a CSV file from HDFS into a Dataset and then trying to write it to Teradata with DataFrameWriter. I am using the following code for this:

ds.write().mode("append")
            .jdbc(url, tableName, props);

I am using Spark 2.2.0 and Teradata 15.00.00.07. I hit similar problems when trying to write to Netezza, and with DB2 the write succeeds but the string values get replaced with . Do I need to pass any options when writing to these databases?

I was able to solve this by implementing a custom JdbcDialect for Teradata. The same approach can be used to fix similar problems with other data sources such as Netezza, DB2, Hive, etc.

To do this, you need to extend the JdbcDialect class and register it:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcType;
import org.apache.spark.sql.types.DataType;

import scala.Option;

public class TDDialect extends JdbcDialect {

    private static final long serialVersionUID = 1L;

    // Map Spark's simple type names to Teradata-compatible JDBC types.
    // The key point is that string maps to VARCHAR(255) instead of the
    // default TEXT, which Teradata rejects with error 3706.
    private static final Map<String, Option<JdbcType>> dataTypeMap =
            new HashMap<String, Option<JdbcType>>();

    static {
        dataTypeMap.put("int", Option.apply(JdbcType.apply("INTEGER", java.sql.Types.INTEGER)));
        dataTypeMap.put("long", Option.apply(JdbcType.apply("BIGINT", java.sql.Types.BIGINT)));
        dataTypeMap.put("double", Option.apply(JdbcType.apply("DOUBLE PRECISION", java.sql.Types.DOUBLE)));
        dataTypeMap.put("float", Option.apply(JdbcType.apply("FLOAT", java.sql.Types.FLOAT)));
        dataTypeMap.put("short", Option.apply(JdbcType.apply("SMALLINT", java.sql.Types.SMALLINT)));
        dataTypeMap.put("byte", Option.apply(JdbcType.apply("BYTEINT", java.sql.Types.TINYINT)));
        dataTypeMap.put("binary", Option.apply(JdbcType.apply("BLOB", java.sql.Types.BLOB)));
        dataTypeMap.put("timestamp", Option.apply(JdbcType.apply("TIMESTAMP", java.sql.Types.TIMESTAMP)));
        dataTypeMap.put("date", Option.apply(JdbcType.apply("DATE", java.sql.Types.DATE)));
        dataTypeMap.put("string", Option.apply(JdbcType.apply("VARCHAR(255)", java.sql.Types.VARCHAR)));
        dataTypeMap.put("boolean", Option.apply(JdbcType.apply("CHAR(1)", java.sql.Types.CHAR)));
        dataTypeMap.put("text", Option.apply(JdbcType.apply("VARCHAR(255)", java.sql.Types.VARCHAR)));
    }

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:teradata");
    }

    @Override
    public Option<JdbcType> getJDBCType(DataType dt) {
        Option<JdbcType> option = dataTypeMap.get(dt.simpleString().toLowerCase());
        if (option == null) {
            option = Option.empty(); // fall back to Spark's default mapping
        }
        return option;
    }
}
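To see why this fixes the 3706 error, recall from the stack trace that the failure happens in JdbcUtils.createTable, where Spark builds a CREATE TABLE statement from the dialect's type mapping. Below is a minimal pure-Python simulation of that lookup (not Spark's actual code; `build_ddl`, `DEFAULT_MAP`, and `TERADATA_MAP` are illustrative names, with the Teradata entries mirroring the map in the dialect above):

```python
# Without a registered dialect, Spark's generic mapping emits TEXT for
# string columns, which Teradata's DDL parser rejects with error 3706.
DEFAULT_MAP = {"int": "INTEGER", "string": "TEXT"}

# The custom dialect above substitutes a type Teradata understands.
TERADATA_MAP = {"int": "INTEGER", "string": "VARCHAR(255)"}

def build_ddl(table, schema, type_map):
    """Build a CREATE TABLE statement from (column, simple-type) pairs,
    the way Spark's JDBC writer does when the target table is missing."""
    cols = ", ".join(f"{name} {type_map[t]}" for name, t in schema)
    return f"CREATE TABLE {table} ({cols})"

schema = [("id", "int"), ("name", "string")]
print(build_ddl("mydb.people", schema, DEFAULT_MAP))
# CREATE TABLE mydb.people (id INTEGER, name TEXT)  <- Teradata rejects this
print(build_ddl("mydb.people", schema, TERADATA_MAP))
# CREATE TABLE mydb.people (id INTEGER, name VARCHAR(255))
```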

Now you can register it with the following snippet, before invoking any action on Spark:

JdbcDialects.registerDialect(new TDDialect());

For some data sources (e.g. Hive) you may need to override one more method to avoid NumberFormatExceptions or similar exceptions:

@Override
public String quoteIdentifier(String colName) {
    return colName;
}
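The effect of that override can be sketched in a couple of lines: the base dialect wraps column names in double quotes, and some sources choke on the quoted form, while the override passes names through untouched. A tiny illustration (hypothetical helper names, not Spark's API):

```python
def default_quote(col):
    """Roughly what the base JdbcDialect does: wrap in double quotes."""
    return '"' + col + '"'

def passthrough_quote(col):
    """What the override above does: return the name unchanged."""
    return col

cols = ["id", "name"]
print("SELECT " + ", ".join(default_quote(c) for c in cols) + " FROM t")
# SELECT "id", "name" FROM t
print("SELECT " + ", ".join(passthrough_quote(c) for c in cols) + " FROM t")
# SELECT id, name FROM t
```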

Hope this helps anyone facing a similar problem.

It worked for me; could you try it once and let me know?

Points to be noted:

Your Hive table must be stored in Text format; it should not be ORC.
Create the schema in Teradata before writing it from your PySpark notebook.


df = spark.sql("select * from dbname.tableName")
properties = {
    "driver": "com.teradata.jdbc.TeraDriver",
    "user": "xxxx",
    "password": "xxxxx"
}
df.write.jdbc(url='provide_url', table='dbName.tableName', properties=properties)
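The `url='provide_url'` placeholder is a Teradata JDBC URL. As a hedged sketch (hostname and parameter names below are assumptions; check the TeraJDBC driver documentation for the supported parameters), the URL takes the form `jdbc:teradata://host/NAME=value,NAME=value`, and its prefix is exactly what the `canHandle()` check in the custom dialect matches:

```python
def teradata_jdbc_url(host, **params):
    """Assemble a Teradata-style JDBC URL: jdbc:teradata://host/K=V,K=V.
    Parameter names such as DATABASE follow the TeraJDBC convention;
    treat this as an illustrative sketch, not a driver reference."""
    suffix = ",".join(f"{k}={v}" for k, v in params.items())
    return f"jdbc:teradata://{host}" + (f"/{suffix}" if suffix else "")

url = teradata_jdbc_url("td-host.example.com", DATABASE="dbName")
print(url)  # jdbc:teradata://td-host.example.com/DATABASE=dbName
assert url.startswith("jdbc:teradata")  # the canHandle() prefix
```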
