Introduction:
First, let me apologize for any vagueness in my question. I'll try to provide as much information on this topic as I can (hopefully not too much), and please tell me if I should provide more. Also, I'm quite new to Kafka and may stumble over terminology.
So, from my understanding of how sinks and sources work, I can use the FileStreamSourceConnector provided by the Kafka Quickstart guide to write data (Neo4j commands) to a topic held in a Kafka cluster. Then I can write my own Neo4j sink connector and task to read those commands and send them to one or more Neo4j servers. To keep the project as simple as possible for now, I based my sink connector and task on the Kafka Quickstart guide's FileStreamSinkConnector and FileStreamSinkTask.
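For reference, the flow above uses the quickstart's file source to feed the topic; its properties file looks roughly like this (the file and topic values are just what my setup uses):

```properties
# connect-file-source.properties (Kafka quickstart); file/topic values are examples
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
```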
Kafka's FileStream:
FileStreamSourceConnector
FileStreamSourceTask
FileStreamSinkConnector
FileStreamSinkTask
My Neo4j sink connector:
package neo4k.sink;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Neo4jSinkConnector extends SinkConnector {

    public enum Keys {
        ;
        static final String URI = "uri";
        static final String USER = "user";
        static final String PASS = "pass";
        static final String LOG = "log";
    }

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(Keys.URI, Type.STRING, "", Importance.HIGH, "Neo4j URI")
            .define(Keys.USER, Type.STRING, "", Importance.MEDIUM, "User Auth")
            .define(Keys.PASS, Type.STRING, "", Importance.MEDIUM, "Pass Auth")
            .define(Keys.LOG, Type.STRING, "./neoj4sinkconnecterlog.txt", Importance.LOW, "Log File");

    private String uri;
    private String user;
    private String pass;
    private String logFile;

    @Override
    public String version() {
        return AppInfoParser.getVersion();
    }

    @Override
    public void start(Map<String, String> props) {
        uri = props.get(Keys.URI);
        user = props.get(Keys.USER);
        pass = props.get(Keys.PASS);
        logFile = props.get(Keys.LOG);
    }

    @Override
    public Class<? extends Task> taskClass() {
        return Neo4jSinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        ArrayList<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> config = new HashMap<>();
            if (uri != null)
                config.put(Keys.URI, uri);
            if (user != null)
                config.put(Keys.USER, user);
            if (pass != null)
                config.put(Keys.PASS, pass);
            if (logFile != null)
                config.put(Keys.LOG, logFile);
            configs.add(config);
        }
        return configs;
    }

    @Override
    public void stop() {
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }
}
My Neo4j sink task:
package neo4k.sink;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.exceptions.Neo4jException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Map;

public class Neo4jSinkTask extends SinkTask {

    private static final Logger log = LoggerFactory.getLogger(Neo4jSinkTask.class);

    private String uri;
    private String user;
    private String pass;
    private String logFile;

    private Driver driver;
    private Session session;

    public Neo4jSinkTask() {
    }

    @Override
    public String version() {
        return new Neo4jSinkConnector().version();
    }

    @Override
    public void start(Map<String, String> props) {
        uri = props.get(Neo4jSinkConnector.Keys.URI);
        user = props.get(Neo4jSinkConnector.Keys.USER);
        pass = props.get(Neo4jSinkConnector.Keys.PASS);
        logFile = props.get(Neo4jSinkConnector.Keys.LOG);
        driver = null;
        session = null;
        try {
            driver = GraphDatabase.driver(uri, AuthTokens.basic(user, pass));
            session = driver.session();
        } catch (Neo4jException ex) {
            log.trace(ex.getMessage(), logFilename());
        }
    }

    @Override
    public void put(Collection<SinkRecord> sinkRecords) {
        StatementResult result;
        for (SinkRecord record : sinkRecords) {
            result = session.run(record.value().toString());
            log.trace(result.toString(), logFilename());
        }
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    @Override
    public void stop() {
        if (session != null)
            session.close();
        if (driver != null)
            driver.close();
    }

    private String logFilename() {
        return logFile == null ? "stdout" : logFile;
    }
}
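Since put() above runs each record's value directly as a Cypher statement, the lines I feed into the source file are meant to be raw Cypher, something like these (hypothetical examples):

```cypher
CREATE (n:Person {name: 'Alice'})
MATCH (n:Person) RETURN count(n)
```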
The problem:
After writing that, I next built a jar (or an uber jar? It was one file). I then edited the plugin path in connect-standalone.properties to include that artifact, and wrote a properties file for my Neo4j sink connector. I did this following these guidelines.
My Neo4j sink connector properties file:
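For context, the edit to connect-standalone.properties was roughly this (the path is an example; it should point at where the built jar lives):

```properties
# connect-standalone.properties (excerpt); example path, adjust to your build output
plugin.path=/path/to/plugins
```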
name=neo4k-sink
connector.class=neo4k.sink.Neo4jSinkConnector
tasks.max=1
uri=bolt://localhost:7687
user=neo4j
pass=Hunter2
topics=connect-test
But when running the standalone worker, I get this error in the output as it shuts the stream down (the error is on line 5 below):
[2017-08-14 12:59:00,150] INFO Kafka version : 0.11.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-08-14 12:59:00,150] INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-08-14 12:59:00,153] INFO Source task WorkerSourceTask{id=local-file-source-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:143)
[2017-08-14 12:59:00,153] INFO Created connector local-file-source (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-08-14 12:59:00,153] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:100)
java.lang.IllegalArgumentException: Malformed uxxxx encoding.
at java.util.Properties.loadConvert(Properties.java:574)
at java.util.Properties.load0(Properties.java:390)
at java.util.Properties.load(Properties.java:341)
at org.apache.kafka.common.utils.Utils.loadProps(Utils.java:429)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:84)
[2017-08-14 12:59:00,156] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)
[2017-08-14 12:59:00,156] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)
[2017-08-14 12:59:00,168] INFO Stopped ServerConnector@540accf4{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2017-08-14 12:59:00,173] INFO Stopped o.e.j.s.ServletContextHandler@6d548d27{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
Edit: I should mention that in the part of the output where the connectors are loaded, where it declares the plugins that were added, I don't see any mention of the jar I built earlier, whose path I added to connect-standalone.properties. Here's a snippet for context:
[2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-08-14 12:58:58,970] INFO Added plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
Conclusion:
I'm at a loss. I've been testing and researching for a couple of hours now, and I don't think I'm even sure what question to ask. So if you've gotten this far, thanks for reading. If you noticed anything glaring that I might have done wrong in the code or my approach (e.g. packaging the jar), or think I should provide more context or console logs or anything really, please let me know. Thank you again.
Update: As @Randall Hauch pointed out, my properties file had hidden characters in it because it was a rich text document. I fixed this by copying the connect-file-sink.properties file that Kafka provides, which I believe is just a regular text document, then renaming and editing that copy for my Neo4j sink properties.
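For anyone hitting the same error: java.util.Properties.load() interprets backslash-u escape sequences, so a stray "\u" not followed by four hex digits (which hidden rich-text bytes can easily produce) reproduces the exact exception from the log. A minimal sketch:

```java
import java.io.StringReader;
import java.util.Properties;

public class MalformedEscapeDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        try {
            // "\u" followed by non-hex characters, as hidden rich-text
            // bytes can produce, triggers the same IllegalArgumentException
            props.load(new StringReader("pass=\\u12G4"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints "Malformed \uxxxx encoding."
        }
    }
}
```

Saving the properties file as plain text (or copying Kafka's own connect-file-sink.properties as a template, as above) avoids the stray escape bytes entirely.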