我找到了一种在Python中使用游戏手柄的方法,使用模块Evdev(见最后的链接)。在本教程中,作者只使用一个游戏手柄,但他也指出,应该可以使用多个游戏手柄,其代码基于以下内容:
from evdev import InputDevice
from select import select
gamepad = InputDevice('/dev/input/event0')
while True:
r,w,x = select([gamepad], [], [])
for event in gamepad.read():
print(event)
Select.select 似乎会等到按下按钮,以便程序中断,直到发生这种情况。如何修改代码以使用多个游戏手柄或在等待按钮输入时执行其他代码?或者有没有更好的选择来使用 evdev 来解决这个问题?
http://ericgoebelbecker.com/2015/06/raspberry-pi-and-gamepad-programming-part-1-reading-the-device/
如何修改代码以使用多个游戏手柄或在等待按钮输入时执行其他代码?
查看文档以了解InputDevice.read
read()
从设备读取多个输入事件。返回生成InputEvent
实例的生成器对象。如果当前没有可用事件,则引发BlockingIOError
。
选择将阻止,直到输入事件可用。相反,我们可以读取事件,直到我们得到BlockingIOError。然后继续下一个游戏手柄,或执行需要在主循环中完成的任何其他工作。
您也可以考虑使用InputDevice.read_one
read_one()
读取并返回单个输入事件作为InputEvent
的实例。如果没有挂起的输入事件,则返回
None
。
我也有这个需求(我有两个BT游戏手柄)。因此,我编写了以下代码:
- 创建一个线程 (
threading.Thread()
),每秒检查它们是否连接到预设的事件路径/dev/input/event*
(16 和 20 是默认值,可以更改)。如果找不到它们,它会尝试下一个事件(例如:17/21)。如果它找到一个或两个,它会连接它/它们。如果没有,它会每秒继续检查。 - 为了连接多个游戏手柄,它使用
selector
如文档所述。
你可以在这里看到我的代码,但我建议你去我的Github进行更新。
点击展开#!/usr/bin/env python3
import time, argparse, subprocess
try:
from obs_api import client, consola, c2, c3, c, thread
except:
from obs_api_no_obs import client, consola, c2, c3, c, thread
import sys
from evdev import InputDevice, ecodes
from selectors import DefaultSelector, EVENT_READ
class Bt:
def __init__(self):
self.but = [307, 308, 305, 304, 315]
self.gamepad1, self.gamepad2 = None, None
self.selector = DefaultSelector()
self.devices_list = list()
self.devices_dict = dict()
self.bt_on = True
def bt_send_hat(self, path, que, val):
client.send_message('/bt', [int(path[-2:]), que, val])
c2(f'/bt, {int(path[-2:])}, {que}, {val}')
if val == 0: self.devices_dict[path] = 'c'
else: self.devices_dict[path] = que
def bt_send(self, path, que, val):
client.send_message('/bt', [int(path[-2:]), que, val])
c2(f'/bt, {int(path[-2:])}, {que}, {val}')
def reconnect(self):
device1 = '13:57:90:05:0E:31'
device2 = '13:6E:0E:07:0E:31'
ps1 = subprocess.Popen(['bluetoothctl', 'info', device1], stdout=subprocess.PIPE)
ps2 = subprocess.Popen(['bluetoothctl', 'info', device2], stdout=subprocess.PIPE)
stdout1 = subprocess.check_output(['grep', 'Connected'], stdin=ps1.stdout).decode("utf-8")
stdout2 = subprocess.check_output(['grep', 'Connected'], stdin=ps2.stdout).decode("utf-8")
if 'No' in stdout1 or 'no' in stdout1:
subprocess.Popen(['bluetoothctl', 'connect', device1])
c3(f'bluetoothctl connect {device1}')
if 'No' in stdout2 or 'no' in stdout2:
subprocess.Popen(['bluetoothctl', 'connect', device2])
c3(f'bluetoothctl connect {device2}')
def is_none(self, num, dev):
gamepad = f'gamepad{num}'
device = f'/dev/input/event{dev}'
try:
vars(self)[gamepad] = InputDevice(device)
try:
self.selector.unregister(vars(self)[gamepad])
except:
c3(f'Todavía no registrado {device}', c.azul)
try:
self.selector.register(vars(self)[gamepad], EVENT_READ)
c3(f'Registrado {device}', c.cian)
except:
c3(f'{device} already registred', c.cian)
except OSError as e:
c3(f'No está conectado {device}')
# Probando device + 1
dev += 1
device = f'/dev/input/event{dev}'
try:
vars(self)[gamepad] = InputDevice(device)
try: self.selector.unregister(vars(self)[gamepad])
except: c3(f'Todavía no registrado {device}', c.azul)
try:
self.selector.register(vars(self)[gamepad], EVENT_READ)
c3(f'Registrado {device}', c.cian)
except: c3(f'{device} already registred', c.cian)
except OSError as e:
c3(f'Ni tampoco... {device}')
def check_devices(self):
while self.bt_on:
# Si no están cargados, los intenta cargar y registrarlos en selector
if self.gamepad1 is None:
self.is_none(1, self.devices_list[0])
if self.gamepad2 is None:
self.is_none(2, self.devices_list[1])
time.sleep(1)
def input_bt(self, gp1, gp2):
self.devices_list = [gp1, gp2]
self.devices_dict= {f'/dev/input/event{gp1}':'c',
f'/dev/input/event{gp2}':'c'}
client.send_message('/bt_init', [gp1, gp2])
thread(self.check_devices)
time.sleep(2)
while self.bt_on:
# Si ninguno de los dos está cargado, vuelve a intentar conectarlos
if self.gamepad1 is None and self.gamepad2 is None:
c3('No está conectado ninguno')
time.sleep(1)
continue
# Revisa la lista de selector, esperando que llegue algo
for key, mask in self.selector.select():
device = key.fileobj
path = key.fileobj.path
# Intenta leer en device. Si salta error...
try:
for event in device.read():
et, ec, ev = event.type, event.code, event.value
if et == ecodes.EV_ABS:
# Analogo
if ec == 1: self.bt_send(path, 'h', -ev)
if ec == 0: self.bt_send(path, 'v', -ev)
if ec == 16 and ev == -1: self.bt_send_hat(path, 't', 1)
elif ec == 16 and ev == 1: self.bt_send_hat(path, 'b', 1)
elif ec == 17 and ev == -1: self.bt_send_hat(path, 'r', 1)
elif ec == 17 and ev == 1: self.bt_send_hat(path, 'l', 1)
if ec == 1 and ev == 0: self.bt_send_hat(path, 'r', 0)
if ec == 1 and ev == 0: self.bt_send_hat(path, 'l', 0)
if ec == 0 and ev == 0: self.bt_send_hat(path, 't', 0)
if ec == 0 and ev == 0: self.bt_send_hat(path, 'b', 0)
if et == ecodes.EV_KEY:
if ec == self.but[0]: self.bt_send(path, 0, ev)
elif ec == self.but[1]: self.bt_send(path, 1, ev)
elif ec == self.but[2]: self.bt_send(path, 2, ev)
elif ec == self.but[3]: self.bt_send(path, 3, ev)
elif ec == self.but[4]: self.bt_send(path, 4, ev)
# ... es porque el gamepad se apagó. Lo cierra y lo desregistra de selector
except OSError as e:
device.close()
c3('input_bt() - Except - Se apagó un gamepad')
if path[-2:] == '16':
c3(f'¿Se apagó /dev/input/event{self.devices_list[0]}? Desregistrándolo...')
if self.gamepad1 != None:
self.selector.unregister(self.gamepad1)
self.gamepad1 = None
if path[-2:] == '20':
c3(f'¿Se apagó /dev/input/event{self.devices_list[1]}? Desregistrándolo...')
if self.gamepad2 != None:
self.selector.unregister(self.gamepad2)
self.gamepad2 = None
# c4('input_bt() Fin de WHILE')
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--gp1', type=int, default=16,help='Gamepad 1')
parser.add_argument('--gp2', type=int, default=20,help='Gamepad 2')
args = parser.parse_args()
bt = Bt()
try:
consola(f'"R": reconnect()', c.naranja)
consola(f'"Q": quit', c.naranja)
thread(bt.input_bt, [args.gp1, args.gp2])
while True:
tecla = input()
if tecla == 'r':
bt.reconnect()
elif tecla == 'q':
sys.exit()
except KeyboardInterrupt:
print(' Bye')
另外,如果你不是我😁,这是加载的模块。
点击展开#!/usr/bin/env python3
import random, threading
from pythonosc import udp_client
targetIp = "127.0.0.1"
targetPort = 10000
client = udp_client.SimpleUDPClient(targetIp, targetPort)
client.send_message("/init", 1)
class Gd:
def __init__(self) -> None:
self.gd = {'verbose': [True, True, True, True]}
globalDict = Gd()
class Color:
def __init__(self):
self.reset = 'x1b[0m'
self.blanco = 'x1b[97m'
self.negro = 'x1b[90m'
self.rojo = 'x1b[91m'
self.verde = 'x1b[92m'
self.azul = 'x1b[94m'
self.amarillo = 'x1b[93m'
self.magenta = 'x1b[95m'
self.magenta_bold = 'x1b[95;1m'
self.azul_bold = 'x1b[94;1m'
self.cian = 'x1b[96m'
self.naranja = 'x1b[38;5;202m'
self.violeta = 'x1b[38;5;129m'
self.rosa = 'x1b[38;5;213m'
self.ocre = 'x1b[38;5;172m'
self.marron = 'x1b[38;5;52m'
self.musgo = 'x1b[38;5;58m'
self.error = 'x1b[93;41m'
self.remoto = 'x1b[93;42m'
self.debug = 'x1b[93;44m'
self.lista_attrs = []
self.attrs = self.__dict__
for k, v in self.attrs.items():
if k not in ['lista_attrs', 'attrs', 'random']:
self.lista_attrs.append(v)
self.random = random.choice(self.lista_attrs)
c = Color()
# Threading
def thread(function, args=[]):
t = threading.Thread(
target=function,
args=(args),
name=f'{function}({args})',
daemon=True)
t.start()
def c1(texto, color_texto=c.azul_bold):
if globalDict.gd['verbose'][0]:
texto = str(texto)
print(color_texto, texto, c.reset)
def c2(texto, color_texto=c.azul):
if globalDict.gd['verbose'][1]:
texto = str(texto)
print(color_texto, texto, c.reset)
def c3(texto, color_texto=c.cian):
if globalDict.gd['verbose'][2]:
texto = str(texto)
print(color_texto, texto, c.reset)
def c4(texto, color_texto=c.rosa):
if globalDict.gd['verbose'][3]:
texto = str(texto)
print(color_texto, texto, c.reset)
def consola(texto, color_texto=c.verde):
texto = str(texto)
print(color_texto, texto, c.reset)