Why does Kafka JDBC Connect insert data as BLOB instead of varchar?



I am using a Java producer to write data to my Kafka topic. I then use Kafka JDBC Connect to insert the data into my Oracle table. Below is my producer code.

package producer.serialized.avro;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class Sender4 {
    public static void main(String[] args) {
        String flightSchema = "{\"type\":\"record\",\"name\":\"Flight\","
                + "\"fields\":[{\"name\":\"flight_id\",\"type\":\"string\"},{\"name\":\"flight_to\",\"type\":\"string\"},{\"name\":\"flight_from\",\"type\":\"string\"}]}";
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);    
        props.put("schema.registry.url", "http://192.168.0.1:8081");            
        KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(flightSchema);            
        GenericRecord avroRecord = new GenericData.Record(schema);
        avroRecord.put("flight_id", "myflight");
        avroRecord.put("flight_to", "QWE");
        avroRecord.put("flight_from", "RTY");    
        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("topic9",avroRecord);
        producer.send(record);
        // flush and close so the asynchronous send completes before the JVM exits
        producer.flush();
        producer.close();
    }
}

Below is my Kafka Connect properties file:

name=test-sink-6
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=topic9
connection.url=jdbc:oracle:thin:@192.168.0.1:1521:usera
connection.user=usera
connection.password=usera
auto.create=true
table.name.format=FLIGHTS4
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://192.168.0.1:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://192.168.0.1:8081

Given my schema, I expected the values inserted into my Oracle table to be VARCHAR2. I created a table with three VARCHAR2 columns, but when I started the connector, nothing was inserted. I then dropped the table and re-ran the connector with table auto-creation enabled. This time the table was created and the values were inserted, but the column data type is CLOB. I want it to be VARCHAR2, since that uses less storage.

Why is this happening, and how can I fix it? Thanks.

It looks like Kafka Connect's STRING schema type maps to Oracle's NCLOB:

<table border="1">
<tr>
<th>Schema Type</th><th>MySQL</th><th>Oracle</th><th>PostgreSQL</th><th>SQLite</th>
</tr>
<tr>
<td>INT8</td><td>TINYINT</td><td>NUMBER(3,0)</td><td>SMALLINT</td><td>NUMERIC</td>
</tr>
<tr>
<td>INT16</td><td>SMALLINT</td><td>NUMBER(5,0)</td><td>SMALLINT</td><td>NUMERIC</td>
</tr>
<tr>
<td>INT32</td><td>INT</td><td>NUMBER(10,0)</td><td>INT</td><td>NUMERIC</td>
</tr>
<tr>
<td>INT64</td><td>BIGINT</td><td>NUMBER(19,0)</td><td>BIGINT</td><td>NUMERIC</td>
</tr>
<tr>
<td>FLOAT32</td><td>FLOAT</td><td>BINARY_FLOAT</td><td>REAL</td><td>REAL</td>
</tr>
<tr>
<td>FLOAT64</td><td>DOUBLE</td><td>BINARY_DOUBLE</td><td>DOUBLE PRECISION</td><td>REAL</td>
</tr>
<tr>
<td>BOOLEAN</td><td>TINYINT</td><td>NUMBER(1,0)</td><td>BOOLEAN</td><td>NUMERIC</td>
</tr>
<tr>
<td>STRING</td><td>VARCHAR(256)</td><td>NCLOB</td><td>TEXT</td><td>TEXT</td>
</tr>
<tr>
<td>BYTES</td><td>VARBINARY(1024)</td><td>BLOB</td><td>BYTEA</td><td>BLOB</td>
</tr>
<tr>
<td>'Decimal'</td><td>DECIMAL(65,s)</td><td>NUMBER(*,s)</td><td>DECIMAL</td><td>NUMERIC</td>
</tr>
<tr>
<td>'Date'</td><td>DATE</td><td>DATE</td><td>DATE</td><td>NUMERIC</td>
</tr>
<tr>
<td>'Time'</td><td>TIME(3)</td><td>DATE</td><td>TIME</td><td>NUMERIC</td>
</tr>
<tr>
<td>'Timestamp'</td><td>TIMESTAMP(3)</td><td>TIMESTAMP</td><td>TIMESTAMP</td><td>NUMERIC</td>
</tr>
</table>

Source: https://www.ibm.com/support/knowledgecenter/en/SSPT3X_4.2.5/com.ibm.swg.im.infosphere.biginsights.admin.doc/doc/admin_kafka_jdbc_sink.html

https://docs.confluent.io/current/connect/connect-jdbc/docs/sink_connector.html
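One workaround, if you control the target schema: pre-create the table with VARCHAR2 columns yourself and leave `auto.create` disabled, so the connector only issues INSERTs and never generates DDL. Note that the connector quotes identifiers in the SQL it generates, and Oracle treats quoted identifiers as case-sensitive, so the column names in your DDL may need to match the Avro field names exactly; a case mismatch could explain why nothing was inserted into your original pre-created table. A sketch, assuming the field names from the schema above (the length 256 is an arbitrary choice):

```sql
-- Quoted lowercase column names match the Avro field names exactly,
-- since the connector quotes identifiers and Oracle treats quoted
-- identifiers as case-sensitive.
CREATE TABLE FLIGHTS4 (
  "flight_id"   VARCHAR2(256),
  "flight_to"   VARCHAR2(256),
  "flight_from" VARCHAR2(256)
);
```

With `auto.create=false`, the connector inserts into this table as-is instead of creating its own CLOB columns.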

Update

The OracleDialect class (https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/sink/dialect/OracleDialect.java) has the CLOB value hardcoded. Simply extending it with your own class and changing that mapping will not help, because the dialect type is resolved in a static method of JdbcSinkTask (https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkTask.java):

final DbDialect dbDialect = DbDialect.fromConnectionString(config.connectionUrl);
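So changing the mapping means modifying the connector source itself (a fork) and rebuilding. The change is small; the stand-alone sketch below only illustrates the idea of the schema-type-to-Oracle-type mapping with STRING redirected to a bounded VARCHAR2. The class and field names here are hypothetical and do not match the connector's actual code:

```java
import java.util.Map;

public class OracleTypeMapping {
    // Hypothetical version of the Connect-schema-to-Oracle mapping:
    // STRING goes to a bounded VARCHAR2 instead of the hardcoded NCLOB.
    static final Map<String, String> SQL_TYPES = Map.of(
            "INT32", "NUMBER(10,0)",
            "INT64", "NUMBER(19,0)",
            "STRING", "VARCHAR2(4000)",   // instead of NCLOB
            "BYTES", "BLOB"
    );

    public static void main(String[] args) {
        System.out.println(SQL_TYPES.get("STRING")); // VARCHAR2(4000)
    }
}
```

Be aware that a bounded VARCHAR2 can truncate or reject values longer than the declared limit, which is presumably why the connector defaults to an unbounded LOB type.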
