所以我有一个代码,---从Arduino读取串行输入,在按下特定键时将串行数据写入Arduino,我还想添加一个功能将串行数据写入文本文件。我使用线程来完成这些操作。但是我有一个大问题,因为我无法向文件写入任何内容。这是我正在使用的代码:
from pynput import keyboard
import threading
import serial
import sys
import io
ser = None
class SerialReaderThread(threading.Thread):
def run(self):
global ser
ser = serial.Serial('COM4', baudrate = 9600, timeout = 5)
while True:
print(ser.readline().decode('utf-8'))
class FileWriting(threading.Thread):
def run(self):
while True:
with io.open("output.txt", "a", encoding="utf-8") as f:
f.write(ser.readline().decode('utf-8'))
class KeyboardThread(threading.Thread):
def run(self):
def on_press(key):
try:
format(key.char)
if key.char == "1":
ser.write(b'1rn') #serial write - 1
elif key.char == "2":
ser.write(b'2rn') #serial write - 2
elif key.char == "3":
ser.write(b'3rn') #serial write - 3
elif key.char == "4":
ser.write(b'4rn') #serial write - 4
elif key.char == "5":
ser.write(b'5rn') #serial write - 5
elif key.char == "6":
ser.write(b'6rn') #serial write - 6
elif key.char == "0":
ser.write(b'0rn') #serial write - 0
except AttributeError:
format(key)
with keyboard.Listener(on_press=on_press) as listener:
listener.join()
listener = keyboard.Listener(on_press=on_press)
listener.start()
serial_thread = SerialReaderThread()
keyboard_thread = KeyboardThread()
file_thread = FileWriting()
serial_thread.start()
keyboard_thread.start()
file_thread.start()
serial_thread.join()
keyboard_thread.join()
file_thread.join()
当我运行代码时,读取以及将串行数据写入Arduino都运行良好,但是正如我之前所说,将数据写入文本文件不会,它给了我这个错误:
Exception in thread Thread-3:
Traceback (most recent call last):
File "C:UsersTsotneAppDataLocalProgramsPythonPython38-32libthreading.py", line 932, in _bootstrap_inner
self.run()
File "C:UsersTsotneAppDataLocalProgramsPythonPython38-32projectmenucode.py", line 44, in run
f.write(ser.readline().decode('utf-8'))
AttributeError: 'NoneType' object has no attribute 'readline'
如果有人知道解决方案,请感谢任何帮助。
提前致谢
您不能有 2 个串行实例指向同一端口,并且不能同时读取和写入......您需要执行以下操作
import threading
import serial,time
class MySerialWorker(object):
pending_commands = [] # just commands
pending_queries = [] # commands awaiting responses
def __init__(self,*args,**kwargs):
# you might actually need to create this in the thread spawned later
self.ser = serial.Serial(*args,**kwargs)
def send_command(self,cmd):
self.pending_commands.append(cmd)
def query_command(self,cmd,callback):
self.pending_queries.append([cmd,callback])
def _do_query(self,cmd):
self.ser.write(cmd)
time.sleep(0.01) # give it a second
return self.ser.readline() # since you are expecting a newline at end of response
def main_loop(self):
while self.ser.isOpen():
while self.pending_queries: # handle these first
cmd,callback = self.pending_queries.pop(0)
callback(cmd,self._do_query(cmd))
if self.ser.inWaiting(): # check for any pending incoming messages
message_from_arduino = self.ser.readline()
print("Arduino says:",message_from_arduino)
while self.pending_commands: # send any commands we want
# write any pending commands
self.ser.write(self.pending_commands.pop(0))
time.sleep(0.01) # sleep a tick
def run_threaded(self):
self.thread = threading.Thread(self.main_loop)
self.thread.start()
def terminate(self):
self.ser.close() # closing the port will make the loop exit
self.thread.join() # wait for thread to exit
class MainProgram():
def __init__(self):
self.ser = MySerialWorker("COM5",9600,timeout=1)
self.pending_command = False
def on_result(self,cmd,result):
print("Device responded to %r with %r"%(cmd,result))
self.pending_command = False
def main_loop(self):
self.ser.run_threaded()
while True:
if not self.pending_command:
# await a command
cmd = input("SENDQUERY>")
self.pending_command = True
self.ser.query_command(cmd=cmd+"n",callback=self.on_result)
time.sleep(0.01) # sleep a tick
MainProgram().main_loop()