概括 DynamoDB + JanusGraph Factory:锁定和架构问题



我正在努力推广来自 AWS 的 DynamoDB + JanusGraph 教程,以便给定具有标准约定的标准.txt文件,程序可以摄取数据(作为三重)并创建顶点、属性和边。通常我不会发布这么长的问题,但似乎这些都与我创建的同一类 ObjectCreationCommand 中的 4-5 行有关。

示例 Triple 如下所示:"name:Jim Henson t isCreatorOf t televisionshow:The Muppets"

  1. 左对象:吉姆·汉森
  2. 左侧对象属性:名称
  3. 关系:是创造者
  4. 右对象:布偶
  5. 权利对象属性:电视节目

尽管程序可以编译并运行,但我会抛出几个异常,阻止图形被填充。当我运行工厂程序时,它会读取我所有的三元组并将它们放入哈希集中,但随后发生以下错误(10 次,但这只是 1 个示例):

57338 [pool-10-thread-2] ERROR org.janusgraph.graphdb.database.StandardJanusGraph  - Could not commit transaction [10] due to exception
org.janusgraph.diskstorage.locking.TemporaryLockingException: tx 0x181404008c7c already locked key-column ( 16-165-160-114-116- 30- 98-114- 97-110-100-116-121-112-229,  0) when tx 0x181408349015 tried to lock
at com.amazon.janusgraph.diskstorage.dynamodb.AbstractDynamoDBStore.acquireLock(AbstractDynamoDBStore.java:132)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore$4.call(MetricInstrumentedStore.java:155)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore$4.call(MetricInstrumentedStore.java:153)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore.runWithMetrics(MetricInstrumentedStore.java:217)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore.acquireLock(MetricInstrumentedStore.java:152)
at org.janusgraph.diskstorage.keycolumnvalue.KCVSProxy.acquireLock(KCVSProxy.java:52)
at org.janusgraph.diskstorage.BackendTransaction.acquireIndexLock(BackendTransaction.java:255)
at org.janusgraph.graphdb.database.StandardJanusGraph.prepareCommit(StandardJanusGraph.java:565)
at org.janusgraph.graphdb.database.StandardJanusGraph.commit(StandardJanusGraph.java:694)
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.commit(StandardJanusGraphTx.java:1363)
at org.janusgraph.graphdb.database.management.ManagementSystem.commit(ManagementSystem.java:235)
at com.amazon.janusgraph.creator.ObjectCreationCommand.run(ObjectCreationCommand.java:59)
at com.amazon.janusgraph.batch.BatchCommand.run(BatchCommand.java:34)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

接下来,抛出类似的异常:

57427 [pool-10-thread-10] ERROR com.amazon.janusgraph.example.MarvelGraphFactory  - Error processing line Could not commit transaction due to exception during persistence tx 0x181404008c7c already locked key-column ( 16-165-160-114-116- 30- 98-114- 97-110-100-116-121-112-229,  0) when tx 0x181403f69b77 tried to lock
org.janusgraph.core.JanusGraphException: Could not commit transaction due to exception during persistence
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.commit(StandardJanusGraphTx.java:1374)
at org.janusgraph.graphdb.database.management.ManagementSystem.commit(ManagementSystem.java:235)
at com.amazon.janusgraph.creator.ObjectCreationCommand.run(ObjectCreationCommand.java:59)
at com.amazon.janusgraph.batch.BatchCommand.run(BatchCommand.java:34)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.janusgraph.core.JanusGraphException: Unexpected exception
at org.janusgraph.graphdb.database.StandardJanusGraph.commit(StandardJanusGraph.java:798)
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.commit(StandardJanusGraphTx.java:1363)
... 6 more
Caused by: org.janusgraph.diskstorage.locking.TemporaryLockingException: tx 0x181404008c7c already locked key-column ( 16-165-160-114-116- 30- 98-114- 97-110-100-116-121-112-229,  0) when tx 0x181403f69b77 tried to lock
at com.amazon.janusgraph.diskstorage.dynamodb.AbstractDynamoDBStore.acquireLock(AbstractDynamoDBStore.java:132)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore$4.call(MetricInstrumentedStore.java:155)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore$4.call(MetricInstrumentedStore.java:153)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore.runWithMetrics(MetricInstrumentedStore.java:217)
at org.janusgraph.diskstorage.util.MetricInstrumentedStore.acquireLock(MetricInstrumentedStore.java:152)
at org.janusgraph.diskstorage.keycolumnvalue.KCVSProxy.acquireLock(KCVSProxy.java:52)
at org.janusgraph.diskstorage.BackendTransaction.acquireIndexLock(BackendTransaction.java:255)
at org.janusgraph.graphdb.database.StandardJanusGraph.prepareCommit(StandardJanusGraph.java:565)
at org.janusgraph.graphdb.database.StandardJanusGraph.commit(StandardJanusGraph.java:694)
... 7 more

然后抛出与架构相关的异常:

58030 [pool-10-thread-4] ERROR com.amazon.janusgraph.example.MarvelGraphFactory  - Error processing line Adding this property for key [~T$SchemaName] and value [rtbrandtype] violates a uniqueness constraint [SystemIndex#~T$SchemaName] 
org.janusgraph.core.SchemaViolationException: Adding this property for key [~T$SchemaName] and value [rtbrandtype] violates a uniqueness constraint [SystemIndex#~T$SchemaName]
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.addProperty(StandardJanusGraphTx.java:791)
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.addProperty(StandardJanusGraphTx.java:720)
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.makeSchemaVertex(StandardJanusGraphTx.java:847)
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.makePropertyKey(StandardJanusGraphTx.java:867)
at org.janusgraph.graphdb.types.StandardPropertyKeyMaker.make(StandardPropertyKeyMaker.java:100)
at com.amazon.janusgraph.creator.ObjectCreationCommand.run(ObjectCreationCommand.java:47)
at com.amazon.janusgraph.batch.BatchCommand.run(BatchCommand.java:34)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

最后,抛出一个我真的不明白的异常:

58512 [pool-10-thread-8] ERROR com.amazon.janusgraph.example.MarvelGraphFactory  - Error processing line Could not find type for id: 11529 
java.lang.IllegalStateException: Could not find type for id: 11529
at com.google.common.base.Preconditions.checkState(Preconditions.java:197)
at org.janusgraph.graphdb.types.vertices.JanusGraphSchemaVertex.name(JanusGraphSchemaVertex.java:59)
at org.janusgraph.graphdb.types.vertices.JanusGraphSchemaVertex.asIndexType(JanusGraphSchemaVertex.java:177)
at org.janusgraph.graphdb.database.management.ManagementSystem.getGraphIndexDirect(ManagementSystem.java:412)
at org.janusgraph.graphdb.database.management.ManagementSystem.getGraphIndex(ManagementSystem.java:422)
at com.amazon.janusgraph.creator.ObjectCreationCommand.run(ObjectCreationCommand.java:55)
at com.amazon.janusgraph.batch.BatchCommand.run(BatchCommand.java:34)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

因为最终事务是 Null,所以抛出一个 NullPointerException,并且事务永远不会提交;因此,我的图形已初始化但为空。

通常我不会发布这么长的问题,但似乎这些都与我创建的同一类 ObjectCreationCommand 中的 4-5 行有关。

ObjectCreationCommand.java

package com.amazon.janusgraph.creator;
import com.amazon.janusgraph.example.TravelGraphFactory;
import com.codahale.metrics.MetricRegistry;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.janusgraph.core.JanusGraph;
import com.amazon.janusgraph.triple.Triple;
import org.janusgraph.core.Multiplicity;
import org.janusgraph.core.PropertyKey;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.slf4j.Logger;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
public class ObjectCreationCommand implements Runnable {
public static JanusGraph graph;
private static Triple triple;
private static MetricRegistry REGISTRY;
public static Logger LOG;
private static final String TIMER_LINE = "TravelGraphFactory.line";
private static final String TIMER_CREATE = "TravelGraphFactory.create_";
private static final String COUNTER_GET = "TravelGraphFactory.get_";
public ObjectCreationCommand(JanusGraph graph, Triple triple, MetricRegistry REGISTRY, Logger LOG) {
this.graph = graph;
this.triple = triple;
this.REGISTRY = REGISTRY;
this.LOG = LOG;
}
@Override
public void run() {
JanusGraphManagement mgmt = graph.openManagement();
if (mgmt.getGraphIndex(triple.getRightObjectProperty()) == null) {
final PropertyKey hotelKey = mgmt.makePropertyKey(triple.getRightObjectProperty()).dataType(String.class).make();
mgmt.buildIndex(triple.getRightObjectProperty(), Vertex.class).addKey(hotelKey).unique().buildCompositeIndex();
}
if (mgmt.getEdgeLabel(triple.getRelationship()) == null) {
mgmt.makeEdgeLabel(triple.getRelationship()).multiplicity(Multiplicity.MANY2ONE).make();
}
if (mgmt.getGraphIndex(triple.getLeftObjectProperty()) == null) {
final PropertyKey brandTypeKey = mgmt.makePropertyKey(triple.getLeftObjectProperty()).dataType(String.class).make();
mgmt.buildIndex(triple.getLeftObjectProperty(), Vertex.class).addKey(brandTypeKey).unique().buildCompositeIndex();
}
mgmt.commit();
long start = System.currentTimeMillis();
String RIGHT_OBJECT_PROPERTY = triple.getRightObjectProperty();
Vertex rightObject = graph.addVertex();
rightObject.property(RIGHT_OBJECT_PROPERTY, triple.getRightObject());
REGISTRY.counter(COUNTER_GET + RIGHT_OBJECT_PROPERTY).inc();
String LEFT_OBJECT_PROPERTY = triple.getLeftObjectProperty();
Vertex leftObject = graph.addVertex();
rightObject.property(LEFT_OBJECT_PROPERTY, triple.getLeftObject());
REGISTRY.counter(COUNTER_GET + LEFT_OBJECT_PROPERTY).inc();
try {
processRelationship(graph, triple);
} catch (Throwable e) {
Throwable rootCause = ExceptionUtils.getRootCause(e);
String rootCauseMessage = null == rootCause ? "" : rootCause.getMessage();
LOG.error("Error processing line {} {}", e.getMessage(), rootCauseMessage, e);
}
long end = System.currentTimeMillis();
long time = end - start;
REGISTRY.timer(TIMER_CREATE + RIGHT_OBJECT_PROPERTY).update(time, TimeUnit.MILLISECONDS);
}
private static void processRelationship(JanusGraph graph, Triple triple) {
Vertex left = get(graph, triple.getLeftObjectProperty(), triple.getLeftObject());
if (null == left) {
REGISTRY.counter("error.missingLeftObject." + triple.getLeftObject()).inc();
left = graph.addVertex();
left.property(triple.getLeftObjectProperty(), triple.getLeftObject());
}
Vertex right = get(graph, triple.getRightObjectProperty(), triple.getRightObject());
if (null == right) {
REGISTRY.counter("error.missingRightObject." + triple.getRightObject()).inc();
right = graph.addVertex();
right.property(triple.getRightObjectProperty(), triple.getRightObject());
}
left.addEdge(triple.getRelationship(), right);
}
private static Vertex get(final JanusGraph graph, final String key, final String value) {
final GraphTraversalSource g = graph.traversal();
final Iterator<Vertex> it = g.V().has(key, value);
return it.hasNext() ? it.next() : null;
}
}

上面的异常表明所有错误都来自该类的第 47、55 或 59 行:

JanusGraphManagement mgmt = graph.openManagement();
if (mgmt.getGraphIndex(triple.getRightObjectProperty()) == null) {
[47] final PropertyKey hotelKey = mgmt.makePropertyKey(triple.getRightObjectProperty()).dataType(String.class).make();
mgmt.buildIndex(triple.getRightObjectProperty(), Vertex.class).addKey(hotelKey).unique().buildCompositeIndex();
}
if (mgmt.getEdgeLabel(triple.getRelationship()) == null) {
mgmt.makeEdgeLabel(triple.getRelationship()).multiplicity(Multiplicity.MANY2ONE).make();
}
[55] if (mgmt.getGraphIndex(triple.getLeftObjectProperty()) == null) {
final PropertyKey brandTypeKey = mgmt.makePropertyKey(triple.getLeftObjectProperty()).dataType(String.class).make();
mgmt.buildIndex(triple.getLeftObjectProperty(), Vertex.class).addKey(brandTypeKey).unique().buildCompositeIndex();
}
[59] mgmt.commit();

谁能帮助确定我在这门课上做错了什么?无论我做什么,都是锁定表并创建架构问题。

首先,对文本文件中的每一行执行原始 run() 方法。如果做的第一件事是回滚,则在上一次迭代中创建的顶点和边将被吹走。

其次,在下一行,您尝试为每一行设置架构,即使您只需要在实例化图形后立即为图形设置一次架构。这是架构异常的来源,因为索引名称必须是唯一的(并且不一定需要以它们正在索引的属性命名)。

第三,在第 78 行和第 83 行,您假设您正在为处理的文本文件的每一行重新创建顶点。唯一性约束在那里的应用与在第 107 行和第 114 行一样适用,您在处理关系时遵守了唯一性约束。

最后,对于阶数为 ~100 个顶点和 ~100 条边的图形,设置批处理和执行器有点过头了。我提交了一个 PR,它解决了所有这些问题,并提出了两种不同的方法来处理手头的数据加载。

如文档中所述,在尝试创建索引之前,应回滚任何活动事务。 你试过吗?

最新更新