我有一个Vagrant镜像,Spark Notebook、Spark、Accumulo 1.6和Hadoop都在运行。在笔记本上,我可以手动创建扫描仪,并从我使用Accumulo示例之一创建的表中提取测试数据:
val instanceNameS = "accumulo"
val zooServersS = "localhost:2181"
val instance: Instance = new ZooKeeperInstance(instanceNameS, zooServersS)
val connector: Connector = instance.getConnector( "root", new PasswordToken("password"))
val auths = new Authorizations("exampleVis")
val scanner = connector.createScanner("batchtest1", auths)
scanner.setRange(new Range("row_0000000000", "row_0000000010"))
for(entry: Entry[Key, Value] <- scanner) {
println(entry.getKey + " is " + entry.getValue)
}
将给出表数据的前十行。
当我尝试这样创建RDD时:
val rdd2 =
sparkContext.newAPIHadoopRDD (
new Configuration(),
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value]
)
我收到了一个RDD,由于以下错误,我无法处理它:
java.io.IOException:尚未设置输入信息。在org.apache.accumulo.core.client.mapdreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)在org.apache.accumulo.core.client.maprereduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343)在org.apache.accumulo.core.client.maprereduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538)在org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadooopRDD.scala:98)在org.apache.spark.rdd.rdd$$anonfun$partitions$2.apply(rdd.scala:222)在org.apache.spark.rdd.rdd$$anonfun$partitions$2.apply(rdd.scala:220)在scala。Option.getOrElse(Option.scala:120)org.apache.spark.rdd.rdd.partitions(rdd.scala:220)org.apache.spark.SparkContext.runJob(SparkContext.scala:1367)org.apache.spark.rdd.rdd.count(rdd.scala:927)
鉴于我没有指定任何参数来连接哪个表、身份验证是什么等,这完全有意义。
所以我的问题是:我需要从这里做什么才能将表数据的前十行放入RDD
更新一个仍然不起作用,但我确实发现了一些事情。原来有两个几乎相同的包,
org.apache.accumulo.core.client.mapreduce
&
org.apache.accumulo.core.client.mapred
都有几乎相同的成员,只是有些方法签名不同。不知道为什么两者都存在,因为我看不到任何贬低的通知。我不高兴地试着去实现孔子的回答。以下是我所做的,以及回应:
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
val jobConf = new JobConf(new Configuration)
导入org.apache.hadoop.mapred.JobConf导入org.apache.hoop.conf.配置作业conf:org.apache.hadoop.mapred.JobConf=配置:core-default.xml,core-site.xml、mapred-default.xml、mapred-site.xml和yarn-default.xml,yarn-site.xml
配置:core-default.xml、core-site.xml、mapred-default.xml,mapred-site.xml、yarn-default.xml、yan-site.xml
AbstractInputFormat.setConnectorInfo(jobConf,
"root",
new PasswordToken("password")
AbstractInputFormat.setScanAuthorizations(jobConf, auths)
AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
val rdd2 =
sparkContext.hadoopRDD (
jobConf,
classOf[org.apache.accumulo.core.client.mapred.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value],
1
)
rdd2:org.apache.spark.rdd.rdd[(org.apache.accumulo.core.data.Key,org.apache.accumulo.core.data.Value)]=HadoopRDD[1]在:62
rdd2.first
java.io.IOException:尚未设置输入信息。在org.apache.accumulo.core.client.mapdreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)在org.apache.accumulo.core.client.mabred.AbstractInputFormat.validateOptions(AbstractInput Format.java:308)在org.apache.accumulo.core.client.mabred.AbstractInputFormat.getSplits(AbstractInput Format.java:505)网址:org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)在org.apache.spark.rdd.rdd$$anonfun$partitions$2.apply(rdd.scala:222)在org.apache.spark.rdd.rdd$$anonfun$partitions$2.apply(rdd.scala:220)在scala。Option.getOrElse(Option.scala:120)org.apache.spark.rdd.rdd.partitions(rdd.scala:220)org.apache.spark.rdd.rdd.take(rdd.scala:1077)org.apache.spark.rdd.rdd.first(rdd.scala:1110)$iwC$$iwC$$iwC$$iwC。(:64)在$iwC$$iwC$$iwC$$iwC。(:69)在…
*编辑2*
回复:Holden的回答-仍然没有喜悦:
AbstractInputFormat.setConnectorInfo(jobConf,
"root",
new PasswordToken("password")
AbstractInputFormat.setScanAuthorizations(jobConf, auths)
AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
InputFormatBase.setInputTableName(jobConf, "batchtest1")
val rddX = sparkContext.newAPIHadoopRDD(
jobConf,
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value]
)
rddX:org.apache.spark.rdd.rdd[(org.apache.accumulo.core.data.Key,org.apache.accumulo.core.data.Value)]=NewHadoopRDD[0]newAPIHadoopRDD,电话:58
输出[15]:NewHadoopRDD[0]在newAPIHadoopRD在:58
rddX.first
java.io.IOException:尚未设置输入信息。在org.apache.accumulo.core.client.mapdreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)在org.apache.accumulo.core.client.maprereduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343)在org.apache.accumulo.core.client.maprereduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538)在org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadooopRDD.scala:98)在org.apache.spark.rdd.rdd$$anonfun$partitions$2.apply(rdd.scala:222)在org.apache.spark.rdd.rdd$$anonfun$partitions$2.apply(rdd.scala:220)在scala。Option.getOrElse(Option.scala:120)org.apache.spark.rdd.rdd.partitions(rdd.scala:220)org.apache.spark.rdd.rdd.take(rdd.scala:1077)org.apache.spark.rdd.rdd.first(rdd.scala:1110)$iwC$$iwC$$iwC$$iwC。(:61)在
编辑3--进度
我能够弄清楚为什么会出现"输入信息未设置"错误。你们当中目光敏锐的人无疑会看到下面的代码缺少一个结束符
AbstractInputFormat.setConnectorInfo(jobConf, "root", new PasswordToken("password")
当我在spark笔记本上做这件事时,我一直在点击执行按钮并继续前进,因为我没有看到错误。我忘记的是,笔记本将做火花壳在你关闭")"时会做的事情——它将永远等待你添加它。因此,该错误是"setConnectorInfo"方法从未执行的结果。
不幸的是,当我执行时,我仍然无法将accumulo表数据推送到对我可用的RDD中
rddX.count
我得到
res15:Long=10000
这是正确的响应-我指向的表中有10000行数据。然而,当我试图这样获取数据的第一个元素时:
rddX.first
我得到以下错误:
org.apache.spark.SparkException:由于阶段失败,作业中止:阶段0.0(TID 0)中的任务0.0具有不可序列化的结果:org.apache.accumulo.core.data.密钥
有什么想法吗?
编辑4--成功
除了accumulo键/值需要转换为可序列化的内容之外,接受的答案+注释达到了90%。我通过在两者上调用.toString()方法来实现这一点。我会尽快发布一些完整的工作代码,以防其他人遇到同样的问题。
通常使用自定义Hadoop输入格式,使用JobConf指定信息。正如@Sietse所指出的,AccumuloInputFormat上有一些静态方法可以用来配置JobConf。在这种情况下,我认为你想做的是:
val jobConf = new JobConf() // Create a job conf
// Configure the job conf with our accumulo properties
AccumuloInputFormat.setConnectorInfo(jobConf, principal, token)
AccumuloInputFormat.setScanAuthorizations(jobConf, authorizations)
val clientConfig = new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers)
AccumuloInputFormat.setZooKeeperInstance(jobConf, clientConfig)
AccumuloInputFormat.setInputTableName(jobConf, tableName)
// Create an RDD using the jobConf
val rdd2 = sc.newAPIHadoopRDD(jobConf,
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value]
)
注意:在深入研究代码后,is configured属性似乎部分是基于被调用的类设置的(这有助于避免与其他包发生冲突),所以当我们稍后将其放回具体类中时,它找不到is configured标志。解决方法是不使用抽象类。看见https://github.com/apache/accumulo/blob/bf102d0711103e903afa0589500f5796ad51c366/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java#L127以获取实现细节)。如果你不能在spark笔记本的具体实现中调用这个方法,那么使用spark shell或定期构建的应用程序可能是最简单的解决方案。
看起来这些参数必须通过静态方法设置:http://accumulo.apache.org/1.6/apidocs/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.html.因此,请尝试设置非可选参数,然后再次运行。它应该起作用。