Arduino - Raspberry Pi,蓝牙连接使用D-BUS API



任务是基于python脚本,使用D-BUS API实现Arduino和Raspberry Pi之间通过蓝牙的自动配对和连接过程。

连接Arduino的蓝牙模块是:Grove - Serial Bluetooth v3.0。

我能够自动配对过程。配对脚本按顺序执行以下操作:

  1. 查找名为Slave的蓝牙模块通过创建适配器对象并使用StartDiscovery方法。(命名在Arduino中完成)。
  2. 注册蓝牙代理。
  3. 通过Pair创建设备对象和Pair如果设备尚未配对,则使用方法。

完成上述步骤的代码部分如下所示:

register_agent()
start_discovery() 
time.sleep(10) 
for i in range(len(address_list)):
new_dbus_device = get_device(address_list[i])
dev_path = new_dbus_device.object_path
device_properties = dbus.Interface(new_dbus_device, "org.freedesktop.DBus.Properties")
pair_status = device_properties.Get("org.bluez.Device1", "Paired")
if not pair_status:
new_dbus_device.Pair(reply_handler=pair_reply, error_handler=pair_error, timeout=60000)
register_agent()按要求执行:
def register_agent():    
agent = Agent(bus, path)
capability = "NoInputNoOutput"
obj = bus.get_object(BUS_NAME, "/org/bluez");
manager = dbus.Interface(obj, "org.bluez.AgentManager1")
manager.RegisterAgent(path, capability)

但是当我尝试调用Connect时方法在Bluez的device-api中记录,它总是失败。我已经创建了一个自定义的串行端口配置文件并尝试ConnectProfile方法,但再次失败。

如果我使用已弃用的rfcomm工具,则蓝牙通信可以正常工作。如果我使用python套接字模块,它也可以工作。但是,我希望避免使用rfcomm,因为它已被弃用。关于使用python套接字库,连接仅在rfcomm通道1中工作。,其他通道产生Connection Refused错误。

添加绝笔,以便更好地澄清问题:

import dbus
import dbus.service
import dbus.mainloop.glib
import sys
import time
import subprocess
from bluezutils import *
from bluetooth import *
from gi.repository import GObject, GLib
from dbus.mainloop.glib import DBusGMainLoop
DBusGMainLoop(set_as_default=True) 
path = "/test/agent"
AGENT_INTERFACE = 'org.bluez.Agent1'
BUS_NAME = 'org.bluez'
bus = dbus.SystemBus() 
device_obj = None
dev_path = None
def set_trusted(path2):
props = dbus.Interface(bus.get_object("org.bluez", path2),
"org.freedesktop.DBus.Properties")
props.Set("org.bluez.Device1", "Trusted", True)
class Rejected(dbus.DBusException):
_dbus_error_name = "org.bluez.Error.Rejected"

class Agent(dbus.service.Object):
exit_on_release = True
def set_exit_on_release(self, exit_on_release):
self.exit_on_release = exit_on_release
@dbus.service.method(AGENT_INTERFACE,
in_signature="", out_signature="")
def Release(self):
print("Release")
if self.exit_on_release:
mainloop.quit()
@dbus.service.method(AGENT_INTERFACE,
in_signature="os", out_signature="")
def AuthorizeService(self, device, uuid):
print("AuthorizeService (%s, %s)" % (device, uuid))
return 
@dbus.service.method(AGENT_INTERFACE,
in_signature="o", out_signature="s")
def RequestPinCode(self, device):
set_trusted(device)
return "0000" 
@dbus.service.method(AGENT_INTERFACE,
in_signature="o", out_signature="u")
def RequestPasskey(self, device): 
set_trusted(device)
return dbus.UInt32("0000") 
@dbus.service.method(AGENT_INTERFACE,
in_signature="ou", out_signature="")
def RequestConfirmation(self, device, passkey):
set_trusted(device)
return 

@dbus.service.method(AGENT_INTERFACE,
in_signature="o", out_signature="")
def RequestAuthorization(self, device):
return 
@dbus.service.method(AGENT_INTERFACE,
in_signature="", out_signature="")
def Cancel(self):
print("Cancel")
def pair_reply():
print("Device paired and trusted")
set_trusted(dev_path) 

def pair_error(error):
err_name = error.get_dbus_name()
if err_name == "org.freedesktop.DBus.Error.NoReply" and device_obj:
print("Timed out. Cancelling pairing")
device_obj.CancelPairing()
else:
print("Creating device failed: %s" % (error))
mainloop.quit() 

def register_agent():    
agent = Agent(bus, path)
capability = "NoInputNoOutput"
obj = bus.get_object(BUS_NAME, "/org/bluez");
manager = dbus.Interface(obj, "org.bluez.AgentManager1")
manager.RegisterAgent(path, capability)

def start_discovery():
global pi_adapter
pi_adapter = find_adapter() 
scan_filter = dict({"DuplicateData": False}) 
pi_adapter.SetDiscoveryFilter(scan_filter)
pi_adapter.StartDiscovery()

def stop_discovery():
pi_adapter.StopDiscovery()

def get_device(dev_str):
# use [Service] and [Object path]:
device_proxy_object = bus.get_object("org.bluez","/org/bluez/hci0/dev_"+dev_str)
# use [Interface]:
device1 = dbus.Interface(device_proxy_object,"org.bluez.Device1")
return device1
def char_changer(text):
text = text.replace(':', r'_')
return text
def slave_finder(device_name):

global sublist_normal
sublist_normal = []
sublist= []
address = []
edited_address = None
sub = subprocess.run(["hcitool scan"], text = True, shell = True, capture_output=True)
print(sub.stdout) #string type
sublist = sub.stdout.split()
for i in range(len(sublist)):
if sublist[i] == device_name:
print(sublist[i-1])
sublist_normal.append(sublist[i-1])
edited_address = char_changer(sublist[i-1])
address.append(edited_address)
return address

def remove_all_paired():
for i in range(len(sublist_normal)):
sub = subprocess.run(["bluetoothctl remove " + sublist_normal[i]], text = True, shell = True, capture_output=True)
time.sleep(1)


if __name__ == '__main__':


pair_status = None
address_list = slave_finder('Slave') #Arduino bluetooth module named as "Slave", here we are finding it.
time.sleep(2)
remove_all_paired() #rfcomm requires repairing after release
print(sublist_normal)
if address_list: 
register_agent()
start_discovery() 
time.sleep(10) 
for i in range(len(address_list)):
new_dbus_device = get_device(address_list[i])
dev_path = new_dbus_device.object_path
device_properties = dbus.Interface(new_dbus_device, "org.freedesktop.DBus.Properties")
pair_status = device_properties.Get("org.bluez.Device1", "Paired")
if not pair_status:
new_dbus_device.Pair(reply_handler=pair_reply, error_handler=pair_error, timeout=60000)


mainloop = GLib.MainLoop()
mainloop.run()

sudo btmon output:

Bluetooth monitor ver 5.50
= Note: Linux version 5.4.83-v7l+ (armv7l)                             0.832473
= Note: Bluetooth subsystem version 2.22                               0.832478
= New Index: DC:A6:32:58:FE:13 (Primary,UART,hci0)              [hci0] 0.832481
= Open Index: DC:A6:32:58:FE:13                                 [hci0] 0.832484
= Index Info: DC:A6:32:5.. (Cypress Semiconductor Corporation)  [hci0] 0.832487
@ MGMT Open: bluetoothd (privileged) version 1.14             {0x0001} 0.832490
@ MGMT Open: btmon (privileged) version 1.14                  {0x0002} 0.832540
所以问题是为什么连接ConnectProfile方法总是失败,我需要做些什么来建立基于Arduino和树莓派之间的D-BUS API的蓝牙通信?

我想你已经回答了自己的问题。Connect不能工作的原因是因为您没有在Raspberry Pi上运行串行端口配置文件(SPP)。如果您运行btmon,您将看到客户端断开连接,因为没有与Arduino提供的匹配的配置文件。

Python Socket连接中使用的端口号需要与Arduino蓝牙模块上的端口号匹配。这可能就是为什么只有1工作的原因。

作为参考,我用树莓派和HC-06测试了这个。我删除了所有的扫描代码,试图得到最小的可运行代码。下面是我使用的代码:

import socket
from time import sleep
import dbus
import dbus.service
import dbus.mainloop.glib
from gi.repository import GLib
BUS_NAME = 'org.bluez'
AGENT_IFACE = 'org.bluez.Agent1'
AGNT_MNGR_IFACE = 'org.bluez.AgentManager1'
ADAPTER_IFACE = 'org.bluez.Adapter1'
AGENT_PATH = '/ukBaz/bluezero/agent'
AGNT_MNGR_PATH = '/org/bluez'
DEVICE_IFACE = 'org.bluez.Device1'
CAPABILITY = 'KeyboardDisplay'
my_adapter_address = '11:22:33:44:55:66'
my_device_path = '/org/bluez/hci0/dev_00_00_12_34_56_78'
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()

class Agent(dbus.service.Object):
@dbus.service.method(AGENT_IFACE,
in_signature='o', out_signature='s')
def RequestPinCode(self, device):
print(f'RequestPinCode {device}')
return '0000'

class Device:
def __init__(self, device_path):
dev_obj = bus.get_object(BUS_NAME, device_path)
self.methods = dbus.Interface(dev_obj, DEVICE_IFACE)
self.props = dbus.Interface(dev_obj, dbus.PROPERTIES_IFACE)
self._port = 1
self._client_sock = socket.socket(socket.AF_BLUETOOTH,
socket.SOCK_STREAM,
socket.BTPROTO_RFCOMM)
def connect(self):
# self.methods.Connect()
self._client_sock.bind((my_adapter_address, self._port))
self._client_sock.connect((self.address, self._port))
def disconnect(self):
self.methods.Disconnect()
def pair(self):
self.methods.Pair(reply_handler=self._pair_reply,
error_handler=self._pair_error)
def _pair_reply(self):
print(f'Device trusted={self.trusted}, connected={self.connected}')
self.trusted = True
print(f'Device trusted={self.trusted}, connected={self.connected}')
while self.connected:
sleep(0.5)
self.connect()
print('Successfully paired and connected')
def _pair_error(self, error):
err_name = error.get_dbus_name()
print(f'Creating device failed: {err_name}')
@property
def trusted(self):
return bool(self.props.Get(DEVICE_IFACE, 'Trusted'))
@trusted.setter
def trusted(self, value):
self.props.Set(DEVICE_IFACE, 'Trusted', bool(value))
@property
def paired(self):
return bool(self.props.Get(DEVICE_IFACE, 'Paired'))
@property
def connected(self):
return bool(self.props.Get(DEVICE_IFACE, 'Connected'))
@property
def address(self):
return str(self.props.Get(DEVICE_IFACE, 'Address'))

if __name__ == '__main__':
agent = Agent(bus, AGENT_PATH)
agnt_mngr = dbus.Interface(bus.get_object(BUS_NAME, AGNT_MNGR_PATH),
AGNT_MNGR_IFACE)
agnt_mngr.RegisterAgent(AGENT_PATH, CAPABILITY)
device = Device(my_device_path)
if device.paired:
device.connect()
else:
device.pair()

mainloop = GLib.MainLoop()
try:
mainloop.run()
except KeyboardInterrupt:
agnt_mngr.UnregisterAgent(AGENT_PATH)
mainloop.quit()

相关内容

  • 没有找到相关文章

最新更新