ESPマイコンXIAO ESP-S3をBLEで制御しました。
先日の入門記事の続きです。
s51517765.hatenadiary.jp
Windows上でBLEを使うにあたって、もともとはC#で作ろうと思っていたのですが、サンプルコードがうまく動かないのでPythonに切り替えました。
以下のサンプルコードを参考に基本機能を作り、入力の受付をスレッド化することでさらに高速に動くようにしました。
atatat.hatenablog.com
C言語コード(XIAO側)
XIAO ESP側のコードは、先日の記事の参考記事を少し修正したものです。
zenn.dev
変更点は、データを受け取った時の動き部分とLEDのピンなどです。
データを受け取ったときは「1」または「0」のみ受け付けるようにERR処理を追加しました。
#include <BLEDevice.h> #include <BLEServer.h> #include <BLEUtils.h> #include <BLE2902.h> #include <Arduino.h> //--------------------------------------------------------- // Constants //--------------------------------------------------------- #define SERVICE_UUID "55725ac1-066c-48b5-8700-2d9fb3603c5e" #define CHARACTERISTIC_UUID "69ddb59c-d601-4ea4-ba83-44f679a670ba" #define BLE_DEVICE_NAME "XIAO_S3" #define LED_PIN 7 #define BUTTON_PIN 4 //--------------------------------------------------------- // Variables //--------------------------------------------------------- BLEServer *pServer = NULL; BLECharacteristic *pCharacteristic = NULL; bool deviceConnected = false; bool oldDeviceConnected = false; std::string rxValue; std::string txValue; bool bleOn = false; bool buttonPressed = false; int buttonCount = 0; //--------------------------------------------------------- // Callbacks //--------------------------------------------------------- class MyServerCallbacks: public BLEServerCallbacks { void onConnect(BLEServer *pServer) { deviceConnected = true; Serial.println("onConnect"); }; void onDisconnect(BLEServer *pServer) { deviceConnected = false; Serial.println("onDisconnect"); } }; class MyCharacteristicCallbacks: public BLECharacteristicCallbacks { void onWrite(BLECharacteristic *pCharacteristic) { Serial.println("onWrite"); std::string rxValue = pCharacteristic->getValue(); if( rxValue.length() > 0 ){ if (rxValue[0]==0x31) // 1 bleOn = 1; else if (rxValue[0]==0x30) // 0 bleOn = 0; Serial.print("Received Value: "); for(int i=0; i<rxValue.length(); i++ ){ Serial.print(rxValue[i],HEX); } Serial.println(); } } }; //--------------------------------------------------------- void setup() { // pinMode(LED_PIN, OUTPUT); pinMode(BUTTON_PIN, INPUT_PULLUP); Serial.begin(115200); // BLEDevice::init(BLE_DEVICE_NAME); // Server pServer = BLEDevice::createServer(); pServer->setCallbacks(new MyServerCallbacks()); // Service BLEService *pService = pServer->createService(SERVICE_UUID); // Characteristic pCharacteristic = pService->createCharacteristic( CHARACTERISTIC_UUID, BLECharacteristic::PROPERTY_WRITE | BLECharacteristic::PROPERTY_NOTIFY ); pCharacteristic->setCallbacks(new MyCharacteristicCallbacks()); pCharacteristic->addDescriptor(new BLE2902()); // pService->start(); // Advertising BLEAdvertising *pAdvertising = BLEDevice::getAdvertising(); pAdvertising->addServiceUUID(SERVICE_UUID); pAdvertising->setScanResponse(false); pAdvertising->setMinPreferred(0x0); BLEDevice::startAdvertising(); Serial.println("startAdvertising"); } //--------------------------------------------------------- void loop() { // disconnecting if(!deviceConnected && oldDeviceConnected){ delay(500); // give the bluetooth stack the chance to get things ready pServer->startAdvertising(); Serial.println("restartAdvertising"); oldDeviceConnected = deviceConnected; } // connecting if(deviceConnected && !oldDeviceConnected){ oldDeviceConnected = deviceConnected; } // LED digitalWrite(LED_PIN,bleOn?HIGH:LOW); // BUTTON if(digitalRead(BUTTON_PIN) == LOW){ if(buttonPressed == false){ buttonCount++; String str = "BTN:"+String(buttonCount); Serial.println(str); // Notify if( deviceConnected ){ txValue = str.c_str(); pCharacteristic->setValue(txValue); pCharacteristic->notify(); } buttonPressed = true; delay(50); } }else{ if(buttonPressed == true){ buttonPressed = false; delay(50); } } }
Pythonコード(Windows側)
まず、最初に作ったのが以下のような形です。
XIAO側で設定したUUIDCHARACTERISTIC_UUIDとデバイス名d.nameでデバイスを特定してペアリングします。
import asyncio from bleak import discover # pip install bleak from bleak import BleakClient address = "" CHARACTERISTIC_UUID = "69ddb59c-d601-4ea4-ba83-44f679a670ba" async def discover_device(): global address devices = await discover() for d in devices: print(d) if d.name == "XIAO_S3": # 接続したいデバイスの名前で検索 address = d.address async def run(address, loop): async with BleakClient(address, loop=loop) as client: isConnected = await client.is_connected() # 接続OK? print("Connected: {0}".format(isConnected)) if isConnected: y = await client.read_gatt_char(CHARACTERISTIC_UUID) print("Read=", y) # data =b'01' y = await client.write_gatt_char(CHARACTERISTIC_UUID, data) print("Ret=", y) if __name__ == "__main__": global data loop = asyncio.get_event_loop() loop.run_until_complete(discover_device()) while True: data = input("data= ").encode() loop.run_until_complete(run(address, loop)) ###### ここが、送受信が終わるまで待つので遅い ######
これでも、通信はできますが、入力がスレッド化されていないためBLEの送受信が終わってから次の入力を受け付けるようになります。
これを以下のように変更しました。
import asyncio from bleak import discover # pip install bleak from bleak import BleakClient CHARACTERISTIC_UUID = "69ddb59c-d601-4ea4-ba83-44f679a670ba" address = "" async def discover_device(): global address devices = await discover() # デバイス探索は非同期で行う for d in devices: print(d) if d.name == "XIAO_S3": address = d.address print(f"Device found: {d.name} at {address}") return address print("device not find.") return None async def send_data(client, data): # デバイスに接続されているか確認 if client.is_connected: print(f"Connection status: {client.is_connected}") y = await client.read_gatt_char(CHARACTERISTIC_UUID) print("Read:", y) # データをUUIDに書き込む response = await client.write_gatt_char(CHARACTERISTIC_UUID, data) print("Write:", response) else: print("Disconnected...") async def handle_input(client): while True: data = input("Input send data:").encode() # 非同期でデータ送信を行う await send_data(client, data) async def main(): # デバイスの探索と接続を非同期で行う address = await discover_device() if not address: return # まず接続を確立 async with BleakClient(address) as client: # ユーザー入力を受け付ける非同期タスクを開始 await handle_input(client) if __name__ == "__main__": # メインの処理を非同期で実行 asyncio.run(main())
async def handle_input(client):の中で、入力受付とデータ送信が行われますが、これらが非同期で実行さる、つまり互いにバックグラウンドで実行するようになるので入力に戻ってくるのが早くなります。
まとめ
XIAO ESP-S3とWindowsでBLE無線通信を試してみました。
このコードはXIAOに限らず、またWindowsに限らず様々な組み合わせでBLE無線通信を行うことができるはずです。
非同期処理はなんかよくわからない部分もありますが、動いたのでヨシです。
www.youtube.com