Skip to content

Commit

Permalink
Implement ACL flow control
Browse files Browse the repository at this point in the history
* Get rid of additional queue
* Read max HCI packets on init
* Use semaphore to control ACL packet flow
  • Loading branch information
lulf committed Mar 21, 2024
1 parent 97cae94 commit 4eb64f3
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 162 deletions.
2 changes: 1 addition & 1 deletion examples/nrf-sdc/src/bin/ble_l2cap_central.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ async fn main(spawner: Spawner) {
let conn = Connection::connect(&adapter, report.addr).await;
info!("Connected, creating l2cap channel");
const PAYLOAD_LEN: usize = 27;
let mut ch1: L2capChannel<'_, '_, PAYLOAD_LEN> =
let mut ch1: L2capChannel<'_, '_, _, PAYLOAD_LEN> =
unwrap!(L2capChannel::create(&adapter, &conn, 0x2349).await);
info!("New l2cap channel created, sending some data!");
for i in 0..10 {
Expand Down
2 changes: 1 addition & 1 deletion examples/nrf-sdc/src/bin/ble_l2cap_peripheral.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async fn main(spawner: Spawner) {

info!("Connection established");

let mut ch1: L2capChannel<'_, '_, PAYLOAD_LEN> =
let mut ch1: L2capChannel<'_, '_, _, PAYLOAD_LEN> =
unwrap!(L2capChannel::accept(&adapter, &conn, 0x2349).await);

info!("L2CAP channel accepted");
Expand Down
1 change: 1 addition & 0 deletions host/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ embassy-futures = "0.1"
futures = { version = "0.3", default-features = false }
heapless = "0.8"
trouble-host-macros = { version = "0.1.0", path = "../host-macros" }
futures-intrusive = { version = "0.5.0", default-features = false }

# Logging
log = { version = "0.4.16", optional = true }
Expand Down
126 changes: 64 additions & 62 deletions host/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ use crate::packet_pool::{self, DynamicPacketPool, PacketPool, Qos, ATT_ID};
use crate::pdu::Pdu;
use crate::scan::{ScanConfig, ScanReport};
use crate::types::l2cap::L2capLeSignal;
use crate::{codec, Error};
use crate::{AdapterError, Error};
use bt_hci::cmd::controller_baseband::{Reset, SetEventMask};
use bt_hci::cmd::le::{
LeCreateConn, LeCreateConnParams, LeSetAdvData, LeSetAdvEnable, LeSetAdvParams, LeSetScanEnable, LeSetScanParams,
LeCreateConn, LeCreateConnParams, LeReadBufferSize, LeSetAdvData, LeSetAdvEnable, LeSetAdvParams, LeSetScanEnable,
LeSetScanParams,
};
use bt_hci::cmd::link_control::{Disconnect, DisconnectParams};
use bt_hci::cmd::{AsyncCmd, SyncCmd};
Expand All @@ -24,9 +25,10 @@ use bt_hci::event::Event;
use bt_hci::param::{BdAddr, ConnHandle, DisconnectReason, EventMask};
use bt_hci::{Controller, ControllerToHostPacket};
use bt_hci::{ControllerCmdAsync, ControllerCmdSync};
use embassy_futures::select::{select4, Either4};
use embassy_futures::select::{select3, Either3};
use embassy_sync::blocking_mutex::raw::RawMutex;
use embassy_sync::channel::Channel;
use futures_intrusive::sync::LocalSemaphore;

pub struct HostResources<M: RawMutex, const CHANNELS: usize, const PACKETS: usize, const L2CAP_MTU: usize> {
pool: PacketPool<M, L2CAP_MTU, PACKETS, CHANNELS>,
Expand Down Expand Up @@ -58,8 +60,8 @@ pub struct Adapter<
pub(crate) channels: ChannelManager<'d, M, CHANNELS, L2CAP_TXQ, L2CAP_RXQ>,
pub(crate) att_inbound: Channel<M, (ConnHandle, Pdu<'d>), L2CAP_RXQ>,
pub(crate) pool: &'d dyn DynamicPacketPool<'d>,
pub(crate) permits: LocalSemaphore,

pub(crate) outbound: Channel<M, (ConnHandle, Pdu<'d>), L2CAP_TXQ>,
pub(crate) control: Channel<M, ControlCommand, 1>,
pub(crate) scanner: Channel<M, ScanReport, 1>,
}
Expand All @@ -70,19 +72,6 @@ pub(crate) enum ControlCommand {
Connect(LeCreateConnParams),
}

#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum HandleError {
Codec(codec::Error),
Other,
}

impl From<codec::Error> for HandleError {
fn from(e: codec::Error) -> Self {
Self::Codec(e)
}
}

impl<'d, M, T, const CONNS: usize, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP_RXQ: usize>
Adapter<'d, M, T, CONNS, CHANNELS, L2CAP_TXQ, L2CAP_RXQ>
where
Expand All @@ -106,16 +95,15 @@ where
pool: &host_resources.pool,
att_inbound: Channel::new(),
scanner: Channel::new(),

outbound: Channel::new(),
control: Channel::new(),
permits: LocalSemaphore::new(true, 0),
}
}

/// Performs a BLE scan, return a report for discovering peripherals.
///
/// Scan is stopped when a report is received. Call this method repeatedly to continue scanning.
pub async fn scan(&self, config: &ScanConfig) -> Result<ScanReport, Error<T::Error>>
pub async fn scan(&self, config: &ScanConfig) -> Result<ScanReport, AdapterError<T::Error>>
where
T: ControllerCmdSync<LeSetScanEnable> + ControllerCmdSync<LeSetScanParams>,
{
Expand All @@ -139,7 +127,7 @@ where
///
/// Advertisements are stopped when a connection is made against this host,
/// in which case a handle for the connection is returned.
pub async fn advertise<'m>(&'m self, config: &AdvertiseConfig<'_>) -> Result<Connection<'m>, Error<T::Error>>
pub async fn advertise<'m>(&'m self, config: &AdvertiseConfig<'_>) -> Result<Connection<'m>, AdapterError<T::Error>>
where
T: ControllerCmdSync<LeSetAdvData> + ControllerCmdSync<LeSetAdvEnable> + ControllerCmdSync<LeSetAdvParams>,
{
Expand Down Expand Up @@ -174,18 +162,18 @@ where
pub fn gatt_server<'reference, 'values, const MAX: usize>(
&'reference self,
table: &'reference AttributeTable<'values, M, MAX>,
) -> GattServer<'reference, 'values, 'd, M, MAX> {
) -> GattServer<'reference, 'values, 'd, M, T, MAX> {
GattServer {
server: AttributeServer::new(table),
pool: self.pool,
pool_id: packet_pool::ATT_ID,
rx: self.att_inbound.receiver().into(),
tx: self.outbound.sender().into(),
tx: self.hci(),
connections: &self.connections,
}
}

async fn handle_acl(&self, acl: AclPacket<'_>) -> Result<(), HandleError> {
async fn handle_acl(&self, acl: AclPacket<'_>) -> Result<(), Error> {
let (conn, packet) = L2capPacket::decode(acl)?;
match packet.channel {
L2CAP_CID_ATT => {
Expand All @@ -203,7 +191,7 @@ where
match self.channels.control(conn, signal).await {
Ok(_) => {}
Err(_) => {
return Err(HandleError::Other);
return Err(Error::Other);
}
}
}
Expand All @@ -221,13 +209,14 @@ where
Ok(())
}

pub async fn run(&self) -> Result<(), Error<T::Error>>
pub async fn run(&self) -> Result<(), AdapterError<T::Error>>
where
T: ControllerCmdSync<Disconnect>
+ ControllerCmdSync<SetEventMask>
+ ControllerCmdSync<Reset>
+ ControllerCmdAsync<LeCreateConn>
+ ControllerCmdSync<LeSetScanEnable>,
+ ControllerCmdSync<LeSetScanEnable>
+ ControllerCmdSync<LeReadBufferSize>,
{
self.control.send(ControlCommand::Init).await;

Expand Down Expand Up @@ -280,7 +269,8 @@ where
let _ = self.connections.disconnect(e.handle);
}
Event::NumberOfCompletedPackets(c) => {
//info!("Confirmed {} packets sent", c.completed_packets.len());
info!("Confirmed {} packets sent", c.completed_packets.len());
self.permits.release(c.completed_packets.len());
}
_ => {
warn!("Unknown event: {:?}", event);
Expand All @@ -298,34 +288,12 @@ where
Ok(())
};

// Task handling shuffling outbound ACL data.
let tx_fut = async {
let (handle, pdu) = self.outbound.receive().await;
let acl = AclPacket::new(
handle,
AclPacketBoundary::FirstNonFlushable,
AclBroadcastFlag::PointToPoint,
pdu.as_ref(),
);
match self.controller.write_acl_data(&acl).await {
Ok(_) => {}
Err(e) => {
#[cfg(feature = "defmt")]
let e = defmt::Debug2Format(&e);
warn!("Error writing some ACL data to controller: {:?}", e);
panic!(":(");
}
}
Ok(())
};

// Task issuing control.
// TODO: This does not necessarily need to go through the channel and could be dispatch directly
let control_fut = async {
let command = self.control.receive().await;
match command {
ControlCommand::Connect(params) => {
LeSetScanEnable::new(false, false).exec(&self.controller).await.unwrap();
LeCreateConn::new(
params.le_scan_interval,
params.le_scan_window,
Expand All @@ -341,18 +309,16 @@ where
params.max_ce_length,
)
.exec(&self.controller)
.await
.unwrap();
.await?;
}
ControlCommand::Disconnect(params) => {
self.connections.disconnect(params.handle).unwrap();
Disconnect::new(params.handle, params.reason)
.exec(&self.controller)
.await
.unwrap();
.await?;
}
ControlCommand::Init => {
Reset::new().exec(&self.controller).await.unwrap();
Reset::new().exec(&self.controller).await?;
SetEventMask::new(
EventMask::new()
.enable_le_meta(true)
Expand All @@ -362,8 +328,15 @@ where
.enable_disconnection_complete(true),
)
.exec(&self.controller)
.await
.unwrap();
.await?;

let ret = LeReadBufferSize::new().exec(&self.controller).await?;
info!(
"Setting max flow control packets to {}",
ret.total_num_le_acl_data_packets
);
self.permits.release(ret.total_num_le_acl_data_packets as usize);
// TODO: Configure ACL max buffer size as well?
}
}
Ok(())
Expand Down Expand Up @@ -395,6 +368,7 @@ where
AclBroadcastFlag::PointToPoint,
&tx[..len],
);
self.permits.acquire(1).await.disarm();
match self.controller.write_acl_data(&acl).await {
Ok(_) => {}
Err(e) => {
Expand All @@ -407,13 +381,41 @@ where
Ok(())
};
// info!("Entering select loop");
let result: Result<(), Error<T::Error>> = match select4(rx_fut, tx_fut, control_fut, signal_fut).await {
Either4::First(result) => result,
Either4::Second(result) => result,
Either4::Third(result) => result,
Either4::Fourth(result) => result,
let result: Result<(), AdapterError<T::Error>> = match select3(rx_fut, control_fut, signal_fut).await {
Either3::First(result) => result,
Either3::Second(result) => result,
Either3::Third(result) => result,
};
result?;
}
}

pub(crate) fn hci(&self) -> HciController<'_, T> {
HciController {
controller: &self.controller,
permits: &self.permits,
}
}
}

pub struct HciController<'d, T: Controller> {
controller: &'d T,
permits: &'d LocalSemaphore,
}

impl<'d, T: Controller> HciController<'d, T> {
pub(crate) async fn send(&self, handle: ConnHandle, pdu: Pdu<'_>) -> Result<(), AdapterError<T::Error>> {
self.permits.acquire(1).await.disarm();
let acl = AclPacket::new(
handle,
AclPacketBoundary::FirstNonFlushable,
AclBroadcastFlag::PointToPoint,
&pdu.as_ref(),
);
self.controller
.write_acl_data(&acl)
.await
.map_err(AdapterError::Controller)?;
Ok(())
}
}
10 changes: 5 additions & 5 deletions host/src/attribute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use core::{cell::RefCell, fmt};
use embassy_sync::blocking_mutex::{raw::RawMutex, Mutex};

pub use crate::types::uuid::Uuid;
use crate::{att::AttErrorCode, cursor::WriteCursor};
use crate::{att::AttErrorCode, cursor::WriteCursor, Error};

pub const GENERIC_ACCESS_SERVICE_UUID16: Uuid = Uuid::Uuid16(0x1800u16.to_le_bytes());
pub const CHARACTERISTIC_DEVICE_NAME_UUID16: Uuid = Uuid::Uuid16(0x2A00u16.to_le_bytes());
Expand Down Expand Up @@ -319,7 +319,7 @@ impl<'d, M: RawMutex, const MAX: usize> AttributeTable<'d, M, MAX> {
/// otherwise this function will panic.
///
/// If the characteristic for the handle cannot be found, an error is returned.
pub fn set(&self, handle: CharacteristicHandle, input: &[u8]) -> Result<(), ()> {
pub fn set(&self, handle: CharacteristicHandle, input: &[u8]) -> Result<(), Error> {
self.iterate(|mut it| {
while let Some(att) = it.next() {
if att.handle == handle.handle {
Expand All @@ -330,7 +330,7 @@ impl<'d, M: RawMutex, const MAX: usize> AttributeTable<'d, M, MAX> {
}
}
}
Err(())
Err(Error::NotFound)
})
}

Expand All @@ -339,7 +339,7 @@ impl<'d, M: RawMutex, const MAX: usize> AttributeTable<'d, M, MAX> {
/// The return value of the closure is returned in this function and is assumed to be infallible.
///
/// If the characteristic for the handle cannot be found, an error is returned.
pub fn get<F: FnMut(&[u8]) -> T, T>(&self, handle: CharacteristicHandle, mut f: F) -> Result<T, ()> {
pub fn get<F: FnMut(&[u8]) -> T, T>(&self, handle: CharacteristicHandle, mut f: F) -> Result<T, Error> {
self.iterate(|mut it| {
while let Some(att) = it.next() {
if att.handle == handle.handle {
Expand All @@ -349,7 +349,7 @@ impl<'d, M: RawMutex, const MAX: usize> AttributeTable<'d, M, MAX> {
}
}
}
Err(())
Err(Error::NotFound)
})
}
}
Expand Down
Loading

0 comments on commit 4eb64f3

Please sign in to comment.