プログラミング素人のはてなブログ

プログラミングも電気回路も専門外の技術屋の末端が勉強したことや作品をアウトプットするブログ。コードに間違いなど見つけられたら、気軽にコメントください。 C#、Python3、ラズパイなど。

XIAO ESP-S3とWindows PCでBLE通信をする

ESPマイコンXIAO ESP-S3をBLEで制御しました。

先日の入門記事の続きです。
s51517765.hatenadiary.jp

Windows上でBLEを使うにあたって、もともとはC#で作ろうと思っていたのですが、サンプルコードがうまく動かないのでPythonに切り替えました。
以下のサンプルコードを参考に基本機能を作り、入力の受付をスレッド化することでさらに高速に動くようにしました。
atatat.hatenablog.com

C言語コード(XIAO側)

XIAO ESP側のコードは、先日の記事の参考記事を少し修正したものです。
zenn.dev

変更点は、データを受け取った時の動き部分とLEDのピンなどです。
データを受け取ったときは「1」または「0」のみ受け付けるようにERR処理を追加しました。

#include <BLEDevice.h>
#include <BLEServer.h>
#include <BLEUtils.h>
#include <BLE2902.h>
#include <Arduino.h>

//---------------------------------------------------------
// Constants
//---------------------------------------------------------
#define SERVICE_UUID        "55725ac1-066c-48b5-8700-2d9fb3603c5e"
#define CHARACTERISTIC_UUID "69ddb59c-d601-4ea4-ba83-44f679a670ba"
#define BLE_DEVICE_NAME     "XIAO_S3"
#define LED_PIN             7
#define BUTTON_PIN          4

//---------------------------------------------------------
// Variables
//---------------------------------------------------------
BLEServer *pServer = NULL;
BLECharacteristic *pCharacteristic = NULL;
bool deviceConnected = false;
bool oldDeviceConnected = false;
std::string rxValue;
std::string txValue;
bool bleOn = false;
bool buttonPressed = false;
int buttonCount = 0;
//---------------------------------------------------------
// Callbacks
//---------------------------------------------------------
class MyServerCallbacks: public BLEServerCallbacks {
  void onConnect(BLEServer *pServer) {
    deviceConnected = true;
    Serial.println("onConnect");
  };
  void onDisconnect(BLEServer *pServer) {
    deviceConnected = false;
    Serial.println("onDisconnect");
  }
};

class MyCharacteristicCallbacks: public BLECharacteristicCallbacks {
  void onWrite(BLECharacteristic *pCharacteristic) {
    Serial.println("onWrite");
    std::string rxValue = pCharacteristic->getValue();
    if( rxValue.length() > 0 ){
      if (rxValue[0]==0x31) // 1
        bleOn = 1;
      else if (rxValue[0]==0x30) // 0
        bleOn = 0;
      Serial.print("Received Value: ");
      for(int i=0; i<rxValue.length(); i++ ){
        Serial.print(rxValue[i],HEX);
      }
      Serial.println();
    }
  }
};

//---------------------------------------------------------
void setup() {
  //
  pinMode(LED_PIN, OUTPUT);
  pinMode(BUTTON_PIN, INPUT_PULLUP);
  Serial.begin(115200);
  //
  BLEDevice::init(BLE_DEVICE_NAME);
  // Server
  pServer = BLEDevice::createServer();
  pServer->setCallbacks(new MyServerCallbacks());
  // Service
  BLEService *pService = pServer->createService(SERVICE_UUID);
  // Characteristic
  pCharacteristic = pService->createCharacteristic(
    CHARACTERISTIC_UUID,
    BLECharacteristic::PROPERTY_WRITE  |
    BLECharacteristic::PROPERTY_NOTIFY
  );
  pCharacteristic->setCallbacks(new MyCharacteristicCallbacks());
  pCharacteristic->addDescriptor(new BLE2902());
  //
  pService->start();
  // Advertising
  BLEAdvertising *pAdvertising = BLEDevice::getAdvertising();
  pAdvertising->addServiceUUID(SERVICE_UUID);
  pAdvertising->setScanResponse(false);
  pAdvertising->setMinPreferred(0x0);
  BLEDevice::startAdvertising();
  Serial.println("startAdvertising");
}

//---------------------------------------------------------
void loop() {
  // disconnecting
  if(!deviceConnected && oldDeviceConnected){
    delay(500); // give the bluetooth stack the chance to get things ready
    pServer->startAdvertising();
    Serial.println("restartAdvertising");
    oldDeviceConnected = deviceConnected;
  }
  // connecting
  if(deviceConnected && !oldDeviceConnected){
    oldDeviceConnected = deviceConnected;
  }
  // LED
  digitalWrite(LED_PIN,bleOn?HIGH:LOW);
  // BUTTON
  if(digitalRead(BUTTON_PIN) == LOW){
    if(buttonPressed == false){
      buttonCount++;
      String str = "BTN:"+String(buttonCount);
      Serial.println(str);
      // Notify
      if( deviceConnected ){
        txValue = str.c_str();
        pCharacteristic->setValue(txValue);
        pCharacteristic->notify();
      }
      buttonPressed = true;
      delay(50);
    }
  }else{
    if(buttonPressed == true){
      buttonPressed = false;
      delay(50);
    }
  }
}

Pythonコード(Windows側)

まず、最初に作ったのが以下のような形です。
XIAO側で設定したUUIDCHARACTERISTIC_UUIDとデバイスd.nameでデバイスを特定してペアリングします。

import asyncio
from bleak import discover  # pip install bleak
from bleak import BleakClient

address = ""
CHARACTERISTIC_UUID = "69ddb59c-d601-4ea4-ba83-44f679a670ba"


async def discover_device():
    global address
    devices = await discover()
    for d in devices:
        print(d)
        if d.name == "XIAO_S3": # 接続したいデバイスの名前で検索
            address = d.address


async def run(address, loop):
    async with BleakClient(address, loop=loop) as client:
        isConnected = await client.is_connected()  # 接続OK?
        print("Connected: {0}".format(isConnected))
        if isConnected:
            y = await client.read_gatt_char(CHARACTERISTIC_UUID)
            print("Read=", y)
            # data =b'01'
            y = await client.write_gatt_char(CHARACTERISTIC_UUID, data)
            print("Ret=", y)


if __name__ == "__main__":
    global data
    loop = asyncio.get_event_loop()
    loop.run_until_complete(discover_device())
    while True:
        data = input("data= ").encode()
        loop.run_until_complete(run(address, loop))  ###### ここが、送受信が終わるまで待つので遅い ######

これでも、通信はできますが、入力がスレッド化されていないためBLEの送受信が終わってから次の入力を受け付けるようになります。
これを以下のように変更しました。

import asyncio
from bleak import discover  # pip install bleak
from bleak import BleakClient

CHARACTERISTIC_UUID = "69ddb59c-d601-4ea4-ba83-44f679a670ba"
address = ""

async def discover_device():
    global address
    devices = await discover()  # デバイス探索は非同期で行う
    for d in devices:
        print(d)
        if d.name == "XIAO_S3":
            address = d.address
            print(f"Device found: {d.name} at {address}")
            return address
    print("device not find.")
    return None

async def send_data(client, data):
    # デバイスに接続されているか確認
    if client.is_connected:
        print(f"Connection status: {client.is_connected}")
        y = await client.read_gatt_char(CHARACTERISTIC_UUID)
        print("Read:", y)
        # データをUUIDに書き込む
        response = await client.write_gatt_char(CHARACTERISTIC_UUID, data)
        print("Write:", response)
    else:
        print("Disconnected...")

async def handle_input(client):
    while True:
        data = input("Input send data:").encode()
        # 非同期でデータ送信を行う
        await send_data(client, data)

async def main():
    # デバイスの探索と接続を非同期で行う
    address = await discover_device()
    if not address:
        return

    # まず接続を確立
    async with BleakClient(address) as client:
        # ユーザー入力を受け付ける非同期タスクを開始
        await handle_input(client)

if __name__ == "__main__":
    # メインの処理を非同期で実行
    asyncio.run(main())

async def handle_input(client):の中で、入力受付とデータ送信が行われますが、これらが非同期で実行さる、つまり互いにバックグラウンドで実行するようになるので入力に戻ってくるのが早くなります。

まとめ

XIAO ESP-S3とWindowsでBLE無線通信を試してみました。
このコードはXIAOに限らず、またWindowsに限らず様々な組み合わせでBLE無線通信を行うことができるはずです。
非同期処理はなんかよくわからない部分もありますが、動いたのでヨシです。
www.youtube.com