Ich habe die Micropython-Codebeispiele bei Github ausgeführt und die Bluetooth-Beispielprogramme „ble_simple_peripheral.py“ über eine Bluetooth-Verbindung mit „ble_simple_central.py“ verbunden. Das Ausführen der beiden Programme funktioniert einwandfrei, aber wenn „ble_simple_peripheral.py“ Daten empfängt, kann es diese korrekt drucken, während „ble_simple_central.py“ auf auf den Empfang einer Benachrichtigung vom Peripheriegerät reagiert. Als Hardware wurden zwei „Raspberry Pi Pico W“-Boards verwendet.
Hat jemand eine Idee, warum die „Notify Read“-Operation keine korrekten Daten liefert?
(Ich führe den Code in zwei Thonny-Fenstern aus. Die Demoprogramme erfordern, dass „ble_advertising.py“ auf beide Boards heruntergeladen wird.)
Der Code für „ble_simple_peritheral.py“ und „ble_simple_central.py“ von Github ist unten aufgeführt. Die entscheidenden Codeteile finden Sie in der Klassendefinition und am Ende des Listings in ble_simple_central.py-„demo“, wo die „on_notify“-Behandlung eingerichtet ist:
#Short cutout from ble_simple_central.py:
# Set handler for when data is received over the UART.
def on_notify(self, callback):
self._notify_callback = callback
....
def on_rx(v):
print("RX", v)
central.on_notify(on_rx)
# "ble_simple_central.py". This program prints received data as: "RX:
import bluetooth
import random
import struct
import time
import micropython
from ble_advertising import decode_services, decode_name
from micropython import const
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)
_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)
_ADV_SCAN_IND = const(0x02)
_ADV_NONCONN_IND = const(0x03)
_UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_RX_CHAR_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E")
class BLESimpleCentral:
def __init__(self, ble):
self._ble = ble
self._ble.active(True)
self._ble.irq(self._irq)
self._reset()
def _reset(self):
# Cached name and address from a successful scan.
self._name = None
self._addr_type = None
self._addr = None
# Callbacks for completion of various operations.
# These reset back to None after being invoked.
self._scan_callback = None
self._conn_callback = None
self._read_callback = None
# Persistent callback for when new data is notified from the device.
self._notify_callback = None
# Connected device.
self._conn_handle = None
self._start_handle = None
self._end_handle = None
self._tx_handle = None
self._rx_handle = None
def _irq(self, event, data):
if event == _IRQ_SCAN_RESULT:
addr_type, addr, adv_type, rssi, adv_data = data
if adv_type in (_ADV_IND, _ADV_DIRECT_IND) and _UART_SERVICE_UUID in decode_services(
adv_data
):
# Found a potential device, remember it and stop scanning.
self._addr_type = addr_type
self._addr = bytes(
addr
) # Note: addr buffer is owned by caller so need to copy it.
self._name = decode_name(adv_data) or "?"
self._ble.gap_scan(None)
elif event == _IRQ_SCAN_DONE:
if self._scan_callback:
if self._addr:
# Found a device during the scan (and the scan was explicitly stopped).
self._scan_callback(self._addr_type, self._addr, self._name)
self._scan_callback = None
else:
# Scan timed out.
self._scan_callback(None, None, None)
elif event == _IRQ_PERIPHERAL_CONNECT:
# Connect successful.
conn_handle, addr_type, addr = data
if addr_type == self._addr_type and addr == self._addr:
self._conn_handle = conn_handle
self._ble.gattc_discover_services(self._conn_handle)
elif event == _IRQ_PERIPHERAL_DISCONNECT:
# Disconnect (either initiated by us or the remote end).
conn_handle, _, _ = data
if conn_handle == self._conn_handle:
# If it was initiated by us, it'll already be reset.
self._reset()
elif event == _IRQ_GATTC_SERVICE_RESULT:
# Connected device returned a service.
conn_handle, start_handle, end_handle, uuid = data
print("service", data)
if conn_handle == self._conn_handle and uuid == _UART_SERVICE_UUID:
self._start_handle, self._end_handle = start_handle, end_handle
elif event == _IRQ_GATTC_SERVICE_DONE:
# Service query complete.
if self._start_handle and self._end_handle:
self._ble.gattc_discover_characteristics(
self._conn_handle, self._start_handle, self._end_handle
)
else:
print("Failed to find uart service.")
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
# Connected device returned a characteristic.
conn_handle, def_handle, value_handle, properties, uuid = data
if conn_handle == self._conn_handle and uuid == _UART_RX_CHAR_UUID:
self._rx_handle = value_handle
if conn_handle == self._conn_handle and uuid == _UART_TX_CHAR_UUID:
self._tx_handle = value_handle
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
# Characteristic query complete.
if self._tx_handle is not None and self._rx_handle is not None:
# We've finished connecting and discovering device, fire the connect callback.
if self._conn_callback:
self._conn_callback()
else:
print("Failed to find uart rx characteristic.")
elif event == _IRQ_GATTC_WRITE_DONE:
conn_handle, value_handle, status = data
print("TX complete")
elif event == _IRQ_GATTC_NOTIFY:
conn_handle, value_handle, notify_data = data
if conn_handle == self._conn_handle and value_handle == self._tx_handle:
if self._notify_callback:
self._notify_callback(notify_data)
# Returns true if we've successfully connected and discovered characteristics.
def is_connected(self):
return (
self._conn_handle is not None
and self._tx_handle is not None
and self._rx_handle is not None
)
# Find a device advertising the environmental sensor service.
def scan(self, callback=None):
self._addr_type = None
self._addr = None
self._scan_callback = callback
self._ble.gap_scan(2000, 30000, 30000)
# Connect to the specified device (otherwise use cached address from a scan).
def connect(self, addr_type=None, addr=None, callback=None):
self._addr_type = addr_type or self._addr_type
self._addr = addr or self._addr
self._conn_callback = callback
if self._addr_type is None or self._addr is None:
return False
self._ble.gap_connect(self._addr_type, self._addr)
return True
# Disconnect from current device.
def disconnect(self):
if self._conn_handle is None:
return
self._ble.gap_disconnect(self._conn_handle)
self._reset()
# Send data over the UART
def write(self, v, response=False):
if not self.is_connected():
return
self._ble.gattc_write(self._conn_handle, self._rx_handle, v, 1 if response else 0)
# Set handler for when data is received over the UART.
def on_notify(self, callback):
self._notify_callback = callback
def demo():
ble = bluetooth.BLE()
central = BLESimpleCentral(ble)
not_found = False
def on_scan(addr_type, addr, name):
if addr_type is not None:
print("Found peripheral:", addr_type, addr, name)
central.connect()
else:
nonlocal not_found
not_found = True
print("No peripheral found.")
central.scan(callback=on_scan)
# Wait for connection...
while not central.is_connected():
time.sleep_ms(100)
if not_found:
return
print("Connected")
def on_rx(v):
print("RX", v)
central.on_notify(on_rx)
with_response = False
i = 0
while central.is_connected():
try:
v = str(i) + "_"
print("TX", v)
central.write(v, with_response)
except:
print("TX failed")
i += 1
time.sleep_ms(400 if with_response else 30)
print("Disconnected")
if __name__ == "__main__":
demo()
Ich habe die Micropython-Codebeispiele bei Github ausgeführt und die Bluetooth-Beispielprogramme „ble_simple_peripheral.py“ über eine Bluetooth-Verbindung mit „ble_simple_central.py“ verbunden. Das Ausführen der beiden Programme funktioniert einwandfrei, aber wenn „ble_simple_peripheral.py“ Daten empfängt, kann es diese korrekt drucken, während „ble_simple_central.py“ auf auf den Empfang einer Benachrichtigung vom Peripheriegerät reagiert. Als Hardware wurden zwei „Raspberry Pi Pico W“-Boards verwendet. Hat jemand eine Idee, warum die „Notify Read“-Operation keine korrekten Daten liefert? (Ich führe den Code in zwei Thonny-Fenstern aus. Die Demoprogramme erfordern, dass „ble_advertising.py“ auf beide Boards heruntergeladen wird.) Der Code für „ble_simple_peritheral.py“ und „ble_simple_central.py“ von Github ist unten aufgeführt. Die entscheidenden Codeteile finden Sie in der Klassendefinition und am Ende des Listings in ble_simple_central.py-„demo“, wo die „on_notify“-Behandlung eingerichtet ist: [code]# "ble_simple_peripheral.py". This program output received data correctly! import bluetooth import random import struct import time from ble_advertising import advertising_payload
def _irq(self, event, data): # Track connections so we can send notifications. if event == _IRQ_CENTRAL_CONNECT: conn_handle, _, _ = data print("New connection", conn_handle) self._connections.add(conn_handle) elif event == _IRQ_CENTRAL_DISCONNECT: conn_handle, _, _ = data print("Disconnected", conn_handle) self._connections.remove(conn_handle) # Start advertising again to allow a new connection. self._advertise() elif event == _IRQ_GATTS_WRITE: conn_handle, value_handle = data value = self._ble.gatts_read(value_handle) if value_handle == self._handle_rx and self._write_callback: self._write_callback(value)
def send(self, data): for conn_handle in self._connections: self._ble.gatts_notify(conn_handle, self._handle_tx, data)
def demo(): ble = bluetooth.BLE() p = BLESimplePeripheral(ble)
def on_rx(v): print("RX", v)
p.on_write(on_rx)
i = 0 while True: if p.is_connected(): # Short burst of queued notifications. for _ in range(3): data = str(i) + "_" print("TX", data) p.send(data) i += 1 time.sleep_ms(100)
if __name__ == "__main__": demo() [/code] [code]#Short cutout from ble_simple_central.py: # Set handler for when data is received over the UART. def on_notify(self, callback): self._notify_callback = callback ....
def on_rx(v): print("RX", v)
central.on_notify(on_rx) [/code] [code]# "ble_simple_central.py". This program prints received data as: "RX: import bluetooth import random import struct import time import micropython
from ble_advertising import decode_services, decode_name
class BLESimpleCentral: def __init__(self, ble): self._ble = ble self._ble.active(True) self._ble.irq(self._irq)
self._reset()
def _reset(self): # Cached name and address from a successful scan. self._name = None self._addr_type = None self._addr = None
# Callbacks for completion of various operations. # These reset back to None after being invoked. self._scan_callback = None self._conn_callback = None self._read_callback = None
# Persistent callback for when new data is notified from the device. self._notify_callback = None
def _irq(self, event, data): if event == _IRQ_SCAN_RESULT: addr_type, addr, adv_type, rssi, adv_data = data if adv_type in (_ADV_IND, _ADV_DIRECT_IND) and _UART_SERVICE_UUID in decode_services( adv_data ): # Found a potential device, remember it and stop scanning. self._addr_type = addr_type self._addr = bytes( addr ) # Note: addr buffer is owned by caller so need to copy it. self._name = decode_name(adv_data) or "?" self._ble.gap_scan(None)
elif event == _IRQ_SCAN_DONE: if self._scan_callback: if self._addr: # Found a device during the scan (and the scan was explicitly stopped). self._scan_callback(self._addr_type, self._addr, self._name) self._scan_callback = None else: # Scan timed out. self._scan_callback(None, None, None)
elif event == _IRQ_PERIPHERAL_CONNECT: # Connect successful. conn_handle, addr_type, addr = data if addr_type == self._addr_type and addr == self._addr: self._conn_handle = conn_handle self._ble.gattc_discover_services(self._conn_handle)
elif event == _IRQ_PERIPHERAL_DISCONNECT: # Disconnect (either initiated by us or the remote end). conn_handle, _, _ = data if conn_handle == self._conn_handle: # If it was initiated by us, it'll already be reset. self._reset()
elif event == _IRQ_GATTC_SERVICE_RESULT: # Connected device returned a service. conn_handle, start_handle, end_handle, uuid = data print("service", data) if conn_handle == self._conn_handle and uuid == _UART_SERVICE_UUID: self._start_handle, self._end_handle = start_handle, end_handle
elif event == _IRQ_GATTC_SERVICE_DONE: # Service query complete. if self._start_handle and self._end_handle: self._ble.gattc_discover_characteristics( self._conn_handle, self._start_handle, self._end_handle ) else: print("Failed to find uart service.")
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT: # Connected device returned a characteristic. conn_handle, def_handle, value_handle, properties, uuid = data if conn_handle == self._conn_handle and uuid == _UART_RX_CHAR_UUID: self._rx_handle = value_handle if conn_handle == self._conn_handle and uuid == _UART_TX_CHAR_UUID: self._tx_handle = value_handle
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE: # Characteristic query complete. if self._tx_handle is not None and self._rx_handle is not None: # We've finished connecting and discovering device, fire the connect callback. if self._conn_callback: self._conn_callback() else: print("Failed to find uart rx characteristic.")
elif event == _IRQ_GATTC_WRITE_DONE: conn_handle, value_handle, status = data print("TX complete")
elif event == _IRQ_GATTC_NOTIFY: conn_handle, value_handle, notify_data = data if conn_handle == self._conn_handle and value_handle == self._tx_handle: if self._notify_callback: self._notify_callback(notify_data)
# Returns true if we've successfully connected and discovered characteristics. def is_connected(self): return ( self._conn_handle is not None and self._tx_handle is not None and self._rx_handle is not None )
# Connect to the specified device (otherwise use cached address from a scan). def connect(self, addr_type=None, addr=None, callback=None): self._addr_type = addr_type or self._addr_type self._addr = addr or self._addr self._conn_callback = callback if self._addr_type is None or self._addr is None: return False self._ble.gap_connect(self._addr_type, self._addr) return True
# Disconnect from current device. def disconnect(self): if self._conn_handle is None: return self._ble.gap_disconnect(self._conn_handle) self._reset()
# Send data over the UART def write(self, v, response=False): if not self.is_connected(): return self._ble.gattc_write(self._conn_handle, self._rx_handle, v, 1 if response else 0)
# Set handler for when data is received over the UART. def on_notify(self, callback): self._notify_callback = callback
def demo(): ble = bluetooth.BLE() central = BLESimpleCentral(ble)
not_found = False
def on_scan(addr_type, addr, name): if addr_type is not None: print("Found peripheral:", addr_type, addr, name) central.connect() else: nonlocal not_found not_found = True print("No peripheral found.")
central.scan(callback=on_scan)
# Wait for connection... while not central.is_connected(): time.sleep_ms(100) if not_found: return
print("Connected")
def on_rx(v): print("RX", v)
central.on_notify(on_rx)
with_response = False
i = 0 while central.is_connected(): try: v = str(i) + "_" print("TX", v) central.write(v, with_response) except: print("TX failed") i += 1 time.sleep_ms(400 if with_response else 30)
Ich habe die Micropython-Codebeispiele bei Github ausgeführt und die Bluetooth-Beispielprogramme „ble_simple_peripheral.py“ über eine Bluetooth-Verbindung mit „ble_simple_central.py“ verbunden. Das...
Ich verwende cordova-plugin-ble-central und versuche, eine Verbindung zu meinem Mi Smart Band 4 (auch 5 und 6) herzustellen, und alle Verbindungen sind erfolgreich (einschließlich Kopplung). Mein...
Ich migriere auf das neue central-publishing-maven-plugin (v0.9.0) für Bereitstellungen in Maven Central. Mein Projekt verwendet eine Standardstruktur mit mehreren Modulen und mein Ziel ist es, nur...
Hallo, ich habe Probleme mit einem Code, den ich verwenden möchte. Ich habe das Problem auf einen Abschnitt eingegrenzt und bin mir nicht sicher, wie ich es beheben kann. Der Code schlägt beim...