通过spark笔记本填充accumulo 1.6突变对象时出现奇怪错误



使用spark笔记本更新accumulo表。采用accumulo文档和accumuloexample代码中指定的方法。以下是我在笔记本上逐字记录的内容,以及回复:

val clientRqrdTble = new ClientOnRequiredTable
val bwConfig = new BatchWriterConfig
val batchWriter = connector.createBatchWriter("batchtestY", bwConfig);

clientRqrdTable:org.apache.accumulo.core.cli.ClientRequiredTable=org.apache.accumulo.core.cli.ClientOnRequiredTable@6c6a18edbwConfig:org.apache.accumulo.core.client.BatchWriterConfig=[maxMemory=52428800,maxLatency=120000,maxWriteThreads=3,timeout=9223372036854775807]批处理写入程序:org.apache.accumulo.core.client.BatchWriter=org.apache.accumulo.core.client.impl.BatchWriterImpl@298aa736

val rowIdS = rddX2_first._1.split(" ")(0)

rowIdS:字符串=row_0736460000

val mutation = new Mutation(new Text(rowIdS))

突变:org.apache.accumulo.core.data.mutation=org.apache.accumulo.core.data.Mutation@0

mutation.put(
  new Text("foo"), 
  new Text("1"), 
  new ColumnVisibility("exampleVis"), 
  new Value(new String("CHEWBACCA!").getBytes) )

java.lang.IllegalStateException:无法在之后添加到突变将其序列化在org.apache.accumulo.core.data.Mutation.put(Mutation.java:168)org.apache.accumulo.core.data.Mutation.put(Mutation.java:163)org.apache.accumulo.core.data.Mutation.put(突变.java:211)

我深入研究了代码,发现罪魁祸首是一个if catch,它正在检查UnsynchronizedBuffer.Writer缓冲区是否为null。行号不会对齐,因为这是一个与1.6 accumulo core jar中的版本略有不同的版本——我已经看了这两个版本,在这种情况下,区别并没有什么不同。据我所知,对象是在执行该方法之前创建的,并没有被转储。

所以要么我在代码中遗漏了什么,要么出现了其他东西。你们中有人知道是什么导致了这种行为吗?

更新一

我已经使用scala控制台并通过直接的java1.8执行了以下代码。它在scala中失败,但在Java中没有。在这一点上,我认为这是一个Accumulo问题。因此,我将打开一个bug票证,并深入挖掘源代码。如果我拿出一个决议,我会在这里发布。

下面是Java形式的代码。其中有一些额外的东西,因为我想确保我可以连接到我使用accumulo批处理编写器创建的表。示例:

import java.util.Map.Entry;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.client.*;
import org.apache.accumulo.core.client.mapred.*;
import org.apache.accumulo.core.cli.ClientOnRequiredTable;
import org.apache.accumulo.core.cli.ClientOnRequiredTable.*;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.conf.Configured.*;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;
public class App {
    public static void main( String[] args ) throws 
                                            AccumuloException, 
                                            AccumuloSecurityException, 
                                            TableNotFoundException {
        // connect to accumulo using a scanner
        // print first ten rows of a given table
        String instanceNameS = "accumulo";
        String zooServersS = "localhost:2181";
        Instance instance = new ZooKeeperInstance(instanceNameS, zooServersS);
        Connector connector = 
                instance.getConnector( "root", new PasswordToken("password"));
        Authorizations auths = new Authorizations("exampleVis");
        Scanner scanner = connector.createScanner("batchtestY", auths);
        scanner.setRange(new Range("row_0000000001", "row_0000000010"));
        for(Entry<Key, Value> entry : scanner) {
          System.out.println(entry.getKey() + " is " + entry.getValue());
        }

        // stage up connection info objects for serialization
        ClientOnRequiredTable clientRqrdTble = new ClientOnRequiredTable();
        BatchWriterConfig bwConfig = new BatchWriterConfig();
        BatchWriter batchWriter = 
                connector.createBatchWriter("batchtestY", bwConfig);
        // create mutation object
        Mutation mutation = new Mutation(new Text("row_0000000001"));
        // populate mutation object
        // -->THIS IS WHAT'S FAILING IN SCALA<--
        mutation.put(
                  new Text("foo"), 
                  new Text("1"), 
                  new ColumnVisibility("exampleVis"), 
                  new Value(new String("CHEWBACCA!").getBytes()) );                                           
    }
}

更新两个

已为此问题创建了Accumulo错误票证。他们的目标是在v1.7.0中修复这个问题。在此之前,我在下面提供的解决方案是一个功能性的解决方案。

当执行新的突变单元时,spark笔记本中发生的一切看起来都是在序列化突变。在突变被序列化之后,你不能调用put。我会尝试将mutation.put调用添加到与新的mutation命令相同的笔记本单元格中。看起来clientRqrdTable/bwConfig/batchWriter命令在一个多行单元格中,所以希望这也能用于突变。

因此,与java完美配合的代码似乎与Scala不太配合。解决方案(不一定是一个好的解决方案,而是一个有效的解决方案)是在一个自包含的jar中创建一个java方法,该方法创建突变对象并返回它。这样,您就可以将jar添加到spark的类路径中,并调用所需的方法ass。使用spark笔记本进行了测试,并成功更新了现有的accumulo表。我仍然会向accumulo peeps提交一张罚单,因为这种变通方式不应该被视为"最佳实践"。

相关内容

  • 没有找到相关文章

最新更新