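I am trying to set up a very basic example:
- Push random data to an output port in NiFi
- Print the received data using a Spark streaming context
Versions (all on a single instance):
- HDF-3.1.1.0-35
- HDP-2.6.5.0-292
- nifi-spark-receiver & nifi-site-to-site-client - 1.7.1
I have configured spark-defaults.conf as follows: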
spark.driver.extraClassPath /usr/hdf/3.1.1.0-35/nifi/work/META-INF/bundled-dependencies/nifi-client-dto-1.5.0.3.1.1.0-35.jar:/opt/spark-receiver/httpcore-nio-4.0-alpha6.jar:/opt/spark-receiver/nifi-site-to-site-client-1.5.0.3.1.2.0-7.jar:/opt/spark-receiver/nifi-spark-receiver-1.5.0.3.1.2.0-7.jar:/usr/hdf/3.1.1.0-35/nifi/lib/nifi-api-1.5.0.3.1.1.0-35.jar:/usr/hdf/3.1.1.0-35/nifi/lib/bootstrap/nifi-utils-1.5.0.3.1.1.0-35.jar:/usr/hdf/3.1.1.0-35/nifi/lib/nifi-framework-api-1.5.0.3.1.1.0-35.jar
I am running the following commands in the Spark shell:
import org.apache.nifi._
import java.nio.charset._
import org.apache.nifi.spark._
import org.apache.nifi.remote.client._
import org.apache.spark._
import org.apache.nifi.events._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.nifi.remote._
import org.apache.nifi.remote.client._
import org.apache.nifi.remote.protocol._
import org.apache.spark.storage._
import org.apache.spark.streaming.receiver._
import java.io._
import org.apache.spark.serializer._
val conf = new SiteToSiteClient.Builder().url("http://10.140.0.2:9090/nifi").portName("toSpark").buildConfig()
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_ONLY))
val text = lines.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8))
text.print()
ssc.start()
After running this code, I get the following error:
Exception in thread "NiFi Receiver" java.lang.NoClassDefFoundError: org/apache/http/nio/protocol/HttpAsyncResponseConsumer
    at org.apache.nifi.remote.client.SiteInfoProvider.createSiteToSiteRestApiClient(SiteInfoProvider.java:104)
    at org.apache.nifi.remote.client.SiteInfoProvider.refreshRemoteInfo(SiteInfoProvider.java:68)
    at org.apache.nifi.remote.client.SiteInfoProvider.getPortIdentifier(SiteInfoProvider.java:220)
    at org.apache.nifi.remote.client.SiteInfoProvider.getOutputPortIdentifier(SiteInfoProvider.java:204)
    at org.apache.nifi.remote.client.SocketClient.getPortIdentifier(SocketClient.java:79)
    at org.apache.nifi.remote.client.SocketClient.createTransaction(SocketClient.java:121)
    at org.apache.nifi.spark.NiFiReceiver$ReceiveRunnable.run(NiFiReceiver.java:149)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.http.nio.protocol.HttpAsyncResponseConsumer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
Please help.
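My guess is that the httpcore-nio-4.0-alpha6.jar you included is the problem: that version does not contain the class, and it appears to conflict with the httpcore-nio version that nifi-spark-receiver pulls in transitively (currently 4.4.6).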
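One way to check this from the Spark shell is to ask the JVM where a class is loaded from and to see which jars on the configured classpath actually contain it. The following is only a rough sketch: the helper names `jarOf` and `jarsContaining` are made up for illustration and are not part of NiFi or Spark.

```scala
import java.io.File
import java.util.jar.JarFile

// Hypothetical helper: returns the location (usually a jar URL) a class is
// loaded from, or None if the class cannot be found or has no code source.
def jarOf(className: String): Option[String] =
  try {
    // initialize = false: only locate the class, do not run static initializers
    val cls = Class.forName(className, false, Thread.currentThread().getContextClassLoader)
    Option(cls.getProtectionDomain.getCodeSource)
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)
  } catch {
    case _: ClassNotFoundException | _: NoClassDefFoundError => None
  }

// Hypothetical helper: lists the jar entries of a classpath string that
// contain the given class. Entries that are not existing .jar files are skipped.
def jarsContaining(classPath: String, className: String): Seq[String] = {
  val entry = className.replace('.', '/') + ".class"
  classPath.split(File.pathSeparator).toSeq
    .filter(p => p.endsWith(".jar") && new File(p).isFile)
    .filter { p =>
      val jar = new JarFile(p)
      try jar.getEntry(entry) != null finally jar.close()
    }
}
```

In the Spark shell, `jarOf("org.apache.http.nio.protocol.HttpAsyncResponseConsumer")` should return `None` as long as the error above occurs, and `jarsContaining(sc.getConf.get("spark.driver.extraClassPath", ""), "org.apache.http.nio.protocol.HttpAsyncResponseConsumer")` should show whether any configured jar provides the class. If none does, replacing the alpha6 jar with a newer httpcore-nio jar would be the next thing to try.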