JanusGraph OLAP query outside the Gremlin Console



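I have a graph in which some nodes have millions of incoming edges. I need to obtain the edge count of such nodes periodically. I'm using Cassandra as the storage backend. Query: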

g.V().has('vid','qwerty').inE().count().next()

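All the available documentation explains how to do this with Apache Spark from the Gremlin Console. Is it possible to write the logic outside the Gremlin Console as a Spark job and run it periodically on a Hadoop cluster?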

Here is the output of the query in the Gremlin Console when I'm not using Spark:

14108889 [gremlin-server-session-1] WARN org.apache.tinkerpop.gremlin.server.op.AbstractEvalOpProcessor - Exception processing a script on request [RequestMessage{, requestId=c3d902b7-0fdd-491d-8639-546963212474, op='eval', processor='session', args={gremlin=g.V().has('vid','qwerty').inE().count().next(), session=2831d264-4566-4d15-99c5-d9bbb202b1f8, bindings={}, manageTransaction=false, batchSize=64}}].
TimedOutException()
    at org.apache.cassandra.thrift.Cassandra$multiget_slice_result$multiget_slice_resultStandardScheme.read(Cassandra.java:14696)
    at org.apache.cassandra.thrift.Cassandra$multiget_slice_result$multiget_slice_resultStandardScheme.read(Cassandra.java:14633)
    at org.apache.cassandra.thrift.Cassandra$multiget_slice_result.read(Cassandra.java:14559)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
    at org.apache.cassandra.thrift.Cassandra$Client.recv_multiget_slice(Cassandra.java:741)
    at org.apache.cassandra.thrift.Cassandra$Client.multiget_slice(Cassandra.java:725)
    at org.janusgraph.diskstorage.cassandra.thrift.CassandraThriftKeyColumnValueStore.getNamesSlice(CassandraThriftKeyColumnValueStore.java:143)
    at org.janusgraph.diskstorage.cassandra.thrift.CassandraThriftKeyColumnValueStore.getSlice(CassandraThriftKeyColumnValueStore.java:100)
    at org.janusgraph.diskstorage.keycolumnvalue.KCVSProxy.getSlice(KCVSProxy.java:82)
    at org.janusgraph.diskstorage.keycolumnvalue.cache.ExpirationKCVSCache.getSlice(ExpirationKCVSCache.java:129)
    at org.janusgraph.diskstorage.BackendTransaction$2.call(BackendTransaction.java:288)
    at org.janusgraph.diskstorage.BackendTransaction$2.call(BackendTransaction.java:285)
    at org.janusgraph.diskstorage.util.BackendOperation.executeDirect(BackendOperation.java:69)
    at org.janusgraph.diskstorage.util.BackendOperation.execute(BackendOperation.java:55)
    at org.janusgraph.diskstorage.BackendTransaction.executeRead(BackendTransaction.java:470)
    at org.janusgraph.diskstorage.BackendTransaction.edgeStoreMultiQuery(BackendTransaction.java:285)
    at org.janusgraph.graphdb.database.StandardJanusGraph.edgeMultiQuery(StandardJanusGraph.java:441)
    at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.lambda$executeMultiQuery$3(StandardJanusGraphTx.java:1054)
    at org.janusgraph.graphdb.query.profile.QueryProfiler.profile(QueryProfiler.java:98)
    at org.janusgraph.graphdb.query.profile.QueryProfiler.profile(QueryProfiler.java:90)
    at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.executeMultiQuery(StandardJanusGraphTx.java:1054)
    at org.janusgraph.graphdb.query.vertex.MultiVertexCentricQueryBuilder.execute(MultiVertexCentricQueryBuilder.java:113)
    at org.janusgraph.graphdb.query.vertex.MultiVertexCentricQueryBuilder.edges(MultiVertexCentricQueryBuilder.java:133)
    at org.janusgraph.graphdb.tinkerpop.optimize.JanusGraphVertexStep.initialize(JanusGraphVertexStep.java:95)
    at org.janusgraph.graphdb.tinkerpop.optimize.JanusGraphVertexStep.processNextStart(JanusGraphVertexStep.java:101)
    at org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.hasNext(AbstractStep.java:143)
    at org.apache.tinkerpop.gremlin.process.traversal.step.util.ExpandableStepIterator.hasNext(ExpandableStepIterator.java:42)
    at org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep.processAllStarts(ReducingBarrierStep.java:83)
    at org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep.processNextStart(ReducingBarrierStep.java:113)
    at org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.next(AbstractStep.java:128)
    at org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.next(AbstractStep.java:38)
    at org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal.next(DefaultTraversal.java:200)
    at java_util_Iterator$next.call(Unknown Source)
    at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:117)
    at Script13.run(Script13.groovy:1)
    at org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine.eval(GremlinGroovyScriptEngine.java:843)
    at org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine.eval(GremlinGroovyScriptEngine.java:548)
    at javax.script.AbstractScriptEngine.eval(AbstractScriptEngine.java:233)
    at org.apache.tinkerpop.gremlin.groovy.engine.ScriptEngines.eval(ScriptEngines.java:120)
    at org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor.lambda$eval$0(GremlinExecutor.java:290)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

However, g.V().has('vid','qwerty').inE().limit(10000).count().next() works fine and returns ==>10000

Below is a Java client that opens the graph and counts the edges using SparkGraphComputer:

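import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

public class FollowCountSpark {
    private static Graph hgraph;
    private static GraphTraversalSource traversalSource;

    public static void main(String[] args) {
        createHGraph();
        System.exit(0);
    }

    // Opens the HadoopGraph described by the properties file and binds the
    // traversal source to SparkGraphComputer so traversals run as OLAP jobs.
    private static void createHGraph() {
        hgraph = GraphFactory.open("/resources/jp_spark.properties");
        traversalSource = hgraph.traversal().withComputer(SparkGraphComputer.class);
        System.out.println("traversalSource = " + traversalSource);
        getAllEdgesFromHGraph();
    }

    // Counts the incoming edges of the vertex whose "vid" is "supernode".
    // Returns -1 if the traversal fails.
    static long getAllEdgesFromHGraph() {
        try {
            GraphTraversal<Vertex, Vertex> allV = traversalSource.V();
            GraphTraversal<Vertex, Vertex> gt = allV.has("vid", "supernode");
            GraphTraversal<Vertex, Long> c = gt.inE()
                    // .limit(600000)
                    .count();
            long l = c.next();
            System.out.println("All edges = " + l);
            return l;
        } catch (Exception e) {
            System.out.println("Error while fetching the edge count for vid 'supernode':");
            e.printStackTrace();
        }
        return -1;
    }
}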

The corresponding properties file is:

storage.backend=cassandrathrift
storage.cassandra.keyspace=t_graph
cache.db-cache = true
cache.db-cache-clean-wait = 20
cache.db-cache-time = 180000
cache.db-cache-size = 0.5
ids.block-size = 100000
storage.batch-loading = true
storage.buffer-size = 1000
# read-cassandra-3.properties
#
# Hadoop Graph Configuration
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.cassandra.Cassandra3InputFormat
gremlin.hadoop.graphWriter=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
#
# JanusGraph Cassandra InputFormat configuration
#
# These properties define the connection settings that were used when writing data to JanusGraph.
janusgraphmr.ioformat.conf.storage.backend=cassandrathrift
# This specifies the hostname & port for Cassandra data store.
#janusgraphmr.ioformat.conf.storage.hostname=10.xx.xx.xx,xx.xx.xx.18,xx.xx.xx.141
janusgraphmr.ioformat.conf.storage.port=9160
# This specifies the keyspace where data is stored.
janusgraphmr.ioformat.conf.storage.cassandra.keyspace=t_graph
#
# Apache Cassandra InputFormat configuration
#
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
spark.cassandra.input.split.size=256
#
# SparkGraphComputer Configuration
#
spark.master=local[1]
spark.executor.memory=1g
spark.cassandra.input.split.size_in_mb=512
spark.executor.extraClassPath=/opt/lib/janusgraph/*
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator
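
The file above sets spark.master=local[1], which runs Spark inside the client JVM with a single worker thread rather than on a cluster. Here is a minimal sketch, assuming you want to keep one properties file and swap in the cluster's master URL at launch time. The class name, the method name, and the spark://master-host:7077 URL are placeholders and not part of the original post. It uses only the JDK. As far as I know, GraphFactory.open also accepts a Map, so the result could be passed to it in place of the file path.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class SparkMasterOverride {

    // Parses properties text and returns it as a map with spark.master
    // replaced by the supplied value. All other keys are kept unchanged.
    static Map<String, Object> withSparkMaster(String propertiesText, String sparkMaster) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, Object> conf = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            conf.put(name, props.getProperty(name));
        }
        conf.put("spark.master", sparkMaster);
        return conf;
    }

    public static void main(String[] args) {
        // Two lines taken from the properties file above.
        String text = "spark.master=local[1]\nspark.executor.memory=1g\n";
        // Placeholder master URL; substitute whatever your cluster uses.
        Map<String, Object> conf = withSparkMaster(text, "spark://master-host:7077");
        System.out.println("spark.master = " + conf.get("spark.master"));
    }
}
```

Keeping the override in code rather than in the file means the same properties file can still be used for a quick local run.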

The corresponding pom.xml dependencies for all the Spark- and Hadoop-specific classes are:

<dependencies>
    <dependency>
        <groupId>org.janusgraph</groupId>
        <artifactId>janusgraph-core</artifactId>
        <version>${janusgraph.version}</version>
    </dependency>
    <dependency>
        <groupId>org.janusgraph</groupId>
        <artifactId>janusgraph-cassandra</artifactId>
        <version>${janusgraph.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.tinkerpop</groupId>
        <artifactId>spark-gremlin</artifactId>
        <version>3.1.0-incubating</version>
        <exclusions>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.tinkerpop</groupId>
        <artifactId>spark-gremlin</artifactId>
        <version>3.2.5</version>
        <exclusions>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.janusgraph</groupId>
        <artifactId>janusgraph-hadoop-core</artifactId>
        <version>${janusgraph.version}</version>
    </dependency>
    <dependency>
        <groupId>org.janusgraph</groupId>
        <artifactId>janusgraph-hbase</artifactId>
        <version>${janusgraph.version}</version>
    </dependency>
    <dependency>
        <groupId>org.janusgraph</groupId>
        <artifactId>janusgraph-cql</artifactId>
        <version>${janusgraph.version}</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.8.1</version>
    </dependency>
</dependencies>
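
The question also asks for the count to be refreshed periodically. Here is a minimal sketch of one way to do that inside a single long-running JVM, assuming the counting logic is exposed as a LongSupplier (for example, a method reference to a method like getAllEdgesFromHGraph). The PeriodicEdgeCount class and its method names are hypothetical, and the supplier in main is a stand-in that returns a fixed number. An external scheduler that launches the job on a timer would work just as well.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class PeriodicEdgeCount {
    // -1 means "no successful count yet".
    private final AtomicLong lastCount = new AtomicLong(-1);
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Runs the counter immediately, then again each time the given delay
    // has elapsed after the previous run finished.
    void start(LongSupplier counter, long delay, TimeUnit unit) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                lastCount.set(counter.getAsLong());
            } catch (RuntimeException e) {
                // Keep the schedule alive; an uncaught exception would cancel it.
                e.printStackTrace();
            }
        }, 0, delay, unit);
    }

    long lastCount() {
        return lastCount.get();
    }

    void stop() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        PeriodicEdgeCount job = new PeriodicEdgeCount();
        // Stand-in supplier; a real job would run the OLAP traversal here.
        job.start(() -> 42L, 1, TimeUnit.HOURS);
        Thread.sleep(200);
        System.out.println("Last count = " + job.lastCount());
        job.stop();
    }
}
```

scheduleWithFixedDelay is used instead of scheduleAtFixedRate because an OLAP count over millions of edges can take a long time, and a fixed delay guarantees a pause between the end of one run and the start of the next.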

Hope this helps :)
