Kafka Connect启动,但什么也没发生



我正在编写一个基于工作制作人的Kafka源连接器,用于音频文件。连接器启动了,但什么也没发生,没有错误,没有数据,我不确定这是编码问题还是配置问题。

连接器应该读取整个目录,并将文件读取为字节数组。

配置类:

package hothman.example;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Importance;
import java.util.Map;

public class AudioSourceConnectorConfig extends AbstractConfig {
public static final String FILENAME_CONFIG="fileName";
private static final String FILENAME_DOC ="Enter the path of the audio files";
public static final String TOPIC_CONFIG = "topic";
private static final String TOPIC_DOC = "Enter the topic to write to..";

public AudioSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
}
public AudioSourceConnectorConfig(Map<String, String> parsedConfig) {
this(conf(), parsedConfig);
}
public static ConfigDef conf() {
return new ConfigDef()
.define(FILENAME_CONFIG, Type.STRING, Importance.HIGH, FILENAME_DOC)
.define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, TOPIC_DOC);
}
public String getFilenameConfig(){
return this.getString("fileName");
}
public String getTopicConfig(){
return this.getString("topic");
}
}

SourceConnectorClass

package hothman.example;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AudioSourceConnector extends SourceConnector {
/*
Your connector should never use System.out for logging. All of your classes should use slf4j
for logging
*/
private static Logger log = LoggerFactory.getLogger(AudioSourceConnector.class);
private AudioSourceConnectorConfig config;
private String filename;
private String topic;
@Override
public String version() {
return VersionUtil.getVersion();
}
@Override
public void start(Map<String, String> props) {
filename = config.getFilenameConfig();
topic = config.getTopicConfig();
if (topic == null || topic.isEmpty())
throw new ConnectException("AudiSourceConnector configuration must include 'topic' setting");
if (topic.contains(","))
throw new ConnectException("AudioSourceConnector should only have a single topic when used as a source.");
}
@Override
public Class<? extends Task> taskClass() {
//TODO: Return your task implementation.
return AudioSourceTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configsList = new ArrayList<>();
// Only one input stream makes sense.
Map<String, String> configs = new HashMap<>();
if (filename != null)
configs.put(config.getFilenameConfig(), filename);
configs.put(config.getTopicConfig(), topic);
configsList.add(configs);
return configsList;
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return AudioSourceConnectorConfig.conf();
}
}

SourceTask类

package hothman.example;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.util.*;
import static com.sun.nio.file.ExtendedWatchEventModifier.FILE_TREE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
public class AudioSourceTask extends SourceTask {
/*
Your connector should never use System.out for logging. All of your classes should use slf4j
for logging
*/
static final Logger log = LoggerFactory.getLogger(AudioSourceTask.class);
private AudioSourceConnectorConfig config;
public static final String POSITION_FIELD = "position";
private static final Schema VALUE_SCHEMA = Schema.BYTES_SCHEMA;
private String filename;
private String topic = null;
private int offset = 0;

private FileSystem fs = FileSystems.getDefault();
private WatchService ws = fs.newWatchService();
private Path dir;
private File directoryPath;
private ArrayList<File> listOfFiles;
private byte[] temp = null;

public AudioSourceTask() throws IOException {
}
@Override
public String version() {
return VersionUtil.getVersion();
}
@Override
public void start(Map<String, String> props) {
filename = config.getFilenameConfig();
topic = config.getTopicConfig();
if (topic == null)
throw new ConnectException("AudioSourceTask config missing topic setting");
dir = Paths.get(filename);
try {
dir.register(ws, new WatchEvent.Kind[]{ENTRY_CREATE, ENTRY_DELETE}, FILE_TREE);
} catch (IOException e) {
e.printStackTrace();
}
directoryPath = new File(String.valueOf(dir));
}

@Override
public List<SourceRecord> poll() throws InterruptedException {
//TODO: Create SourceRecord objects that will be sent the kafka cluster.
listOfFiles = new ArrayList<File>(Arrays.asList(directoryPath.listFiles()));
Map<String, Object> offset = context.offsetStorageReader().
offset(Collections.singletonMap(config.getFilenameConfig(), filename));

ArrayList<SourceRecord> records = new ArrayList<>(1);
try {
for (File file : listOfFiles) {
// send existing files first
temp = Files.readAllBytes(Paths.get(file.toString()));
records.add(new SourceRecord(null,
null, topic, Schema.BYTES_SCHEMA, temp));

}
return records;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
@Override
public void stop() {
//TODO: Do whatever is required to stop your task.
}


}

版本类

package hothman.example;
/**
* Created by jeremy on 5/3/16.
*/
class VersionUtil {
public static String getVersion() {
try {
return VersionUtil.class.getPackage().getImplementationVersion();
} catch(Exception ex){
return "0.0.0.0";
}
}
}

连接器属性

name=AudioSourceConnector
tasks.max=1
connector.class=hothman.example.AudioSourceConnector

fileName = G:\Files
topic= my-topic

独立连接。属性

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=G:/Kafka/kafka_2.12-2.8.0/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=G:/Kafka/kafka_2.12-2.8.0/plugins

错误:

[2021-05-05 01:24:27,926] INFO WorkerSourceTask{id=AudioSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-05-05 01:24:27,928] ERROR WorkerSourceTask{id=AudioSourceConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
java.lang.OutOfMemoryError: Java heap space
at java.nio.file.Files.read(Files.java:3099)
at java.nio.file.Files.readAllBytes(Files.java:3158)
at hothman.example.AudioSourceTask.poll(AudioSourceTask.java:93)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:273)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2021-05-05 01:24:27,929] INFO [Producer clientId=connector-producer-AudioSourceConnector-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1204)
[2021-05-05 01:24:27,933] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:659)
[2021-05-05 01:24:27,934] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:663)
[2021-05-05 01:24:27,934] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:669)
[2021-05-05 01:24:27,935] INFO App info kafka.producer for connector-producer-AudioSourceConnector-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2021-05-05 01:24:36,479] INFO WorkerSourceTask{id=AudioSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)

使用基于@OneCricketer建议的Logger,我能够确定问题所在。

config.getFilenameConfig();

返回null,所以我不得不暂时在连接器中手动编码路径。

连接器工作,但出现CCD_ 1错误。为了解决这个问题,我必须编辑connect standalone.properties文件,并更改生产者.max.request.size生产商.buffer.memory的大小,并确保它们的值高于我要发送的任何文件。

我还编辑了AudioSourceTask类,去掉了轮询方法中的for循环,并将listOfFiles的初始化从轮询方法移到了启动方法,现在如下

public void start(Map<String, String> props) {

filename = "G:\AudioFiles";//config.getFilenameConfig();//
topic = "voice-wav1";//config.getTopicConfig();//
if (topic == null)
throw new ConnectException("AudioSourceTask config missing topic setting");

dir = Paths.get(filename);
try {
dir.register(ws, new WatchEvent.Kind[]{ENTRY_CREATE, ENTRY_DELETE}, FILE_TREE);
} catch (IOException e) {
e.printStackTrace();
}

directoryPath = new File(String.valueOf(dir));
listOfFiles = new ArrayList<File>(Arrays.asList(directoryPath.listFiles()));
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
//TODO: Create SourceRecord objects that will be sent the kafka cluster.

Map<String, Object> offset = context.offsetStorageReader().
offset(Collections.singletonMap("G:\AudioFiles", filename));

ArrayList<SourceRecord> records = new ArrayList<>(1);
try{

// send existing files first
if(listOfFiles.size()!=0) {
File file = listOfFiles.get(listOfFiles.size() - 1);
listOfFiles.remove(listOfFiles.size() - 1);
temp = Files.readAllBytes(Paths.get(file.toString()));
records.add(new SourceRecord(null,
null, topic, Schema.BYTES_SCHEMA, temp));
LOGGER.info("Reading file {}", file);
return records;
}

} catch (IOException e) {
e.printStackTrace();
}
return null;
}

相关内容

  • 没有找到相关文章