Pyserial - Как прочитать последнюю строку, отправленную с последовательного устройства
У меня есть Arduino, подключенный к компьютеру с запуском цикла, посылая значение через последовательный порт обратно на компьютер каждые 100 мск.
Я хочу сделать Python script, который будет считываться из последовательного порта только каждые несколько секунд, поэтому я хочу, чтобы он просто увидел последнее сообщение, отправленное из Arduino.
Как вы это делаете в Pyserial?
Вот код, который я пробовал, который не работает. Он читает строки последовательно.
import serial
import time
ser = serial.Serial('com4',9600,timeout=1)
while 1:
time.sleep(10)
print ser.readline() #How do I get the most recent line sent from the device?
Ответы
Ответ 1
Возможно, я не понимаю ваш вопрос, но поскольку это последовательная строка, вам нужно будет прочитать все, что отправлено из Arduino последовательно - оно будет забуферировано в Arduino, пока вы его не прочитаете.
Если вы хотите, чтобы отображался статус, который показывает последнюю отправленную вещь, используйте поток, который включает код в ваш вопрос (минус сон), и держите последнюю полную строку в качестве последней строки от Arduino.
Обновить: mtasic
пример кода неплохой, но если Arduino отправил частичную строку при вызове inWaiting()
, вы получите усеченную строку. Вместо этого вы должны положить последнюю полную строку в last_received
и сохранить частичную строку в buffer
, чтобы ее можно было добавить к следующему циклу цикла. Что-то вроде этого:
def receiving(ser):
global last_received
buffer_string = ''
while True:
buffer_string = buffer_string + ser.read(ser.inWaiting())
if '\n' in buffer_string:
lines = buffer_string.split('\n') # Guaranteed to have at least 2 entries
last_received = lines[-2]
#If the Arduino sends lots of empty lines, you'll lose the
#last filled line, so you could make the above statement conditional
#like so: if lines[-2]: last_received = lines[-2]
buffer_string = lines[-1]
Относительно использования readline()
: Здесь, что должна сказать Pyserial документация (слегка отредактированная для ясности и с упоминанием readlines()):
Будьте внимательны при использовании "readline". Делать укажите время ожидания при открытии последовательный порт, иначе он может блокировать навсегда, если символ новой строки не будет получено. Также обратите внимание, что "readlines()" работает только с таймаутом. Это зависит от наличия тайм-аута и интерпретирует это как EOF (конец файла).
который кажется мне вполне разумным!
Ответ 2
from serial import *
from threading import Thread
last_received = ''
def receiving(ser):
global last_received
buffer = ''
while True:
# last_received = ser.readline()
buffer += ser.read(ser.inWaiting())
if '\n' in buffer:
last_received, buffer = buffer.split('\n')[-2:]
if __name__ == '__main__':
ser = Serial(
port=None,
baudrate=9600,
bytesize=EIGHTBITS,
parity=PARITY_NONE,
stopbits=STOPBITS_ONE,
timeout=0.1,
xonxoff=0,
rtscts=0,
interCharTimeout=None
)
Thread(target=receiving, args=(ser,)).start()
Ответ 3
Эти решения будут обрабатывать процессор при ожидании символов.
Вы должны сделать хотя бы один блокирующий вызов для чтения (1)
while True:
if '\n' in buffer:
pass # skip if a line already in buffer
else:
buffer += ser.read(1) # this will block until one more char or timeout
buffer += ser.read(ser.inWaiting()) # get remaining buffered chars
... и выполняйте разделение, как раньше.
Ответ 4
Этот метод позволяет вам отдельно контролировать тайм-аут для сбора всех данных для каждой строки и другого тайм-аута для ожидания дополнительных строк.
# get the last line from serial port
lines = serial_com()
lines[-1]
def serial_com():
'''Serial communications: get a response'''
# open serial port
try:
serial_port = serial.Serial(com_port, baudrate=115200, timeout=1)
except serial.SerialException as e:
print("could not open serial port '{}': {}".format(com_port, e))
# read response from serial port
lines = []
while True:
line = serial_port.readline()
lines.append(line.decode('utf-8').rstrip())
# wait for new data after each line
timeout = time.time() + 0.1
while not serial_port.inWaiting() and timeout > time.time():
pass
if not serial_port.inWaiting():
break
#close the serial port
serial_port.close()
return lines
Ответ 5
Вам понадобится цикл, чтобы прочитать все отправленное с последним вызовом readline(), блокирующим до таймаута. Итак:
def readLastLine(ser):
last_data=''
while True:
data=ser.readline()
if data!='':
last_data=data
else:
return last_data
Ответ 6
Небольшая модификация mtasic и кода Vinay Sajip:
Пока я нашел, что этот код мне очень полезен для подобного приложения, мне нужны все строки, возвращаемые с последовательного устройства, которое периодически отправляло информацию.
Я решил выпустить первый элемент сверху, записать его, а затем снова присоединиться к остальным элементам в качестве нового буфера и продолжить оттуда.
Я понимаю, что это не то, что просил Грег, но я подумал, что стоит поделиться как записку.
def receiving(ser):
global last_received
buffer = ''
while True:
buffer = buffer + ser.read(ser.inWaiting())
if '\n' in buffer:
lines = buffer.split('\n')
last_received = lines.pop(0)
buffer = '\n'.join(lines)
Ответ 7
Использование .inWaiting()
внутри бесконечного цикла может быть проблематичным. Это может повредить весь CPU в зависимости от реализации. Вместо этого я бы рекомендовал использовать определенный размер данных для чтения. Поэтому в этом случае следует сделать следующее:
ser.read(1024)
Ответ 8
Слишком много осложнений
В чем причина разделения объекта байтов на новую строку или на другие манипуляции с массивами?
Я пишу простейший метод, который поможет решить вашу проблему:
import serial
s = serial.Serial(31)
s.write(bytes("ATI\r\n", "utf-8"));
while True:
last = ''
for byte in s.read(s.inWaiting()): last += chr(byte)
if len(last) > 0:
# Do whatever you want with last
print (bytes(last, "utf-8"))
last = ''
Ответ 9
Вы можете использовать ser.flushInput()
для очистки всех последовательных данных, которые в настоящее время находятся в буфере.
После очистки старых данных пользователь ser.readline() может получить последние данные с последовательного устройства.
Я думаю, что это немного проще, чем другие предлагаемые решения здесь. Работал для меня, надеюсь, он вам подходит.