PY4J可从Java Runnable回调



我目前正在尝试使用py4j:

进行以下操作
  • 在python中定义一种调用JVM方法的方法(" executor")
  • 定义python("回调")对象实现JVM接口
  • 给定此回调对象构建一个JVM对象
  • 调用此对象上的方法,该方法将在Java中产生新线程,在回调对象上调用回调,该回调将(在Python侧)执行" executor"方法

这是我在爪哇一侧所拥有的:

package javabridge.test;
public interface PythonCallback {
    Object notify(Object source);
}


package javabridge.test;
public class ScheduledRunnable implements Runnable {
    private PythonCallback callback;
    public ScheduledRunnable(PythonCallback callback) {
        this.callback = callback;
    }
    @Override
    public void run() {
        System.out.println("[ScheduledRunnable] run -> notify");
        this.callback.notify(this);
    }
}


package javabridge.test;
import py4j.GatewayServer;
public class Test {
    private PythonCallback callback;
    public Test(PythonCallback callback) {
        this.callback = callback;
    }
    public void runSynchronous() {
        System.out.println("[runSynchronous] run -> notify");
        this.callback.notify(this);
    }
    public void runAsynchronous() {
        System.out.println("[runAsynchronous] run -> spawn thread");
        ScheduledRunnable runnable = new ScheduledRunnable(callback);
        Thread t = new Thread(runnable);
        t.start();
    }
    public static void main(String[] args) {
        GatewayServer server = new GatewayServer();
        server.start(true);
    }   
}

在Python方面,我有以下脚本:

from py4j.java_gateway import JavaGateway, java_import, get_field, CallbackServerParameters
from py4j.clientserver import ClientServer, JavaParameters, PythonParameters
gateway = JavaGateway(callback_server_parameters=CallbackServerParameters())
#gateway = ClientServer(java_parameters=JavaParameters(), python_parameters=PythonParameters())
java_import(gateway.jvm, 'javabridge.test.*')
class PythonCallbackImpl(object):
    def __init__(self, execfunc):
        self.execfunc = execfunc
    def notify(self, obj):
        print('[PythonCallbackImpl] notified from Java')
        self.execfunc()
        return 'dummy return value'
    class Java:
        implements = ["javabridge.test.PythonCallback"]
def simple_fun():
    print('[simple_fun] called')
    gateway.jvm.System.out.println("[simple_fun] Hello from python!")
# Test 1: Without threading
input('Ready to begin test 1')
python_callback = PythonCallbackImpl(simple_fun)
nothread_executor = gateway.jvm.Test(python_callback)
nothread_executor.runSynchronous()
# Test 2: With threading
input('Ready to begin test 2')
python_callback = PythonCallbackImpl(simple_fun)
nothread_executor = gateway.jvm.Test(python_callback)
nothread_executor.runAsynchronous()
gateway.shutdown()

这是试图执行此脚本的情况。首先,使用gateway = ClientServer(java_parameters=JavaParameters(), python_parameters=PythonParameters()),两个测试失败:

Test 1:
py4j.protocol.Py4JJavaError: An error occurred while calling o0.runSynchronous.
: py4j.Py4JException: Command Part is Empty or is the End of Command Part
        at py4j.Protocol.getObject(Protocol.java:277)
        at py4j.Protocol.getReturnValue(Protocol.java:458)
Test 2:
Exception in thread "Thread-4" py4j.Py4JException: Error while obtaining a new communication channel
        at py4j.CallbackClient.getConnectionLock(CallbackClient.java:218)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:337)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:316)

但是,如果我评论self.execfunc()行,则测试1确实有效而不会增加错误。测试2仍然失败:

Exception in thread "Thread-5" py4j.Py4JException: Error while sending a command.
        at py4j.CallbackClient.sendCommand(CallbackClient.java:357)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:316)

现在切换到gateway = JavaGateway(callback_server_parameters=CallbackServerParameters())。当我保留self.execfunc()评论时,测试2仍然失败:

Exception in thread "Thread-5" py4j.Py4JException: Error while sending a command.
        at py4j.CallbackClient.sendCommand(CallbackClient.java:357)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:316)

但至少测试1确实可以与启用self.execfunc()一起使用。

我的问题是:如何将螺纹方法与self.execfunc()调用使用?PY4J?

是否可以

编辑:为了使事情更加棘手, self.execfunc()调用的java命令应在调用.notify()的同一Java线程中运行。

已解决。原来很简单:

  1. 在Python侧和Java侧也使用Cliendererver!
  2. 请勿致电Gateway.shutdown(),因为这将在接收回调之前断开python(du!)
然后,Java将整齐地遵守预期线程模型,即接收python回调调用的Java命令在执行回调的同一Java线程中执行。

通过简单的Python函数,可以添加shutdown_when_done方法,该方法等待所有回调在退出之前返回。

最新更新