Neo4j: streaming results from a user-defined procedure



When using Neo4j unmanaged extensions, results can be streamed to the client while traversing the graph, like this (in Scala):

import javax.ws.rs.core.{MediaType, Response, StreamingOutput}
val stream: StreamingOutput = ???
Response.ok().entity(stream).`type`(MediaType.APPLICATION_JSON).build()

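I can't find a similar possibility with the user-defined procedures in Neo4j 3. They return Java 8 Streams, but I don't see how to add elements to such a stream while it is already being consumed in parallel.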

Is this possible?

I have an example of this in one of the APOC procedures:

https://github.com/neo4j-contrib/neo4j-apoc-procedures/blob/master/src/main/java/apoc/cypher/Cypher.java#L77

I'd like to add more, and more general, examples in the future.

Here is what I came up with, based on Michael Hunger's code (in Scala).

QueueBasedSpliterator

import java.util.Spliterator
import java.util.concurrent.{BlockingQueue, TimeUnit}
import java.util.function.Consumer
import org.neo4j.kernel.api.KernelTransaction
private class QueueBasedSpliterator[T](queue: BlockingQueue[T],
                                       tombstone: T,
                                       tx: KernelTransaction) extends Spliterator[T] {
  override def tryAdvance(action: Consumer[_ >: T]): Boolean =
    try {
      if (tx.shouldBeTerminated()) false
      else {
        val entry = queue.poll(100, TimeUnit.MILLISECONDS)
        if (entry == null || entry == tombstone) false
        else {
          action.accept(entry)
          true
        }
      }
    } catch {
      case _: InterruptedException =>
        Thread.currentThread().interrupt() // restore the interrupt flag for the caller
        false
    }
  override def trySplit(): Spliterator[T] = null
  override def estimateSize(): Long = Long.MaxValue
  override def characteristics(): Int = Spliterator.ORDERED | Spliterator.NONNULL
}

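Note the 100 ms timeout: if no element arrives within that time, poll returns null and the stream ends, even if the producer simply hasn't finished yet. The value may need tuning.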
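One way to stop depending on that timeout is to treat an empty poll as "nothing yet" and keep waiting until either the tombstone arrives or the transaction is terminated. Below is a minimal sketch of that variant. PatientQueueSpliterator, PatientDemo and the isTerminated callback (a stand-in for KernelTransaction.shouldBeTerminated()) are hypothetical names, and the demo uses only the JDK so it runs without Neo4j.

```scala
import java.util.Spliterator
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
import java.util.function.Consumer
import java.util.stream.{Collectors, StreamSupport}
import scala.annotation.tailrec
import scala.jdk.CollectionConverters._

// Hypothetical variant of QueueBasedSpliterator. `isTerminated` replaces the
// Neo4j KernelTransaction so this sketch runs without Neo4j on the classpath.
class PatientQueueSpliterator[T](queue: BlockingQueue[T],
                                 tombstone: T,
                                 isTerminated: () => Boolean,
                                 pollMillis: Long = 100) extends Spliterator[T] {
  override def tryAdvance(action: Consumer[_ >: T]): Boolean =
    try next(action)
    catch {
      case _: InterruptedException =>
        Thread.currentThread().interrupt() // restore the interrupt flag
        false
    }

  @tailrec
  private def next(action: Consumer[_ >: T]): Boolean =
    if (isTerminated()) false
    else {
      val entry = queue.poll(pollMillis, TimeUnit.MILLISECONDS)
      if (entry == null) next(action)    // timed out: re-check termination, keep waiting
      else if (entry == tombstone) false // producer signalled the end of the results
      else { action.accept(entry); true }
    }

  override def trySplit(): Spliterator[T] = null // single consumer, never split
  override def estimateSize(): Long = Long.MaxValue // size unknown up front
  override def characteristics(): Int = Spliterator.ORDERED | Spliterator.NONNULL
}

object PatientDemo {
  private val Tombstone = "<end>"

  // The producer is slower than the poll interval. With a spliterator that ends on
  // the first timeout, this stream would be empty; here it waits for the tombstone.
  def run(): List[String] = {
    val queue = new ArrayBlockingQueue[String](10)
    val producer = new Thread(() => {
      for (s <- List("x", "y", Tombstone)) { Thread.sleep(150); queue.put(s) }
    })
    producer.start()
    val spliterator =
      new PatientQueueSpliterator[String](queue, Tombstone, () => false, pollMillis = 50)
    val out = StreamSupport.stream(spliterator, false)
      .collect(Collectors.toList[String]()).asScala.toList
    producer.join()
    out
  }
}
```

The trade-off: if the producer dies without sending the tombstone, only termination ends the stream, which is one more reason to close the ResultsStream in a finally block.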

ResultsStream (a wrapper around the blocking queue):

import java.util.concurrent.BlockingQueue
class ResultsStream[T](tombstone: T, queue: BlockingQueue[T]) extends AutoCloseable {
  def put(t: T): Unit = {
    queue.put(t)
  }
  override def close(): Unit = {
    put(tombstone)
  }
}

CommonUtil helper methods:

import java.util.concurrent.ArrayBlockingQueue
import java.util.stream.{Stream, StreamSupport}
import org.neo4j.kernel.api.KernelTransaction
import org.neo4j.kernel.internal.GraphDatabaseAPI
import scala.concurrent.{ExecutionContext, Future}
object CommonUtil {
  def inTx(db: GraphDatabaseAPI)(f: => Unit): Unit =
    Managed(db.beginTx()) { tx => f; tx.success() }
  def inTxFuture(db: GraphDatabaseAPI)(f: => Unit)(implicit ec: ExecutionContext): Future[Unit] =
    Future(inTx(db)(f))
  def streamResults[T](tombstone: T, tx: KernelTransaction)
                      (f: ResultsStream[T] => Any): Stream[T] = {
    val queue = new ArrayBlockingQueue[T](100)
    f(new ResultsStream(tombstone, queue))
    StreamSupport.stream(new QueueBasedSpliterator[T](queue, tombstone, tx), false)
  }
}
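The queue created in streamResults is bounded (100 elements), and ResultsStream.put calls BlockingQueue.put, which blocks while the queue is full. If the consumer stops reading early (for example because the poll timeout ended the stream, or the transaction was terminated), the producer thread can stay blocked indefinitely, together with the transaction it opened. A possible mitigation, sketched here under the assumption that giving up on undelivered results is acceptable, is to use offer with a timeout. TimedResultsStream and TimedDemo are hypothetical names.

```scala
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}

// Hypothetical variant of ResultsStream: `put` gives up after `timeoutMillis`
// instead of blocking forever when the bounded queue stays full.
class TimedResultsStream[T](tombstone: T, queue: BlockingQueue[T], timeoutMillis: Long)
    extends AutoCloseable {
  // true if the element was enqueued, false if no consumer freed space in time
  def put(t: T): Boolean = queue.offer(t, timeoutMillis, TimeUnit.MILLISECONDS)

  // Best effort: the tombstone is dropped if nobody is consuming any more
  override def close(): Unit = { put(tombstone); () }
}

object TimedDemo {
  // A queue with one slot and no consumer: the first put fits, the second times out
  def run(): (Boolean, Boolean) = {
    val queue = new ArrayBlockingQueue[String](1)
    val results = new TimedResultsStream[String]("<end>", queue, timeoutMillis = 50)
    (results.put("a"), results.put("b"))
  }
}
```

The traversal can then stop at the first failed put, for example by driving it with forall, which short-circuits on the first false.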

More helpers:

object Managed {
  type AutoCloseableView[T] = T => AutoCloseable
  def apply[T : AutoCloseableView, V](resource: T)(op: T => V): V =
    try {
      op(resource)
    } finally {
      resource.close()
    }
}
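Managed is the loan pattern: close() runs in a finally block, so in the procedure ResultsStream.close() puts the tombstone even when the traversal throws, and the consumer still receives the end marker. A small check of that behaviour, using a simplified copy (ManagedSketch, with an upper bound instead of the implicit-view context bound) and a hypothetical ClosingProbe resource:

```scala
// Simplified copy of Managed: an upper bound replaces the implicit-view context bound
object ManagedSketch {
  def apply[T <: AutoCloseable, V](resource: T)(op: T => V): V =
    try op(resource)
    finally resource.close() // runs both on success and when op throws
}

// Toy resource standing in for ResultsStream; records whether close() was called
class ClosingProbe extends AutoCloseable {
  var closed = false
  override def close(): Unit = closed = true
}

object ManagedDemo {
  // Returns (result of the successful op, first probe closed?, second probe closed?)
  def run(): (Int, Boolean, Boolean) = {
    val ok = new ClosingProbe
    val n = ManagedSketch(ok)(_ => 42)
    val failing = new ClosingProbe
    try ManagedSketch(failing)(_ => throw new IllegalStateException("traversal failed"))
    catch { case _: IllegalStateException => () }
    (n, ok.closed, failing.closed)
  }
}
```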

Pool

import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
object Pool {
  lazy val DefaultExecutionContent: ExecutionContextExecutor =
    ExecutionContext.fromExecutor(createDefaultExecutor())
  // values might be tuned in production
  def createDefaultExecutor(corePoolSize: Int = Runtime.getRuntime.availableProcessors() * 2,
                            keepAliveSeconds: Int = 30) = {
    val queueSize = corePoolSize * 25
    new ThreadPoolExecutor(
      corePoolSize / 2,
      corePoolSize,
      keepAliveSeconds.toLong,
      TimeUnit.SECONDS,
      new ArrayBlockingQueue[Runnable](queueSize),
      new ThreadPoolExecutor.CallerRunsPolicy()
    )
  }
}
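With ThreadPoolExecutor.CallerRunsPolicy, a task that finds the bounded queue (corePoolSize * 25 slots here) full and no thread available is run by the thread that submitted it, instead of being rejected with a RejectedExecutionException. The sketch below shows this with a deliberately tiny pool (one thread, one queue slot); CallerRunsDemo is a hypothetical name.

```scala
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

object CallerRunsDemo {
  // Returns true if the rejected task ran on the submitting thread
  def run(): Boolean = {
    val executor = new ThreadPoolExecutor(
      1, 1, 30L, TimeUnit.SECONDS,
      new ArrayBlockingQueue[Runnable](1), // room for a single waiting task
      new ThreadPoolExecutor.CallerRunsPolicy())
    val release = new CountDownLatch(1)
    val ranOn = new AtomicReference[Thread]()
    executor.execute(() => release.await()) // occupies the only worker thread
    executor.execute(() => ())              // fills the only queue slot
    // No free thread and no free slot: the policy runs this task right here
    executor.execute(() => ranOn.set(Thread.currentThread()))
    release.countDown()
    executor.shutdown()
    executor.awaitTermination(5, TimeUnit.SECONDS)
    ranOn.get() eq Thread.currentThread()
  }
}
```

In the setup above this has a side effect worth checking: under saturation, the Future started by inTxFuture can execute on the procedure's own thread before streamResults has returned the stream, and if that traversal produces more than 100 items, queue.put blocks with nobody consuming.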

Usage in a procedure

@Procedure("example.readStream")
def readStream(@Name("nodeId") nodeId: NodeId): Stream[StreamingItem] =
  CommonUtil.streamResults(StreamingItem.Tombstone, kernelTx) { results =>
    CommonUtil.inTxFuture(db) { // needs an implicit ExecutionContext in scope, e.g. Pool.DefaultExecutionContent
      Managed(results) { _ =>
        graphUtil.findTreeNode(nodeId).foreach { node =>
          // add elements to the stream here
          results.put(???)
        }
      }
    }
  }
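To see the whole producer/consumer flow without Neo4j, here is a JDK-only sketch with the same shape as readStream: a Future fills a bounded queue and puts a tombstone in a finally block, while the method returns a lazy Stream straight away. EndToEndDemo and its methods are hypothetical, and the consumer side is simplified to Iterator.continually plus Spliterators.spliteratorUnknownSize, so it has neither the poll timeout nor the transaction check of QueueBasedSpliterator.

```scala
import java.util.{Spliterator, Spliterators}
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
import java.util.stream.{Collectors, Stream, StreamSupport}
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters._

object EndToEndDemo {
  private val Tombstone = "<end>"

  // Consumer side: a lazy Java Stream that blocks on the queue until the tombstone.
  // Simplification: no timeout or termination check, unlike QueueBasedSpliterator.
  private def queueStream(queue: BlockingQueue[String]): Stream[String] = {
    val it = Iterator.continually(queue.take()).takeWhile(_ != Tombstone)
    StreamSupport.stream(
      Spliterators.spliteratorUnknownSize[String](it.asJava, Spliterator.ORDERED), false)
  }

  // Same shape as readStream: start the producer asynchronously, return the stream at once
  def readStream(items: Seq[String])(implicit ec: ExecutionContext): Stream[String] = {
    val queue = new ArrayBlockingQueue[String](100)
    Future {
      try items.foreach(item => queue.put(item.toUpperCase)) // stands in for the traversal
      finally queue.put(Tombstone)                             // what ResultsStream.close() does
    }
    queueStream(queue)
  }

  // Drains the stream on the calling thread
  def readAll(items: Seq[String]): List[String] =
    readStream(items)(ExecutionContext.global)
      .collect(Collectors.toList[String]()).asScala.toList
}
```

Because the stream is lazy, each element becomes available to the caller as soon as the producer enqueues it, rather than after the whole traversal has finished.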

StreamingItem.Tombstone is just a static StreamingItem instance with the special meaning of closing the stream. db and kernelTx are simply context variables set by Neo4j:

  @Context
  public GraphDatabaseAPI db;
  @Context
  public KernelTransaction kernelTx;
