Debezium notifying method never executes



I have a Maven project and I am trying to build a CDC setup with Debezium + PostgreSQL, but it is not working. I am using Java 1.8 + Maven with the dependencies below. Nothing looks like an error, but the notifying method is never executed.

I simply created the Maven project, configured the connector as shown below and ran the program. The replication slot is created automatically by the Java program.

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-api</artifactId>
        <version>1.9.4.Final</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-embedded</artifactId>
        <version>1.9.4.Final</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-postgres</artifactId>
        <version>1.9.4.Final</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.0-alpha7</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.0-alpha7</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>

My application code:

import java.io.File;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

public class App
{
    public static void main( String[] args ) throws IOException, InterruptedException
    {
        /**DatabaseChangeEventListener d = new DatabaseChangeEventListener();
        System.out.println("Comenzando ejecución");
        d.startEmbeddedEngine();*/
        File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
        File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
        Configuration config = Configuration.create()
                .with("name", "customer-postgresql-connector")
                .with("connector.class", PostgresConnector.class)
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
                .with("offset.flush.interval.ms", "60000")
                .with("database.hostname", "localhost")
                .with("database.port", 5432)
                .with("database.user", "postgres")
                .with("database.password", "admin")
                .with("database.dbname", "anotherdb")
                .with("database.include.list", "anotherdb")
                .with("include.schema.changes", "false")
                .with("database.allowPublicKeyRetrieval", "true")
                .with("plugin.name", "wal2json")
                .with("schemas.enable", false)
                .with("database.server.id", "10181")
                .with("database.server.name", "localhost-postgresql")
                .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
                .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
                .build();

        try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
                .using(config.asProperties())
                .notifying(record -> {
                    System.out.println("Record: " + record);
                })
                .build()) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.execute(engine);
            engine.close();
            executor.shutdown();
            while (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                System.out.println("Waiting another 5 seconds for the embedded engine to shut down");
            }
        }
    }
}

And the console prints this:

[main] INFO org.apache.kafka.connect.json.JsonConverterConfig - JsonConverterConfig values: 
converter.type = key
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
[main] INFO org.apache.kafka.connect.json.JsonConverterConfig - JsonConverterConfig values: 
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = false
[main] INFO io.debezium.embedded.EmbeddedEngine$EmbeddedConfig - EmbeddedConfig values: 
access.control.allow.methods = 
access.control.allow.origin = 
admin.listeners = null
bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
config.providers = []
connector.client.config.override.policy = All
header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
listeners = [http://:8083]
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
offset.flush.interval.ms = 60000
offset.flush.timeout.ms = 5000
offset.storage.file.filename = C:\Users\VASS\AppData\Local\Temp\offsets_4608803621683662537.dat
offset.storage.partitions = null
offset.storage.replication.factor = null
offset.storage.topic = 
plugin.path = null
response.http.headers.config = 
rest.advertised.host.name = null
rest.advertised.listener = null
rest.advertised.port = null
rest.extension.classes = []
ssl.cipher.suites = null
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
task.shutdown.graceful.timeout.ms = 5000
topic.creation.enable = true
topic.tracking.allow.reset = true
topic.tracking.enable = true
value.converter = class org.apache.kafka.connect.json.JsonConverter
[main] WARN org.apache.kafka.connect.runtime.WorkerConfig - The worker has been configured with one or more internal converter properties ([internal.key.converter, internal.value.converter]). Support for these properties was deprecated in version 2.0 and removed in version 3.0, and specifying them will have no effect. Instead, an instance of the JsonConverter with schemas.enable set to false will be used. For more information, please visit http://kafka.apache.org/documentation/#upgrade and consult the upgrade notesfor the 3.0 release.
[main] WARN org.apache.kafka.connect.runtime.WorkerConfig - Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.
[main] INFO org.apache.kafka.connect.json.JsonConverterConfig - JsonConverterConfig values: 
converter.type = key
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
[main] INFO org.apache.kafka.connect.json.JsonConverterConfig - JsonConverterConfig values: 
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
[main] INFO io.debezium.embedded.EmbeddedEngine - Stopping the embedded engine
[pool-1-thread-1] INFO org.apache.kafka.connect.storage.FileOffsetBackingStore - Starting FileOffsetBackingStore with file C:\Users\VASS\AppData\Local\Temp\offsets_4608803621683662537.dat
[pool-1-thread-1] WARN io.debezium.connector.postgresql.PostgresConnectorConfig - Logical decoder 'wal2json' is deprecated and will be removed in future versions
[pool-1-thread-1] WARN io.debezium.connector.postgresql.PostgresConnectorConfig - Configuration property 'truncate.handling.mode' is deprecated and will be removed in future versions. Please use 'skipped.operations' instead.
[pool-1-thread-1] WARN io.debezium.connector.postgresql.PostgresConnectorConfig - Configuration property 'toasted.value.placeholder' is deprecated and will be removed in future versions. Please use 'unavailable.value.placeholder' instead.
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask - Starting PostgresConnectorTask with configuration:
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    connector.class = io.debezium.connector.postgresql.PostgresConnector
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.allowPublicKeyRetrieval = true
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.history.file.filename = C:\Users\VASS\AppData\Local\Temp\dbhistory_3035630169534300344.dat
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.user = postgres
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.dbname = anotherdb
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    offset.storage = org.apache.kafka.connect.storage.FileOffsetBackingStore
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.server.id = 10181
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.server.name = localhost-postgresql
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    offset.flush.timeout.ms = 5000
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    include.schema.changes = false
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.port = 5432
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    plugin.name = wal2json
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    offset.flush.interval.ms = 60000
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    internal.key.converter = org.apache.kafka.connect.json.JsonConverter
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    offset.storage.file.filename = C:\Users\VASS\AppData\Local\Temp\offsets_4608803621683662537.dat
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.hostname = localhost
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.password = ********
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    name = customer-postgresql-connector
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    internal.value.converter = org.apache.kafka.connect.json.JsonConverter
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    value.converter = org.apache.kafka.connect.json.JsonConverter
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    key.converter = org.apache.kafka.connect.json.JsonConverter
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.include.list = anotherdb
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.history = io.debezium.relational.history.FileDatabaseHistory
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    schemas.enable = false
[pool-2-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask - No previous offsets found
[pool-1-thread-1] INFO io.debezium.connector.postgresql.PostgresConnectorTask - user 'postgres' connected to database 'anotherdb' on PostgreSQL 11.13, compiled by Visual C++ build 1914, 64-bit with roles:
role 'pg_read_all_settings' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_stat_scan_tables' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_write_server_files' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'replicauser2' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: true]
role 'replicauser' [superuser: false, replication: true, inherit: true, create role: false, create db: false, can log in: true]
role 'usuarioreplica4' [superuser: false, replication: true, inherit: true, create role: false, create db: false, can log in: true]
role 'usuarioreplica2' [superuser: false, replication: true, inherit: true, create role: false, create db: false, can log in: true]
role 'sysrnt' [superuser: false, replication: true, inherit: true, create role: false, create db: false, can log in: true]
role 'pg_monitor' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_read_server_files' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_execute_server_program' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'usuarioreplica' [superuser: false, replication: true, inherit: false, create role: false, create db: false, can log in: true]
role 'pg_read_all_stats' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_signal_backend' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'postgres' [superuser: true, replication: true, inherit: true, create role: true, create db: true, can log in: true]
[pool-1-thread-1] INFO io.debezium.connector.postgresql.connection.PostgresConnection - Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/9999D438}, catalogXmin=3619]
[pool-1-thread-1] INFO io.debezium.connector.postgresql.PostgresConnectorTask - No previous offset found
[pool-1-thread-1] INFO io.debezium.connector.postgresql.snapshot.InitialSnapshotter - Taking initial snapshot for new datasource
[pool-1-thread-1] INFO io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = localhost-postgresql named = change-event-source-coordinator
[pool-1-thread-1] INFO io.debezium.util.Threads - Creating thread debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.pipeline.ChangeEventSourceCoordinator - Metrics registered
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.pipeline.ChangeEventSourceCoordinator - Context created
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.snapshot.InitialSnapshotter - Taking initial snapshot for new datasource
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource - According to the connector configuration data will be snapshotted
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshot step 1 - Preparing
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshot step 2 - Determining captured tables
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Adding table public.animales to the list of capture schema tables
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshot step 3 - Locking captured tables []
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshot step 4 - Determining snapshot offset
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource - Creating initial offset context
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource - Read xlogStart at 'LSN{0/999A2398}' from transaction '3648'
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource - Read xlogStart at 'LSN{0/999A2398}' from transaction '3648'
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshot step 5 - Reading structure of captured tables
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshot step 6 - Persisting schema history
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshot step 7 - Snapshotting data
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshotting contents of 0 tables while still in transaction
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.pipeline.source.AbstractSnapshotChangeEventSource - Snapshot - Final stage
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.pipeline.ChangeEventSourceCoordinator - Snapshot ended with SnapshotResult [status=COMPLETED, offset=PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='localhost-postgresql'db='anotherdb', lsn=LSN{0/999A2398}, txId=3648, timestamp=2022-07-20T22:58:38.570Z, snapshot=FALSE, schema=, table=], lastSnapshotRecord=true, lastCompletelyProcessedLsn=null, lastCommitLsn=null, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]]
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] WARN io.debezium.relational.RelationalDatabaseSchema - After applying the include/exclude list filters, no changes will be captured. Please check your configuration!
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.pipeline.ChangeEventSourceCoordinator - Connected metrics set to 'true'
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.pipeline.ChangeEventSourceCoordinator - Starting streaming
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Retrieved latest position from stored offset 'LSN{0/999A2398}'
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.WalPositionLocator - Looking for WAL restart position for last commit LSN 'null' and last change LSN 'LSN{0/999A2398}'
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.PostgresConnection - Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/9999D438}, catalogXmin=3619]
[pool-3-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = localhost-postgresql named = keep-alive
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.util.Threads - Creating thread debezium-postgresconnector-localhost-postgresql-keep-alive
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Searching for WAL resume position
Waiting another 5 seconds for the embedded engine to shut down
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.WalPositionLocator - First LSN 'LSN{0/999A23F8}' received
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - WAL resume position 'LSN{0/999A23F8}' discovered
[pool-4-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = localhost-postgresql named = keep-alive
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.util.Threads - Creating thread debezium-postgresconnector-localhost-postgresql-keep-alive
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Processing messages
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.WalPositionLocator - Message with LSN 'LSN{0/999A23F8}' arrived, switching off the filtering
Waiting another 5 seconds for the embedded engine to shut down
Waiting another 5 seconds for the embedded engine to shut down
Waiting another 5 seconds for the embedded engine to shut down
Waiting another 5 seconds for the embedded engine to shut down
...

As you can see, the print inside the notifying callback:

.notifying(record -> {
    System.out.println("Record: " + record);
})
is never executed. Why?

I run the application and insert data into the table while it is running, but nothing seems to be captured. What is going wrong?

In my case it was the database.include.list configuration property that caused this behaviour.
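
For comparison, here is a minimal sketch of how the filter-related part of that configuration could be written with the PostgreSQL-specific properties instead of database.include.list. table.include.list and schema.include.list are the documented filter options for this connector; public.animales is simply the table name visible in the snapshot log above, so treat the exact values as illustrative assumptions, not a verified fix:

Configuration config = Configuration.create()
        .with("name", "customer-postgresql-connector")
        .with("connector.class", PostgresConnector.class)
        // ... offset storage, connection and history settings as in the question ...
        .with("database.dbname", "anotherdb")
        // The PostgreSQL connector is always bound to the single database given in
        // database.dbname, so filter by schema/table rather than by database:
        .with("schema.include.list", "public")
        .with("table.include.list", "public.animales")
        .build();

If the filter actually matches your table, the "After applying the include/exclude list filters, no changes will be captured" warning from the log above should no longer appear.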

But most probably, if you have ended up here, it could be anything else entirely, as it could have been in my case too.

I found the issue when:

  • Raising Debezium's log level to DEBUG (see the sketch after this list)
  • Debugging the source code: in my case it was PostgresStreamingChangeEventSource that produced output once the log level was set to DEBUG
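
To make the first point concrete: since the pom above uses slf4j-simple, one way to get DEBUG output from Debezium (a sketch, assuming slf4j-simple remains the logging binding) is to set the simple-logger system properties before anything is logged, for example at the very top of main:

// slf4j-simple reads these system properties once, when the first logger is created,
// so set them before any Debezium or Kafka Connect class logs anything.
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
System.setProperty("org.slf4j.simpleLogger.log.io.debezium", "debug");

The same keys can also be placed in a simplelogger.properties file on the classpath.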

Latest update