使用树莓派更快地读取多个DS18B20温度传感器



我的自定义传感器仪表板每秒请求新的读数.

这工作得很好,直到我连接了3个DS18B20温度传感器(1线协议,所以都在1引脚上),每个传感器需要750毫秒来提供新的数据。

这是我目前用来读取每个传感器温度的类:

# ds18b20.py
# written by Roger Woollett
import os
import glob
import time

class DS18B20:
# much of this code is lifted from Adafruit web site
# This class can be used to access one or more DS18B20 temperature sensors
# It uses OS supplied drivers and one wire support must be enabled
# To do this add the line
# dtoverlay=w1-gpio
# to the end of /boot/config.txt
#
# The DS18B20 has three pins, looking at the flat side with the pins pointing
# down pin 1 is on the left
# connect pin 1 to GPIO ground
# connect pin 2 to GPIO 4 *and* GPIO 3.3V via a 4k8 (4800 ohm) pullup resistor
# connect pin 3 to GPIO 3.3V
# You can connect more than one sensor to the same set of pins
# Only one pullup resistor is required
def __init__(self):
# Load required kernel modules
os.system('modprobe w1-gpio')
os.system('modprobe w1-therm')
# Find file names for the sensor(s)
base_dir = '/sys/bus/w1/devices/'
device_folder = glob.glob(base_dir + '28*')
self._num_devices = len(device_folder)
self._device_file = list()
i = 0
while i < self._num_devices:
self._device_file.append(device_folder[i] + '/w1_slave')
i += 1

def _read_temp(self, index):
# Issue one read to one sensor
# You should not call this directly
# First check if this index exists
if index >= len(self._device_file):
return False
f = open(self._device_file[index], 'r')
data = f.read()
f.close()
return data

def tempC(self, index=0):
# Call this to get the temperature in degrees C
# detected by a sensor
data = self._read_temp(index)
retries = 0
# Check for error
if data == False:
return None
while (not "YES" in data) and (retries > 0):
# Read failed so try again
time.sleep(0.1)
#print('Read Failed', retries)
data = self._read_temp(index)
retries -= 1
if (retries == 0) and (not "YES" in data):
return None
(discard, sep, reading) = data.partition(' t=')
if reading == 85000:
# 85ºC is the boot temperature of the sensor, so ignore that value
return None
temperature = float(reading) / 1000.0
return temperature

def device_count(self):
# Call this to see how many sensors have been detected
return self._num_devices

我已经尝试返回以前的温度读数,如果当前一个还没有完成,但是这并没有减少读取传感器所需的时间,所以我想唯一的方法是异步地做事情。

我可以降低精度以减少每次读取所需的时间,但理想情况下,我将在单独的线程上同时读取所有传感器。

我如何才能最好地实现这一点?或者有其他方法来提高阅读速度的多个DS18B20传感器?

谢谢你的建议!

您将面临Linux内核驱动程序引入的一些限制。如果您直接与OneWire协议交互,则所有三个传感器只有一个750ms读取周期,而不是(3 * 750ms)。当直接使用1线协议时,您可以发出单个"转换温度"。命令到总线上的所有设备,如这里所述,然后读取所有传感器。

Linux驱动程序明确不支持这种操作模式:

如果没有设备是寄生供电的,则可以同时转换所有设备,然后返回读取单个传感器。目前不支持。当读取值时,驱动程序也不支持降低精度(这也会减少转换时间)。

这意味着每个设备的读取周期为750毫秒。最好的选择可能是将传感器读取代码放在单独的线程中,例如:

import glob
import threading
import time

# Note that we're inheriting from threading.Thread here;
# see https://docs.python.org/3/library/threading.html
# for more information.
class DS18B20(threading.Thread):
default_base_dir = "/sys/bus/w1/devices/"
def __init__(self, base_dir=None):
super().__init__()
self._base_dir = base_dir if base_dir else self.default_base_dir
self.daemon = True
self.discover()
def discover(self):
device_folder = glob.glob(self._base_dir + "28*")
self._num_devices = len(device_folder)
self._device_file: list[str] = []
for i in range(self._num_devices):
self._device_file.append(device_folder[i] + "/w1_slave")
self._values: list[float | None] = [None] * self._num_devices
self._times: list[float] = [0.0] * self._num_devices
def run(self):
"""Thread entrypoint: read sensors in a loop.
Calling DS18B20.start() will cause this method to run in
a separate thread.
"""
while True:
for dev in range(self._num_devices):
self._read_temp(dev)
# Adjust this value as you see fit, noting that you will never
# read actual sensor values more often than 750ms * self._num_devices.
time.sleep(1)
def _read_temp(self, index):
for i in range(3):
with open(self._device_file[index], "r") as f:
data = f.read()
if "YES" not in data:
time.sleep(0.1)
continue
disacard, sep, reading = data.partition(" t=")
temp = float(reading) / 1000.0
self._values[index] = temp
self._times[index] = time.time()
break
else:
print(f"failed to read device {index}")
def tempC(self, index=0):
return self._values[index]
def device_count(self):
"""Return the number of discovered devices"""
return self._num_devices

因为这是一个线程,你需要.start()它第一,所以你的代码看起来像这样:

d = DS18B20()
d.start()
while True:
for i in range(d.device_count()):
print(f'dev {i}: {d.tempC(i)}')
time.sleep(0.5)

您可以随时调用tempC方法,因为它是刚从_values数组返回一个值。实际的更新频率由run方法中的环路控制(和传感器施加的最小周期时间)。

最新更新