Problem passing *args and **kwargs correctly through RPyC to the underlying module



Background

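I'm trying to implement the RPyC proof-of-concept example provided in the APScheduler GitHub repository so that I can deploy my app with gunicorn and multiple workers (the APScheduler FAQ points out this issue). I'm also trying to do this with Flask-APScheduler, so that I can easily work inside app_contexts from the task functions.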

Problem

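When calling the scheduler service exposed through RPyC, I can't seem to pass arguments correctly. More specifically, the trouble appears when passing the literal args and kwargs parameters, as well as the values collected in the exposed RPyC method's *args and **kwargs.

Basically, the arguments I would normally use when calling scheduler.add_job() directly don't work when the call is routed through RPyC. When the RPyC-exposed method tries to pass the arguments it received on to the underlying scheduler instance, I get the error shown below. How can I fix this?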
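To make the distinction concrete, here is a minimal pure-Python sketch with no RPyC involved; exposed_add_job_stub is a hypothetical stand-in for SchedulerService.exposed_add_job. It shows that the literal args= and kwargs= parameters do not land in the method's *args; they arrive as two entries of its **kwargs. The nested kwargs value is an ordinary dict, and RPyC passes dicts by reference (as a proxy) rather than by value.

```python
def exposed_add_job_stub(func, *args, **kwargs):
    # Hypothetical stand-in for SchedulerService.exposed_add_job:
    # it only reports what ended up in the variadic parameters.
    return func, args, kwargs


func, args, kwargs = exposed_add_job_stub(
    "app.tasks:run_report",
    args=("2",),
    kwargs={"email_results": True},
    trigger="cron",
    id="reconcile_accounts",
)

print(args)              # ()
print(sorted(kwargs))    # ['args', 'id', 'kwargs', 'trigger']
print(kwargs["kwargs"])  # {'email_results': True}
```

So the object that eventually reaches APScheduler's dict(kwargs) call is kwargs["kwargs"], the nested dict, not the method's own **kwargs.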

Minimal working example

Run python app.py in one terminal and python scheduler.py in another.

# app.py
from flask import Flask, current_app, jsonify
from flask_apscheduler import APScheduler
import rpyc

scheduler = APScheduler()


def create_app():
    app = Flask(__name__)
    app.scheduler = rpyc.connect("localhost", 12345)
    app.scheduler = app.scheduler.root  # just so current_app.scheduler can be used like normal

    @app.route("/add_job/<report_id>", methods=["GET"])
    def add_job(report_id):
        """
        This works as expected when using
            from app import scheduler
            scheduler.add_job(...)
        """
        current_app.scheduler.add_job(
            func="app.tasks:run_report",
            args=(report_id,),
            kwargs={"email_results": True},
            executor="threadpool",
            trigger="cron",
            day="*/1",
            id="reconcile_accounts"
        )
        return jsonify({"status": "scheduled"})

    return app


if __name__ == "__main__":
    app = create_app()
    app.run(debug=True)

# scheduler.py
from rpyc.utils.server import ThreadedServer
import rpyc

from app import create_app, scheduler


class SchedulerService(rpyc.Service):
    def __init__(self):
        self._app = None
        self._scheduler = None

    def on_connect(self, conn):
        # code that runs when a connection is created
        # (to init the service, if needed)
        self._app = create_app()
        self._scheduler = scheduler

    def exposed_add_job(self, func, *args, **kwargs):
        # Problem occurs below when sending *args and **kwargs to Flask-APScheduler, which sends them to APScheduler
        job_id = kwargs.pop("id", None)
        return self._scheduler.add_job(job_id, func, *args, **kwargs)


if __name__ == "__main__":
    server = ThreadedServer(SchedulerService, port=12345, protocol_config={"allow_public_attrs": True})
    try:
        server.start()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        scheduler.shutdown()

Traceback for self._scheduler.add_job(job_id, func, *args, **kwargs)

127.0.0.1 - - [08/Jul/2021 10:29:43] "GET /reports/2/run HTTP/1.1" 500 -
Traceback (most recent call last):
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 2088, in __call__
    return self.wsgi_app(environ, start_response)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 2073, in wsgi_app
    response = self.handle_exception(e)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 2070, in wsgi_app
    response = self.full_dispatch_request()
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 1515, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 1513, in full_dispatch_request
    rv = self.dispatch_request()
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 1499, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
  File "C:\Users\mhill\PycharmProjects\reporting\app\views.py", line 189, in run_report
    kwargs={"config": json.dumps(report.serialize())}
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\netref.py", line 240, in __call__
    return syncreq(_self, consts.HANDLE_CALL, args, kwargs)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\netref.py", line 63, in syncreq
    return conn.sync_request(handler, proxy, *args)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\protocol.py", line 473, in sync_request
    return self.async_request(handler, *args, timeout=timeout).value
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\async_.py", line 102, in value
    raise self._obj
_get_exception_class.<locals>.Derived: dictionary update sequence element #0 has length 6; 2 is required
========= Remote Traceback (1) =========
Traceback (most recent call last):
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\protocol.py", line 324, in _dispatch_request
    res = self._HANDLERS[handler](self, *args)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\protocol.py", line 592, in _handle_call
    return obj(*args, **dict(kwargs))
  File "C:/Users/mhill/PycharmProjects/reporting/scheduler.py", line 20, in exposed_add_job
    return self._scheduler.add_job(func, *args, **kwargs)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask_apscheduler\scheduler.py", line 168, in add_job
    return self._scheduler.add_job(**job_def)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\apscheduler\schedulers\base.py", line 429, in add_job
    'kwargs': dict(kwargs) if kwargs is not None else {},
ValueError: dictionary update sequence element #0 has length 6; 2 is required

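According to this rpyc issue on GitHub, the problem with mapping dicts can be solved by enabling allow_public_attrs on both the server and the client. By default, rpyc does not expose a dict's regular methods (such as keys) through its proxies, so a proxied dict cannot be used as a mapping: the kwargs dict sent from the client has no accessible dict methods. That matches the error. APScheduler calls dict(kwargs); since the keys attribute is not reachable on the proxy, dict() falls back to treating the object as a sequence of key/value pairs, iterates over it, and gets the first key, "config" (6 characters), where it expected a pair.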
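The ValueError itself can be reproduced without RPyC, which shows why a missing keys attribute matters: when dict() is given an object that has no keys attribute, it treats the object as a sequence of key/value pairs. In this sketch, KeylessMapping and KeyedMapping are hypothetical stand-ins. The first supports only iteration and indexing, roughly what a proxied dict offers when keys is not reachable; the second also provides keys, which is the kind of access allow_public_attrs opens up.

```python
class KeylessMapping:
    """Hypothetical stand-in for a proxied dict whose `keys` is not accessible."""

    def __init__(self, data):
        self._data = data

    def __iter__(self):
        # Iterating a dict yields its keys, so this yields "config".
        return iter(self._data)

    def __getitem__(self, key):
        return self._data[key]


class KeyedMapping(KeylessMapping):
    """Same object, but with `keys` reachable, so dict() treats it as a mapping."""

    def keys(self):
        return self._data.keys()


try:
    dict(KeylessMapping({"config": "placeholder"}))
    message = None
except ValueError as exc:
    # dict() tried to unpack the string "config" as a (key, value) pair.
    message = str(exc)

print(message)  # dictionary update sequence element #0 has length 6; 2 is required
print(dict(KeyedMapping({"config": "placeholder"})))  # {'config': 'placeholder'}
```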

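The dict lives in the Flask process, so it is the client's configuration that decides whether its attributes can be reached. Your server already sets this flag, so in your case you only need to change the client connection like this: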

app.scheduler = rpyc.connect("localhost", 12345, config={"allow_public_attrs": True})
