Pyserial-asyncio Client/Server in Python 3.8 kommunizieren nicht sofortPython

Python-Programme
Anonymous
 Pyserial-asyncio Client/Server in Python 3.8 kommunizieren nicht sofort

Post by Anonymous »

Ich lerne Python und Asyncio und nachdem ich mit Asyncio für einen TCP-Client/Server erfolgreich war, habe ich meinen ersten Stab mit dem Erstellen eines seriellen Clients/Servers mit pyserial-asyncio-Ausführungen in Bash auf einem Raspberry Pi 5 mit Python 3.8 (ich kann die Version nicht ändern).

Code: Select all

import asyncio
import serial_asyncio

class UARTProtocol(asyncio.Protocol):
def __init__(self):
self.transport = None

def connection_made(self, transport):
self.transport = transport
print('Port opened', transport)

def data_received(self, data):
print('Data received:', data.decode())
# Echo received data back (example)
self.transport.write(data)
# Close the connection if 'exit' is received
if data == b"exit\r":
self.transport.close()

def connection_lost(self, exc):
print('Port closed')
self.transport = None

def pause_writing(self):
print('pause writing')
print(self.transport.get_write_buffer_size())

def resume_writing(self):
print(self.transport.get_write_buffer_size())
print('resume writing')

async def run_uart_server():
loop = asyncio.get_running_loop()
try:
transport, protocol = await serial_asyncio.create_serial_connection(loop, UARTProtocol, '/dev/ttyAMA2', baudrate=9600)
print("UART server started.")
await asyncio.Future()  # Run forever
except serial.serialutil.SerialException as e:
print(f"Error: Could not open serial port: {e}")
finally:
if transport:
transport.close()

if __name__ == "__main__":
asyncio.run(run_uart_server())
< /code>
und Client: < /p>
import asyncio
import serial_asyncio

async def uart_client(port, baudrate):
try:
reader, writer = await serial_asyncio.open_serial_connection(url=port, baudrate=baudrate)
print(f"Connected to {port} at {baudrate} bps")

async def receive_data():
while True:
try:
data = await reader.readline()
if data:
print(f"Received: {data.decode().strip()}")
except Exception as e:
print(f"Error reading data: {e}")
break

async def send_data():
while True:
message = input("Enter message to send (or 'exit' to quit): ")
if message.lower() == 'exit':
break
writer.write((message + '\n').encode())
# writer.write_eof()
await writer.drain()
print(f"Sent: {message}")

await asyncio.gather(receive_data(), send_data())

except serial.SerialException as e:
print(f"Error opening serial port: {e}")
finally:
if 'writer' in locals():
writer.close()
await writer.wait_closed()
print("Connection closed.")

if __name__ == "__main__":
asyncio.run(uart_client('/dev/ttyAMA1', 9600))
< /code>
Ich möchte, dass der Client mich für einen Text auffordert, der sofort an den Server gesendet und dort gedruckt wird. Ich kann den Client dazu bringen, mich für Text aufzufordern, aber der Server zeigt erst nach dem Eingeben von Exit 
im Client die Verbindung, um die Verbindung zu schließen, und druckt dann den gesamten Text, den ich in der Client -Loop eingetragen habe. Aber dann fordert der Client mich nie wieder auf Eingaben auf.>

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post