I am trying to write data from a Kafka topic into a local DynamoDB instance. However, the connector always ends up in a DEGRADED state. Below are my connector configuration properties.
{
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"name": "dynamo-sink-connector",
"connector.class": "io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics": [
"KAFKA_STOCK"
],
"aws.dynamodb.pk.hash": "value.companySymbol",
"aws.dynamodb.pk.sort": "value.txTime",
"aws.dynamodb.endpoint": "http://localhost:8000",
"confluent.topic.bootstrap.servers": [
"broker:29092"
]
}
I was following https://github.com/RWaltersMA/mongo-source-sink and swapping the Mongo sink for a DynamoDB sink.
Can anyone provide a simple working example, please?
Below is a complete working example using the AWS DynamoDB Sink connector. Thanks to @OneCricketeer for the advice!
Dockerfile-DynamoDBConnect, referenced in the docker-compose.yml below:
FROM confluentinc/cp-kafka-connect:latest
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-aws-dynamodb:latest
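Build and start the stack with docker-compose up -d --build. The --build flag matters here, since the connector plugin is baked into the Connect image by this Dockerfile rather than mounted at runtime.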
docker-compose.yml
version: '3.6'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
networks:
- localnet
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:latest
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "19092:19092"
- "9092:9092"
networks:
- localnet
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:19092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
schema-registry:
image: confluentinc/cp-schema-registry:latest
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper
- broker
ports:
- "8081:8081"
networks:
- localnet
environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      # recent Schema Registry images no longer accept ZooKeeper-based
      # configuration, so point the kafkastore at the broker directly
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:19092'
dynamodb-local:
command: "-jar DynamoDBLocal.jar -sharedDb -dbPath ./data"
image: "amazon/dynamodb-local:latest"
container_name: dynamodb-local
ports:
- "8000:8000"
networks:
- localnet
volumes:
- "./docker/dynamodb:/home/dynamodblocal/data"
working_dir: /home/dynamodblocal
connect:
    # tag for the image built from Dockerfile-DynamoDBConnect (reusing the
    # upstream confluentinc image name here would shadow the official image)
    image: kafka-connect-dynamodb:latest
build:
context: .
dockerfile: Dockerfile-DynamoDBConnect
hostname: connect
container_name: connect
depends_on:
- zookeeper
- broker
- schema-registry
ports:
- "8083:8083"
networks:
- localnet
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:19092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # The monitoring interceptor jars ship with the Connect image; a wildcard
      # entry avoids pinning to a specific version when using :latest images
      CLASSPATH: "/usr/share/java/monitoring-interceptors/*"
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
    command: "bash -c 'if [ ! -d /usr/share/confluent-hub-components/confluentinc-kafka-connect-aws-dynamodb ]; then echo \"WARNING: Did not find directory for kafka-connect-aws-dynamodb (did you remember to run: docker-compose up -d --build ?)\"; fi ; /etc/confluent/docker/run'"
volumes:
      - ../build/confluent/kafka-connect-aws-dynamodb:/usr/share/confluent-hub-components/confluentinc-kafka-connect-aws-dynamodb
- $HOME/.aws/credentialstest:/home/appuser/.aws/credentials
- $HOME/.aws/configtest:/home/appuser/.aws/config
rest-proxy:
image: confluentinc/cp-kafka-rest:5.3.0
depends_on:
- zookeeper
- broker
- schema-registry
ports:
- "8082:8082"
hostname: rest-proxy
container_name: rest-proxy
networks:
- localnet
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:19092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
control-center:
image: confluentinc/cp-enterprise-control-center:6.0.0
hostname: control-center
container_name: control-center
networks:
- localnet
depends_on:
- broker
- schema-registry
- connect
- dynamodb-local
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: PLAINTEXT://broker:19092
CONTROL_CENTER_KAFKA_CodeCamp_BOOTSTRAP_SERVERS: PLAINTEXT://broker:19092
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_REPLICATION: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONTROL_CENTER_METRICS_TOPIC_REPLICATION: 1
CONTROL_CENTER_METRICS_TOPIC_PARTITIONS: 1
# Amount of heap to use for internal caches. Increase for better throughput
CONTROL_CENTER_STREAMS_CACHE_MAX_BYTES_BUFFERING: 100000000
CONTROL_CENTER_STREAMS_CONSUMER_REQUEST_TIMEOUT_MS: "960032"
CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 1
# HTTP and HTTPS to Control Center UI
CONTROL_CENTER_REST_LISTENERS: http://0.0.0.0:9021
PORT: 9021
# Connect
CONTROL_CENTER_CONNECT_CONNECT1_CLUSTER: http://connect:8083
# Schema Registry
CONTROL_CENTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
networks:
localnet:
attachable: true
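The compose file brings the stack up, but the sink connector itself still has to be registered with the Connect REST API. The sketch below is one way to do that, with property names taken from the question's config and values adjusted to this stack; the topic and primary-key fields are assumptions that match the Avro producer further down, and the worker's Avro/String converters from the environment above are inherited, so no converter overrides are needed. Three details are worth checking for the DEGRADED state from the question: topics must be a plain comma-separated string rather than a JSON array, DynamoDB Local is reachable from inside the connect container at http://dynamodb-local:8000 (localhost:8000 would point at the Connect container itself), and on a single-broker cluster the connector's internal license topic needs confluent.topic.replication.factor set to 1.

curl -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/connectors/dynamo-sink-connector/config \
  -d '{
    "connector.class": "io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector",
    "tasks.max": "1",
    "topics": "stockdata",
    "aws.dynamodb.endpoint": "http://dynamodb-local:8000",
    "aws.dynamodb.pk.hash": "value.stockCode",
    "aws.dynamodb.pk.sort": "value.stockName",
    "confluent.topic.bootstrap.servers": "broker:19092",
    "confluent.topic.replication.factor": "1"
  }'

You can then check its state with curl http://localhost:8083/connectors/dynamo-sink-connector/status.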
Java example that publishes messages in Avro format:
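The Stock class used by the producer is assumed to be generated (e.g., by the avro-maven-plugin) from an Avro schema along these lines; the field names are inferred from the builder calls below, and the namespace is a placeholder:

{
  "namespace": "com.example.stocks",
  "type": "record",
  "name": "Stock",
  "fields": [
    { "name": "stockCode", "type": "string" },
    { "name": "stockName", "type": "string" },
    { "name": "stockPrice", "type": "double" }
  ]
}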
import java.lang.invoke.MethodHandles;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class AvroProducer {
public static void main(String[] args) {
// Variables (bootstrap server, topic name, logger)
final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
final String bootstrapServers = "127.0.0.1:9092";
final String topicName = "stockdata";
// Properties declaration (bootstrap server, key serializer, value serializer
// Note use of the ProducerConfig object
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
properties.setProperty(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
// Create Producer object
// Note the generics
KafkaProducer<String, Stock> producer = new KafkaProducer<>(properties);
Stock stock = Stock.newBuilder()
                .setStockCode("AAPL")
.setStockName("Apple")
.setStockPrice(150.0)
.build();
// Create ProducerRecord object
ProducerRecord<String, Stock> rec = new ProducerRecord<>(topicName, stock);
        // Send the record; the optional callback logs success or failure
        producer.send(rec, (metadata, exception) -> {
            if (exception != null) {
                logger.error("Failed to send record", exception);
            } else {
                logger.info("Sent record to {} partition {} offset {}",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
// Call producer flush() and/or close()
producer.flush();
producer.close();
}
}
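Once the connector is RUNNING and the producer has been run, the written items can be checked directly against DynamoDB Local. Below is a minimal verification sketch using the AWS SDK for Java v2; the table name is an assumption based on the connector defaulting to one table per topic:

import java.net.URI;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;

public class DynamoDbLocalScan {
    public static void main(String[] args) {
        // DynamoDB Local accepts any credentials, and the region is arbitrary
        DynamoDbClient client = DynamoDbClient.builder()
                .endpointOverride(URI.create("http://localhost:8000"))
                .region(Region.US_EAST_1)
                .credentialsProvider(StaticCredentialsProvider.create(
                        AwsBasicCredentials.create("dummy", "dummy")))
                .build();

        // "stockdata" assumes the topic name used by the producer above
        ScanResponse response = client.scan(ScanRequest.builder()
                .tableName("stockdata")
                .build());
        response.items().forEach(System.out::println);

        client.close();
    }
}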