Flink job with CassandraSink fails with "Error writing"



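I have two simple Flink streaming jobs that read from Kafka, do some transformations, and write the results to a Cassandra sink. They read from different Kafka topics and save to different Cassandra tables.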

When I run either of the two jobs on its own, everything works fine: checkpoints are triggered and completed, and data is saved to Cassandra.

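But when I run both jobs at the same time (or run one of them twice), the second job fails on startup with the following exception: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Error writing)).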

I couldn't find much information about this error. It could be caused by any of the following:

  • Flink (v1.10.0-scala_2.12)
  • Flink Cassandra connector (flink-connector-cassandra_2.11:jar:1.10.2; I also tried flink-connector-cassandra_2.12:jar:1.10.0)
  • DataStax core driver (v3.10.2)
  • Cassandra v4.0 (same with v3.0)
  • Netty transport (v4.1.51.Final)

I also use packages that might conflict with the ones above:

  • mysql-connector-java (v8.0.19)
  • Cassandra driver extras (v3.10.2)

Finally, here is the code I wrote for the cluster builder:

ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        Cluster cluster = null;
        try {
            cluster = builder
                    .addContactPoint("localhost")
                    .withPort(9042)
                    .withClusterName("Test Cluster")
                    .withoutJMXReporting()
                    .withProtocolVersion(ProtocolVersion.V4)
                    .withoutMetrics()
                    .build();
            // register codecs from datastax extras.
            cluster.getConfiguration().getCodecRegistry()
                    .register(LocalTimeCodec.instance);
        } catch (ConfigurationException e) {
            e.printStackTrace();
        } catch (NoHostAvailableException nhae) {
            nhae.printStackTrace();
        }
        return cluster;
    }
};

I tried various PoolingOptions and SocketOptions settings, without success.

Cassandra sink:

CassandraSink.addSink(dataRows)
        .setQuery("insert into table_name_(16 columns names) " +
                "values (16 placeholders);")
        .enableWriteAheadLog()
        .setClusterBuilder(builder)
        .setFailureHandler(new CassandraFailureHandler() {
            @Override
            public void onFailure(Throwable throwable) {
                LOG.error("A {} occurred.", "Cassandra Failure", throwable);
            }
        })
        .build()
        .setParallelism(1)
        .name("Cassandra Sink For Unique Count every N minutes.");

Full stack trace from the Flink job manager log:

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Error writing))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.createSession(CassandraSinkBase.java:143)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:87)
at org.apache.flink.streaming.connectors.cassandra.AbstractCassandraTupleSink.open(AbstractCassandraTupleSink.java:49)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.base/java.lang.Thread.run(Thread.java:834)

Thanks for your help.

EDIT:

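  • I just tried with two standalone Cassandra instances (different machines and different clusters) and pointed one job at one instance and the other job at the other. Nothing changed; I still get the same error.
  • To reduce the dependencies, here is the new pom file: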
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.abcde.ai</groupId>
<artifactId>analytics-etl</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Flink Quickstart Job</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.10.2</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.10</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.abcde.analytics.etl.KafkaUniqueCountsStreamingJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.1.1,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>

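EDIT: I managed to narrow down the problem. When I mark the flink-connector-cassandra dependency as provided and simply copy the jar file from my local Maven repository (~/.m2/repository/org/apache/flink/flink-connector-cassandra_2.11/1.10.2/flink-connector-cassandra_2.11-1.10.2.jar) into the Flink lib folder, the error goes away. My problem is solved, but the root cause is still a mystery.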
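Since the fix depends on where the connector jar lives, one way to confirm which copy of a class a task actually uses is to ask the JVM where it was loaded from. The following is a minimal, stdlib-only sketch; the class name `ClassOrigin` and its `describe` helper are hypothetical, and calling it from inside the job (for example from a sink's `open()`) is an assumption about how you would wire it in:

```java
import java.net.URL;
import java.security.CodeSource;

public class ClassOrigin {
    /**
     * Describes where a class came from: the class of its defining class loader
     * and the jar or directory recorded in its code source, if any.
     */
    static String describe(Class<?> clazz) {
        ClassLoader loader = clazz.getClassLoader();
        // Classes loaded by the bootstrap loader report a null class loader.
        String loaderName = (loader == null) ? "bootstrap" : loader.getClass().getName();
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        URL location = (source == null) ? null : source.getLocation();
        return clazz.getName() + " <- " + loaderName + " @ "
                + (location == null ? "unknown" : location.toString());
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass fully qualified class names as arguments; defaults are only for illustration.
        String[] names = (args.length > 0)
                ? args
                : new String[] {"java.lang.String", ClassOrigin.class.getName()};
        for (String name : names) {
            System.out.println(describe(Class.forName(name)));
        }
    }
}
```

If something like `describe(com.datastax.driver.core.Cluster.class)` is logged from inside each job, the loader and location should show whether the driver was picked up from the job's fat jar or from the jar in the Flink lib folder.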

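I may be wrong, but the problem is most likely caused by a Netty client version conflict. The error says NoHostAvailableException, but the underlying error is a TransportException with the message Error writing. Cassandra itself is clearly working fine.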
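To test the version-conflict theory, you can list every Netty version descriptor visible to a class loader. This sketch assumes that Netty jars ship a `META-INF/io.netty.versions.properties` resource with keys like `netty-common.version` (that is my understanding of how Netty records its artifact versions); the class `NettyVersionScan` and its helpers are hypothetical. Note that a shaded fat jar may keep only one copy of that file, so one result does not rule out mixed versions inside a single jar.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class NettyVersionScan {
    // Assumed location of Netty's per-jar version descriptor.
    static final String NETTY_VERSIONS = "META-INF/io.netty.versions.properties";

    /** Returns every copy of a resource visible to the given class loader. */
    static List<URL> findCopies(ClassLoader loader, String resource) throws IOException {
        return Collections.list(loader.getResources(resource));
    }

    /** Maps artifact id to version for keys of the form "<artifact>.version". */
    static Map<String, String> parseVersions(Properties props) {
        Map<String, String> versions = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.endsWith(".version")) {
                String artifact = key.substring(0, key.length() - ".version".length());
                versions.put(artifact, props.getProperty(key));
            }
        }
        return versions;
    }

    public static void main(String[] args) throws IOException {
        // Inside a running task this would normally be the job's user-code class loader.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        List<URL> copies = findCopies(loader, NETTY_VERSIONS);
        System.out.println(copies.size() + " Netty version file(s) found");
        for (URL url : copies) {
            Properties props = new Properties();
            try (InputStream in = url.openStream()) {
                props.load(in);
            }
            System.out.println(url + " -> " + parseVersions(props));
        }
    }
}
```

More than one file, or differing versions across files, would point toward the kind of Netty conflict described here.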

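There is a similar Stack Overflow question, "Cassandra - Error writing", with very similar symptoms: a single project runs fine, but when one more project is added, an AllNodesFailedException appears with a TransportException carrying the Error writing message as the root cause. The author was able to solve it by unifying the Netty client versions.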

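In your case, I'm not sure why there are so many dependencies, so I would try to remove all the extra libraries and keep only Flink (1.10.0-scala_2.12) and the Flink Cassandra connector (flink-connector-cassandra_2.12:jar:1.10.0). These should already include the necessary driver, Netty, and so on. Drop all the other dependencies, at least for the first iteration, to make sure this solves the problem and rules out library conflicts.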

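To fix the error, I marked the flink-connector-cassandra dependency as provided, copied the jar file from my local Maven repository (~/.m2/repository/org/apache/flink/flink-connector-cassandra_2.11/1.10.2/flink-connector-cassandra_2.11-1.10.2.jar) into the Flink lib folder, and restarted Flink. Here is the changed dependency in my new pom.xml: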

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

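How did I find this? I was about to try compiling the connector from source with a newer driver version. First, I tried to reproduce the error with the unchanged sources, so I compiled them without any changes and put the jar in the Flink lib folder, and hooray, it worked! Then I suspected that the jar from Maven was somehow different. I copied it into the lib folder and, to my surprise, that worked too.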
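To check whether the self-built jar and the Maven jar really differ, comparing their digests is a quick first step. A minimal sketch using only the JDK; the class name `JarDigest` is hypothetical and the two jar paths are whatever you pass on the command line:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class JarDigest {
    /** Returns the lowercase hex SHA-256 digest of the given bytes. */
    static String sha256Hex(byte[] data) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256").digest(data);
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                // %02x on a Byte prints it as an unsigned two-digit hex value.
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required on every Java platform, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: JarDigest <jar-a> <jar-b>");
            return;
        }
        String a = sha256Hex(Files.readAllBytes(Paths.get(args[0])));
        String b = sha256Hex(Files.readAllBytes(Paths.get(args[1])));
        System.out.println(a + "  " + args[0]);
        System.out.println(b + "  " + args[1]);
        System.out.println(a.equals(b) ? "identical" : "different");
    }
}
```

Keep in mind that two builds of the same sources usually differ in embedded timestamps, so "different" here only shows the files are not byte-identical; it does not prove the classes inside differ.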

My problem is solved, but the root cause is still a mystery.

My last attempt was to check whether any package conflicts with the Cassandra connector, so I printed the Maven dependency tree. There was one conflict, with org.apache.flink:flink-metrics-dropwizard, over metrics-core:

[INFO] +- org.apache.flink:flink-connector-cassandra_2.12:jar:1.10.0:provided
[INFO] |  +- (io.dropwizard.metrics:metrics-core:jar:3.1.2:provided - omitted for conflict with 3.1.5)
[INFO] |  - (org.apache.flink:force-shading:jar:1.10.0:provided - omitted for duplicate)
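In a large tree it is easy to miss such entries, so a small filter over the saved tree output can list them. This sketch assumes the lines look like the excerpt above (`(group:artifact:type:version:scope - omitted for conflict with X)`); the class `TreeConflicts` is hypothetical and reads the saved output from standard input:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TreeConflicts {
    // Captures the losing coordinate and the winning version from lines such as
    // "(io.dropwizard.metrics:metrics-core:jar:3.1.2:provided - omitted for conflict with 3.1.5)".
    private static final Pattern CONFLICT =
            Pattern.compile("\\(([^\\s()]+) - omitted for conflict with ([^)\\s]+)\\)");

    /** Returns one "coordinate -> winning version" entry per conflict line. */
    static List<String> conflicts(List<String> treeLines) {
        List<String> found = new ArrayList<>();
        for (String line : treeLines) {
            Matcher m = CONFLICT.matcher(line);
            if (m.find()) {
                found.add(m.group(1) + " -> " + m.group(2));
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Usage: java TreeConflicts < tree.txt
        BufferedReader in = new BufferedReader(
                new InputStreamReader(System.in, StandardCharsets.UTF_8));
        List<String> lines = in.lines().collect(Collectors.toList());
        conflicts(lines).forEach(System.out::println);
    }
}
```

Duplicate entries ("omitted for duplicate") are ignored on purpose, since they refer to the same version and cannot cause a mismatch.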

I removed this dependency from my project, but the error persists if the connector is not marked as provided and not placed in the lib folder.
