pyzmq REQ/REP with asyncio await for variable



我第一次在python中使用asyncio,并尝试将其与ZMQ结合使用。

基本上我的问题是我有一个 REP/REQ 系统,在一个具有我需要等待的功能的async def中。 如何不更新值。 下面是一段代码来说明这一点:

#Declaring the zmq context
context = zmq_asyncio.Context()
REP_server_django = context.socket(zmq.REP)
REP_server_django.bind("tcp://*:5558")

我将此对象发送到一个类并在此函数中将其取回

async def readsonar(self, trigger_pin, REP_server_django):
i= 0
while True:
ping_from_view = await REP_server_django.recv()  # line.1
value = await self.board.sonar_read(trigger_pin) # line.2
print(value)                                     # line.3
json_data = json.dumps(value)                    # line.4
#json_data = json.dumps(i)                       # line.4bis
REP_server_django.send(json_data.encode())       # line.5
i+=1                                             # line.6
await asyncio.sleep(1/1000)                      # line.7

sonar_read,是用pymata_express读取超声波传感器。如果我line.2发表评论并line.4我会得到正确的i值。如果我line.1评论并line.5print(value)会从sonar_read打印正确的值.但是,当我如图所示运行它时,value没有更新。

我错过了什么吗?

编辑

:编辑
了有关行注释的类型。我的意思是,如果我只读取声纳并打印值。它工作正常。如果我只.recv().send(json.dumps(i).encode()),它就可以了。但是如果我尝试从声纳发送值。它锁定到未更新的给定value


编辑2:(回答Alan Yorinks):这是MWE,它考虑了您发送的有关班级zmq声明的内容。它取自pymata_express示例concurrent_tasks.py

若要重现此错误,请在两个不同的终端中运行这两个脚本。您将需要一个安装了Frimata_express的 arduino 板。如果一切顺利,PART A.应该只在mve_req.py端吐出相同的值。您可以编辑不同的块(PART A、B 或 C)以查看行为。

mve_rep.py

#ADAPTED FROM PYMATA EXPRESS EXAMPLE CONCURRENTTAKS
#https://github.com/MrYsLab/pymata-express/blob/master/examples/concurrent_tasks.py
import asyncio
import zmq
import json
import zmq.asyncio as zmq_asyncio
from pymata_express.pymata_express import PymataExpress

class ConcurrentTasks:
def __init__(self, board):

self.loop = board.get_event_loop()
self.board = board
self.ctxsync = zmq.Context()
self.context = zmq.asyncio.Context()
self.rep = self.context.socket(zmq.REP)
self.rep.bind("tcp://*:5558")
self.trigger_pin = 53
self.echo_pin = 51
loop.run_until_complete(self.async_init_and_run())
async def readsonar(self):
i = 0
while True:

#PART. A. WHAT I HOPE COULD WORK
rep_recv = await self.rep.recv()                       # line.1
value = await self.board.sonar_read(self.trigger_pin)  # line.2
print(value)                                           # line.3
json_data = json.dumps(value)                          # line.4
# json_data = json.dumps(i)                            # line.4bis
await self.rep.send(json_data.encode())                # line.5
i += 1                                                 # line.6
await asyncio.sleep(1 / 1000)                          # line.7

'''
#PART. B. WORKS FINE IN UPDATING THE SONAR_RAED VALUE AND PRINTING IT
value = await self.board.sonar_read(self.trigger_pin)  # line.2
print(value)                                           # line.3
json_data = json.dumps(value)                          # line.4
i += 1                                                 # line.6
await asyncio.sleep(1 / 1000)                          # line.7
'''
'''
#PART. C. WORKS FINE IN SENDING THE i VALUE OVER ZMQ
rep_recv = await self.rep.recv()                       # line.1
json_data = json.dumps(i)                              # line.4bis
await self.rep.send(json_data.encode())                # line.5
i += 1                                                 # line.6
await asyncio.sleep(1 / 1000)                          # line.7
'''

async def async_init_and_run(self):
await self.board.set_pin_mode_sonar(self.trigger_pin, self.echo_pin)
readsonar = asyncio.create_task(self.readsonar())
await readsonar
# OTHER CREATED_TASK GO HERE, (removed them in the MVE, but they work fine)

if __name__ == "__main__":
loop = asyncio.get_event_loop()
my_board = PymataExpress()
try:
ConcurrentTasks(my_board)
except (KeyboardInterrupt, RuntimeError):
loop.run_until_complete(my_board.shutdown())
print('goodbye')
finally:
loop.close()

mve_req.py

import zmq
import time
import json
def start_zmq():
context = zmq.Context()
REQ_django  = context.socket(zmq.REQ)
REQ_django.connect("tcp://localhost:5558")
return REQ_django, context
def get_sonar(REQ_django):
REQ_django.send(b"server_django")
ping_from_server_django = REQ_django.recv()
return ping_from_server_django.decode()
if __name__ == '__main__':
data = {"sensors":{}}
REQ_django, context = start_zmq()
while REQ_django:
data['sensors']['sonar'] = get_sonar(REQ_django)
json_data = json.dumps(data)
print(data)
#DO OTHER WORK
time.sleep(1)
REQ_django.close()
context.term()

完全公开,我是pymata-express和python-banyan的作者。OP 要求我发布此解决方案,因此这并不意味着这是一个无耻的插件。

自从 asyncio 首次在 Python 3 中引入以来,我一直在使用它进行开发。当异步代码工作时,异步(恕我直言)可以简化并发和代码。但是,当出现问题时,调试和了解问题的原因可能会令人沮丧。

我提前道歉,因为这可能有点长,但我需要提供一些背景信息,以便该示例不会看起来像一些随机的代码。

python-banyan 框架的开发是为了提供线程、多处理和异步的替代方案。简而言之,Banyan 应用程序由小型目标可执行文件组成,这些可执行文件使用通过 LAN 共享的协议消息相互通信。它的核心是使用Zeromq。它不是为了让流量通过广域网移动,而是使用LAN作为"软件背板"。在某些方面,Banyan 类似于 MQTT,但在 LAN 中使用时要快得多。如果需要,它确实能够连接到MQTT网络。

榕树的一部分是一个叫做OneGPIO的概念。它是一种协议消息传递规范,它将 GPIO 功能抽象为独立于任何硬件实现。为了实现硬件细节,开发了专门的 Banyan 组件,称为 Banyan 硬件网关。有可用于Raspberry Pi,Arduino,ESP-8266和Adafruit Crickit Hat的网关。GPIO 应用程序发布任何或所有网关可以选择接收的通用 OneGPIO 消息。要从一个硬件平台移动到另一个硬件平台,将启动与硬件关联的网关,并且无需修改,即可启动控制组件(如下所示的代码)。要从一个硬件平台转到另一个硬件平台,无需对任何组件进行代码修改,也不会修改控制组件和网关。启动控制组件时,可以通过命令行选项指定变量,例如引脚编号。对于Arduino网关,pymata-express用于控制Arduino的GPIO。Pymata-express是StandardFirmata客户端的异步实现。需要注意的是,下面的代码不是异步的。Banyan 框架允许人们使用适合问题的工具进行开发,但允许解耦解决方案的各个部分,在这种情况下,该应用程序允许将 asyncio 与非 asyncio 混合在一起,而不会遇到任何通常遇到的麻烦这样做。

在提供的代码中,类定义下的所有代码都用于提供对命令行配置选项的支持。

import argparse
import signal
import sys
import threading
import time
from python_banyan.banyan_base import BanyanBase

class HCSR04(BanyanBase, threading.Thread):
def __init__(self, **kwargs):
"""
kwargs contains the following parameters
:param back_plane_ip_address: If none, the local IP address is used
:param process_name: HCSR04
:param publisher_port: publishing port
:param subscriber_port: subscriber port
:param loop_time: receive loop idle time
:param trigger_pin: GPIO trigger pin number
:param echo_pin: GPIO echo pin number
"""
self.back_plane_ip_address = kwargs['back_plane_ip_address'],
self.process_name = kwargs['process_name']
self.publisher_port = kwargs['publisher_port']
self.subscriber_port = kwargs['subscriber_port'],
self.loop_time = kwargs['loop_time']
self.trigger_pin = kwargs['trigger_pin']
self.echo_pin = kwargs['echo_pin']
self.poll_interval = kwargs['poll_interval']
self.last_distance_value = 0
# initialize the base class
super(HCSR04, self).__init__(back_plane_ip_address=kwargs['back_plane_ip_address'],
subscriber_port=kwargs['subscriber_port'],
publisher_port=kwargs['publisher_port'],
process_name=kwargs['process_name'],
loop_time=kwargs['loop_time'])
threading.Thread.__init__(self)
self.daemon = True
self.lock = threading.Lock()
# subscribe to receive messages from arduino gateway
self.set_subscriber_topic('from_arduino_gateway')
# enable hc-sr04 in arduino gateway
payload = {'command': 'set_mode_sonar', 'trigger_pin': self.trigger_pin,
'echo_pin': self.echo_pin}
self.publish_payload(payload, 'to_arduino_gateway')
# start the thread
self.start()
try:
self.receive_loop()
except KeyboardInterrupt:
self.clean_up()
sys.exit(0)
def incoming_message_processing(self, topic, payload):
print(topic, payload)
with self.lock:
self.last_distance_value = payload['value']
def run(self):
while True:
with self.lock:
distance = self.last_distance_value
payload = {'distance': distance}
topic = 'distance_poll'
self.publish_payload(payload, topic)
time.sleep(self.poll_interval)

def hcsr04():
parser = argparse.ArgumentParser()
# allow user to bypass the IP address auto-discovery.
# This is necessary if the component resides on a computer
# other than the computing running the backplane.
parser.add_argument("-b", dest="back_plane_ip_address", default="None",
help="None or IP address used by Back Plane")
parser.add_argument("-i", dest="poll_interval", default=1.0,
help="Distance polling interval")
parser.add_argument("-n", dest="process_name", default="HC-SRO4 Demo",
help="Set process name in banner")
parser.add_argument("-p", dest="publisher_port", default="43124",
help="Publisher IP port")
parser.add_argument("-s", dest="subscriber_port", default="43125",
help="Subscriber IP port")
parser.add_argument("-t", dest="loop_time", default=".1",
help="Event Loop Timer in seconds")
parser.add_argument("-x", dest="trigger_pin", default="12",
help="Trigger GPIO pin number")
parser.add_argument("-y", dest="echo_pin", default="13",
help="Echo GPIO pin number")
args = parser.parse_args()
if args.back_plane_ip_address == 'None':
args.back_plane_ip_address = None
kw_options = {'back_plane_ip_address': args.back_plane_ip_address,
'publisher_port': args.publisher_port,
'subscriber_port': args.subscriber_port,
'process_name': args.process_name,
'loop_time': float(args.loop_time),
'trigger_pin': int(args.trigger_pin),
'echo_pin': int(args.echo_pin),
'poll_interval': int(args.poll_interval)
}
# replace with the name of your class
HCSR04(**kw_options)

# signal handler function called when Control-C occurs
def signal_handler(sig, frame):
print('Exiting Through Signal Handler')
raise KeyboardInterrupt

# listen for SIGINT
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if __name__ == '__main__':
hcsr04()

我不确定这是否会解决您的问题,但我确实发现了一些潜在的问题。

  1. 目前尚不清楚如何调用读声纳。
  2. 上下文的创建有一个错别字。
  3. REP_server_django.发送不等待。

以下是我对代码的返工(未经测试):

import asyncio
import zmq
import json

class Play:
def __init__(self):
self.context = zmq.asyncio.Context()
self.REP_server_django = self.context.socket(zmq.REP)
self.REP_server_django.bind("tcp://*:5558")
self.event_loop = asyncio.get_event_loop()
self.event_loop.run_until_complete(self.readsonar(4))
async def readsonar(self, trigger_pin):
i = 0
while True:
ping_from_view = await self.REP_server_django.recv()  # l.1
value = await self.board.sonar_read(trigger_pin)  # l.2
print(value)  # l.3
json_data = json.dumps(value)  # l.4
# json_data = json.dumps(i) # l.4bis
await self.REP_server_django.send(json_data.encode())  # l.5
i += 1  # l.6
await asyncio.sleep(1 / 1000)  # l.6

( O/P MCVE-问题定义进一步蔓延 - 然而,协调问题,无论是否优先,{传感器|参与者}-控制系统,使用分布式自主代理设计的系统在专业上就越复杂,很容易制造有缺陷的"快捷方式"或进入系统范围的阻塞状态

最好至少先阅读这个关于ZeroMQ 在不到五秒的时间内的层次结构和这个关于相互死锁阻塞

阅读神话般的Pieter HINTJENS的书"Code Connected: Volume 1"对任何系统设计人员来说都是巨大的价值)

"...接缝非常有趣,因为它已经实现了异步,所以我可以像我一样添加异步 zmq。我错了吗?

是的,没有">只需添加异步"的快捷方式,控制系统是非常有趣的学科,而是一门复杂的学科。总是。很抱歉不得不直截了当地听到。一些复杂性可能会在教科书示例或琐碎的创客项目中对用户隐藏。然后,锤子开始尝试扩展它们的步骤,只需添加一个或几个更琐碎的功能。复杂性突然浮出水面,这是以前从未见过的。


O/P 多代理[A,B,C,D]系统代码(按原样)的正式映射

将正式映射放在全屏编辑器上,以便查看所有相互冲突的依赖关系和竞争的控制循环的更大图景。延迟是容易的部分。无法解决的死锁阻塞的几个风险是核心问题。ZeroMQ,由于 v2.x 具有避免其中一些的工具,软件设计师有责任适当地缓解所有其他问题。控制系统(机器人或其他)必须证明这种鲁棒性和对错误的弹性,并安全地"生存"所有"外部"事故。

最好的起点是旧的黄金法则,如汇编语言指令在第 1 行中表达的那样:

;ASSUME NOTHING

并努力精心设计其余的。


multi-agent-[A,B,C,D]-system coordination
| | | |
+-|-|-|--------------------- python while   ~ 100 [ms] GIL-lock enforced quota for pure-[SERIAL]-ised code-execution, imposed on all python-threads ( be it voluntarily or involuntarily interruped by the python GIL-lock mechanics, O/S-specific )
+-|-|--------------------- hardware ~  64 - 147 [ms] self.board proxy-driven, responding to python code
+-|--------------------- python asynchronous, strict sequence of remote/local events dependent ZeroMQ dFSA, distributed among local-code operated REP and remote-code operated REQ-side(s) - enforcing a mutually ordered sequence of distributed behaviour as REQ/REP Scalable Formal Communication Archetype Pattern defines
+--------------------- python asyncio.get_event_loop() instantiated another event-loop that may permit to defer an execution(s) of some parts of otherwise imperative python-code to some later time
multi-agent-[A,B,C,D]-system code (as-is)
: : : :
: : : +---------------------------------------------------------+
: : +-----------------------------------------------------------:-------------------+ - - - - - - - - - - - - - - - - -<?network?>- - - - - - - - - - - - - - +
: +-------------------------------------------------------------:----------+        :                                                                         :
:                                                               :          :        :                                                                         :
:                                                               :          :        :                                                                         :
!                                                               :          :        :                                                                         :
____PYTHON___!                                                               :          :        :                                                                         :
!                                                               ?          ?        ?                                                                         ?
+->!                                                              D?         B?       C?REP-1:{0:N}-remote---------------<?network?>------------------------REQ.C? dFSA-state?dependent
^  !                                                              D?         B?       C?REP-1:{0:N}                                                            .C?
^ A!: IMPERATIVE LOOP-HEAD: while True:                           D?AWAIT    B?       C?REP-1:{0:N}-distributed-Finite-State-Automaton (dFSA) BEHAVIOUR, local .C? side depends also on EVOLUTION OF A FUZZY, DYNAMIC, MULTIPARTY, network-wide dFSA-STATE(s) inside such ECOSYSTEM
^  !                                                              D?         B?       C?                                                                        
^  !                                                              D?         B?       C?                    REQ.C?-distributed-Finite-State-Automaton-STATE-REP.C?
^  !                                                              D?         B?       C?                       vC?                                             ^C?
^  !_______.SET DEFERRED:         P_D?C?_deferred_yield_ping     =D?await ...         C?REP.recv()---<--?---?--vC?-----<--<network>--------<--?remote-REQ-state-C?-( ^C?-dFSA-state && C?.recv()-blocking-mode of REQ/REP .recv()-waiting till a message, if any arrives error-free, blocks till then, just deferred via D?await )
^  !                                                              D?         B?                                vC?                                             ^C?
^  !_______.SET DEFERRED:         S_D?B?_deferred_yield_sonar    =D?await ...B?.board.sonar_read()-o-<--?-+    vC?                                             ^C?
^  !                                                                                               :      |    vC?                                             ^C?
^  !_______.GUI OUTPUT:           print( deferred_yield_sonar )  #A!->-----------------------------+->----?->---:?--->[ a last-known (if any) S_D?B?_deferred_yield_sonar value put "now" on GUI-screen ]
^  !                                                                                               :      ^    vC?                                             ^C?
^  !_______.SET TRANSFORMED:      S_D?B?_dependent_tranformed    =A!json.dumps( S_D?B? )--<--<--<--+      |    vC? <--[ a last-known (if any) S_D?B?_deferred_yield_sonar value transformed and assigned]
^  !                                                                                               :      |    vC?                                             ^C?
^  !_______.BLOCKING-MODE-SEND()  REP.send( S_D?B?_dependent_transformed.encode() )  #C? .send( S_D?B? )--?---->C?-->----<?network?>-->-------?remote-REQ-state-C?-( +C?-indeterministic and blocking-mode of REQ/REP .recv()-waiting till a message, if any arrives error-free, blocks till then )
^  !X:C?                                                                                                  ^    vC?                                             ^C?
^  !X:C?___.SET IMPERATIVE:       i += 1                                                                  | REQ.C?-distributed-Finite-State-Automaton-STATE-REP.C?
^  !X:C?                                                                                                  ?                                                       
^  !X:C?___.NOP/SLEEP() DEFERRED: await sleep( ... )             #D?AWAIT                                 ^                                                      :
^  !X:C?D?+0ms                                                                                            |                                                      :
^  !X:C?D?_.JUMP/LOOP                                                                                     ?                                                      :
^__!X:C?D?+0ms                                                                                            ^                                                      :
                                              |                                                      :
                                              |                                                      :
                                              |                                                      :
____SONAR___________________________________________________________________________________________________________B? REQUEST T0: + EXPECT ~64 - ~147 [ms] LATENCY        :
                                              B? hardware value acquisition latency can be masked    :
                                                 via await or other concurrency-trick )              :
                                                                                                     :
____REQ-side(s)_?{0:N} __________________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>
_____REQ-side(s)_?{0:N} _________________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>
______REQ-side(s)_?{0:N} ________________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>
_______REQ-side(s)_?{0:N} _______________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>
...                                                                                                                                                                 ::: ...
______...REQ-side(s)_?{0:N} _____________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>

正如 O/P 的EDIT :在 2 小时前解释的那样,

问题现在很明显。无限while True:循环指示硬步进,逐行,并再次循环"旋转"所有步骤,一个接一个,而存在的任何asyncioawait装饰函子都被保留下来,异步独立于命令式代码执行的"主">A:while True:循环块。同样,B:self.board设备的外部声纳设备是一个独立的定时设备,在python代码外部,具有一些无法管理的硬件/读取/解码延迟,固定循环的协调+C:ZeroMQ-REQ/REP-原型行为(再次与分散的"外国"REQ进行外部协调 -演员/代理人 - 是的,你无法知道,有多少这样的...... - 但所有这些都超出了您的控制范围,并且都REQ端和本地实例化的REP端分布式有限状态机状态完全独立于"框架"python循环推动步骤前进并执行下一步,下一步,下一步......+ 另一个,这里D:asyncio.get_event_loop()实例化的"第三"event_loop,这会影响await修饰的函子实际上如何推迟以产生结果并在稍后的时间-----交付它们,这就是"交叉面包"的问题-event_loops。

如果这个问题设置是由任何计算机科学教授阐述的,她/他应该起立鼓掌,使这项任务成为分布式系统问题的最佳例子 - 几乎可以作为对玛格丽特·汉密尔顿夫人在正确设计阿波罗AGC计算机系统方面的工作的致敬,她的工作解决了这类问题,从而挽救了船员的生命和登月的所有骄傲, 就在 50 年前。伟大的讲座,汉密尔顿夫人,伟大的讲座。

琐碎,却恰到好处。

确实是一个可爱且科学上奇妙的任务:

设计一个策略,用于一组独立定时和操作的代理的健壮、故障弹性和协调工作,[A, B, C, D],A是一种命令式解释的 python 语言,主要是让 GIL-lock 防止零并发,而是一个纯粹的[SERIAL]流程流,C一组模糊的半持久网络分布式REQ/REP代理,B是一个独立操作的硬件设备,与A可检查的self.board代理有一些有限的I/O接口,并且所有这些都相互独立,并在给定的软件,硬件和网络中物理分布。

硬件诊断+建议的系统架构方法昨天已经提出。如果不测试self.board托管的声纳设备延迟,没有人可以决定下一步的最佳步骤,因为现实的(体内基准测试)硬件响应时间(+最好的还有.board的文档,它的外围传感器设备是否复用?PRIO 驱动或互斥锁或静态、非共享外围设备、寄存器只读抽象、... ?)是决定可能的[A, B, C, D]协调策略的主要因素。


ZeroMQ 部分:

如果您l.5-REP_server_django.send(json_data.encode()) # l.5发表评论,您将进入最终块,因为REQ/REPZeroMQ 可扩展形式通信原型模式的原始严格形式无法再次.recv(),如果它在此之前没有回复REQ端,则在收到第一个.recv()后已.send()

。这是一个简单的问题。


其余的不是可重现的代码。

您可能希望:

  • 验证self.board.sonar_read( trigger_pin )是否收到任何值并测试执行此操作的延迟:

import numpy as np
from zmq import Stopwatch
aClk = Stopwatch()
def sonarBeep():
try:
a_value   = -1
aClk.start()
a_value   = self.board.sonar_read( trigger_pin )
a_time_us = aClk.stop()
except:
try:
aClk.stop()
finally:
a_time_us = -1
finally:
return( a_value, a_time_us )

并运行一系列 100 个声纳测试,以获得有关延迟时间的最小、平均、StDev、MAX读数,所有这些都在[us]因为这些值是主要的,以防某些控制回路被设计为声纳传感器数据。

[ aFun( [ sonarBeep()[1] for _    in range( 100 ) ]
)                for aFun in ( np.min, np.mean, np.std, np.max )
]
系统

架构和子系统协调:

最后但并非最不重要的一点是,可以让在绝对独立的事件循环中读取和存储声纳数据,不与任何其他操作协调,而只是从这样的存储中读取状态变量,被设置在一个独立工作的子系统中(如果不是非常节省电力作为独立的系统行为)

每当人们试图紧密协调独立事件流时(在具有不协调或弱协调代理的分布式系统中最糟糕),设计必须在对错误的鲁棒性和时间错位以及错误恢复能力方面增长。否则,系统可能很快就会在瞬间死锁/活锁。

如果有疑问,可以学习施乐帕洛阿尔托研究中心MVC分离的原始哲学,其中MODEL部分可以(并且在GUI框架中大部分时间,因为198x+确实)接收许多独立于其他系统组件更新的状态变量,这些状态变量只是在需要时读取/使用实际状态变量的数据。同样,如果功率预算允许,SONAR可以连续扫描场景并将读数写入任何本地寄存器,并让其他组件来询问或获得最后一个实际SONAR读数的请求。

ZeroMQ zen-of-zero工作也是如此。

如果这可能有帮助,请检查本地端消息存储的zmq.CONFLATE模式是否以这种方式工作。

一个小注意事项:人们可能已经注意到,sleep( 1 / 1000 )是一个非常昂贵、重复执行的步骤,而且很危险,因为它实际上在 py2.x 中不休眠,因为整数除法。

我让它工作,虽然我不得不承认,我不明白它为什么工作的原因。基本上,我必须创建一个新async def它只轮询sonar_read的读数并使用asyncio.wait返回值。代码如下:

#ADAPTED FROM PYMATA EXPRESS EXAMPLE CONCURRENTTAKS
#https://github.com/MrYsLab/pymata-express/blob/master/examples/concurrent_tasks.py
import asyncio
import zmq
import json
import zmq.asyncio as zmq_asyncio
from pymata_express.pymata_express import PymataExpress

class ConcurrentTasks:
def __init__(self, board):

self.loop = board.get_event_loop()
self.board = board
self.ctxsync = zmq.Context()
self.context = zmq.asyncio.Context()
self.rep = self.context.socket(zmq.REP)
self.rep.bind("tcp://*:5558")
self.trigger_pin = 53
self.echo_pin = 51
loop.run_until_complete(self.async_init_and_run())
### START:  NEW CODE THAT RESOLVED THE ISSUE
async def pingsonar(self):
value = await self.board.sonar_read(self.trigger_pin)
return value
async def readsonar(self):
while True:
rep_recv = await self.rep.recv() 
value = await asyncio.wait([self.pingsonar()])
valuesonar = list(value[0])[0].result()
json_data = json.dumps(valuesonar) 
await self.rep.send(json_data.encode()) 
await asyncio.sleep(1 / 1000) #maybe this line isn't necessary
### END : NEW CODE THAT RESOLVED THE ISSUE
async def async_init_and_run(self):
await self.board.set_pin_mode_sonar(self.trigger_pin, self.echo_pin)
readsonar = asyncio.create_task(self.readsonar())
await readsonar
# OTHER CREATED_TASK GO HERE, (removed them in the MVE, but they work fine)

if __name__ == "__main__":
loop = asyncio.get_event_loop()
my_board = PymataExpress()
try:
ConcurrentTasks(my_board)
except (KeyboardInterrupt, RuntimeError):
loop.run_until_complete(my_board.shutdown())
print('goodbye')
finally:
loop.close()

尽管如此,我还是感谢您的帮助。

最新更新