We are trying to run a simple write/scan against Accumulo (client jar 1.5.0) from a standalone Java main program (a Maven-shaded executable) over Putty, as described below:
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AccumuloQueryApp {
    private static final Logger logger = LoggerFactory.getLogger(AccumuloQueryApp.class);

    public static final String INSTANCE = "accumulo"; // miniInstance
    public static final String ZOOKEEPERS = "ip-x-x-x-100:2181"; // localhost:28076

    private static Connector conn;

    static {
        // Accumulo
        Instance instance = new ZooKeeperInstance(INSTANCE, ZOOKEEPERS);
        try {
            conn = instance.getConnector("root", new PasswordToken("xxx"));
        } catch (Exception e) {
            logger.error("Connection", e);
        }
    }

    public static void main(String[] args) throws TableNotFoundException, AccumuloException,
            AccumuloSecurityException, TableExistsException {
        System.out.println("connection with : " + conn.whoami());

        BatchWriter writer = conn.createBatchWriter("test", ofBatchWriter());
        for (int i = 0; i < 10; i++) {
            Mutation m1 = new Mutation(String.valueOf(i));
            m1.put("personal_info", "first_name", String.valueOf(i));
            m1.put("personal_info", "last_name", String.valueOf(i));
            m1.put("personal_info", "phone", "983065281" + i % 2);
            m1.put("personal_info", "email", String.valueOf(i));
            m1.put("personal_info", "date_of_birth", String.valueOf(i));
            m1.put("department_info", "id", String.valueOf(i));
            m1.put("department_info", "short_name", String.valueOf(i));
            m1.put("department_info", "full_name", String.valueOf(i));
            m1.put("organization_info", "id", String.valueOf(i));
            m1.put("organization_info", "short_name", String.valueOf(i));
            m1.put("organization_info", "full_name", String.valueOf(i));
            writer.addMutation(m1);
        }
        writer.close();
        System.out.println("Writing complete ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~`");

        Scanner scanner = conn.createScanner("test", new Authorizations());
        System.out.println("Step 1 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~`");
        scanner.setRange(new Range("3", "7"));
        System.out.println("Step 2 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~`");
        scanner.forEach(e -> System.out.println("Key: " + e.getKey() + ", Value: " + e.getValue()));
        System.out.println("Step 3 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~`");
        scanner.close();
    }

    public static BatchWriterConfig ofBatchWriter() {
        // Batch writer properties
        final int MAX_LATENCY = 1;
        final int MAX_MEMORY = 10000000;
        final int MAX_WRITE_THREADS = 10;
        final int TIMEOUT = 10;

        BatchWriterConfig config = new BatchWriterConfig();
        config.setMaxLatency(MAX_LATENCY, TimeUnit.MINUTES);
        config.setMaxMemory(MAX_MEMORY);
        config.setMaxWriteThreads(MAX_WRITE_THREADS);
        config.setTimeout(TIMEOUT, TimeUnit.MINUTES);
        return config;
    }
}
The connection is established correctly, but we get an error when creating the BatchWriter, and it keeps retrying with the same error:
[impl.ThriftScanner] DEBUG: Error getting transport to ip-x-x-x-100:10011 : NotServingTabletException(extent:TKeyExtent(table:21 30, endRow:21 30 3C, prevEndRow:null))
When we run the same code (writing to and reading from Accumulo) inside a Spark job submitted to the YARN cluster, it runs perfectly. We are struggling to figure out why, but have no clue. Please see the environment described below.
Cloudera CDH 5.8.2 on an AWS environment (4 EC2 instances: one master and 3 child nodes).

Consider the private IPs to be like:
- Master: x.x.x.100
- Child1: x.x.x.101
- Child2: x.x.x.102
- Child3: x.x.x.103
We have the following installed in the CDH cluster (CDH 5.8.2):
- Accumulo 1.6 (Tracer not installed, Garbage Collector on Child2, Master on Master, Monitor on Child3, Tablet Server on Master)
- HBase
- HDFS (master node as NameNode, all 3 child nodes as DataNodes)
- Kafka
- Spark
- YARN (incl. MR2)
- ZooKeeper
Hrm, it's strange that it works with Spark on YARN but not as a regular Java application. Usually it's the other way around :)
I would verify that the JARs on the standalone Java application's classpath match the ones used by the Spark-on-YARN job, as well as the Accumulo server classpath.
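One way to check this from inside the standalone application is to ask the JVM which JAR each relevant class was actually loaded from. The helper below is a sketch I'm adding for illustration (`ClasspathCheck` and its method are not from the original post); it uses the standard `ProtectionDomain`/`CodeSource` API, so it works for any class on the application classpath:

```java
import java.security.CodeSource;

public class ClasspathCheck {
    /**
     * Returns the JAR or directory the given class was loaded from,
     * or null for bootstrap/JDK classes (which have no code source).
     */
    public static String locationOf(Class<?> clazz) {
        CodeSource src = clazz.getProtectionDomain().getCodeSource();
        return src == null ? null : src.getLocation().toString();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // In the real app you would inspect the Accumulo client classes, e.g.:
        // System.out.println(locationOf(
        //     Class.forName("org.apache.accumulo.core.client.Connector")));
        System.out.println(locationOf(ClasspathCheck.class));
    }
}
```

Running this in the standalone app and comparing the printed locations (and versions) against what the Spark executors report would quickly confirm or rule out a client-jar mismatch, e.g. a 1.5.0 client jar shaded into the executable while the cluster runs Accumulo 1.6.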
If that doesn't help, try bumping the log4j level up to DEBUG or TRACE and see if anything jumps out at you. If you have a hard time making sense of what the logging says, feel free to email user@accumulo.apache.org and you'll surely get more eyes on the problem.
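For reference, a minimal `log4j.properties` sketch that turns up the Accumulo client internals (this assumes the log4j 1.x setup typically used with Accumulo 1.6 clients; the appender names are placeholders to adapt to your existing configuration):

```properties
# Console appender (assumed names; adjust to your existing log4j setup)
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} [%c] %p: %m%n

# Turn up the Accumulo client classes, including impl.ThriftScanner
# (the source of the NotServingTabletException retries above)
log4j.logger.org.apache.accumulo=TRACE
```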