diff --git a/open_wearable/lib/models/sensor_streams.dart b/open_wearable/lib/models/sensor_streams.dart index 3058f6dd..a8688649 100644 --- a/open_wearable/lib/models/sensor_streams.dart +++ b/open_wearable/lib/models/sensor_streams.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:collection'; import 'package:open_earable_flutter/open_earable_flutter.dart'; @@ -10,19 +11,38 @@ import 'package:open_earable_flutter/open_earable_flutter.dart'; class SensorStreams { SensorStreams._(); - static final Map> _sharedStreams = {}; + static final Map>> + _sharedStreamsByDevice = {}; + static Map> _createIdentitySensorStreamMap() => + LinkedHashMap>.identity(); - static Stream shared(Sensor sensor) { - return _sharedStreams.putIfAbsent(sensor, () { - return sensor.sensorStream.asBroadcastStream( - onCancel: (_) { - _sharedStreams.remove(sensor); - }, - ); - }); + static Stream shared({ + required Wearable wearable, + required Sensor sensor, + }) { + final deviceStreams = _sharedStreamsByDevice.putIfAbsent( + wearable.deviceId, + // Identity map avoids collisions when Sensor overrides ==/hashCode + // non-uniquely across different devices. + _createIdentitySensorStreamMap, + ); + return deviceStreams.putIfAbsent( + sensor, + () => sensor.sensorStream.asBroadcastStream(), + ); } - static void clearForSensor(Sensor sensor) { - _sharedStreams.remove(sensor); + static void clearForSensor({ + required Wearable wearable, + required Sensor sensor, + }) { + final deviceStreams = _sharedStreamsByDevice[wearable.deviceId]; + if (deviceStreams == null) { + return; + } + deviceStreams.remove(sensor); + if (deviceStreams.isEmpty) { + _sharedStreamsByDevice.remove(wearable.deviceId); + } } } diff --git a/open_wearable/lib/view_models/sensor_data_provider.dart b/open_wearable/lib/view_models/sensor_data_provider.dart index 516cf640..f870c2b0 100644 --- a/open_wearable/lib/view_models/sensor_data_provider.dart +++ b/open_wearable/lib/view_models/sensor_data_provider.dart @@ -19,6 +19,7 @@ import 'package:open_wearable/models/sensor_streams.dart'; /// Provides: /// - `sensorValues` and `displayTimestamp` for chart/value widgets. class SensorDataProvider with ChangeNotifier { + final Wearable wearable; final Sensor sensor; final int timeWindow; // seconds @@ -38,6 +39,7 @@ class SensorDataProvider with ChangeNotifier { DateTime? _lastSensorArrivalTime; SensorDataProvider({ + required this.wearable, required this.sensor, this.timeWindow = 5, }) { @@ -64,8 +66,10 @@ class SensorDataProvider with ChangeNotifier { } void _listenToStream() { - _sensorStreamSubscription = - SensorStreams.shared(sensor).listen((sensorValue) { + _sensorStreamSubscription = SensorStreams.shared( + wearable: wearable, + sensor: sensor, + ).listen((sensorValue) { sensorValues.add(sensorValue); _lastSensorTimestamp = sensorValue.timestamp; _lastSensorArrivalTime = DateTime.now(); diff --git a/open_wearable/lib/view_models/sensor_recorder_provider.dart b/open_wearable/lib/view_models/sensor_recorder_provider.dart index 1d295c59..e15d5580 100644 --- a/open_wearable/lib/view_models/sensor_recorder_provider.dart +++ b/open_wearable/lib/view_models/sensor_recorder_provider.dart @@ -165,7 +165,10 @@ class SensorRecorderProvider with ChangeNotifier { File file = await recorder.start( filepath: filepath, - inputStream: SensorStreams.shared(sensor), + inputStream: SensorStreams.shared( + wearable: wearable, + sensor: sensor, + ), ); logger.i( diff --git a/open_wearable/lib/widgets/sensors/sensor_page.dart b/open_wearable/lib/widgets/sensors/sensor_page.dart index cc8e9f7b..6b5fdf86 100644 --- a/open_wearable/lib/widgets/sensors/sensor_page.dart +++ b/open_wearable/lib/widgets/sensors/sensor_page.dart @@ -189,7 +189,10 @@ class _SensorPageState extends State in wearable.requireCapability().sensors) { _sensorDataProviders.putIfAbsent( (wearable, sensor), - () => SensorDataProvider(sensor: sensor), + () => SensorDataProvider( + wearable: wearable, + sensor: sensor, + ), ); } } diff --git a/open_wearable/lib/widgets/sensors/values/sensor_values_page.dart b/open_wearable/lib/widgets/sensors/values/sensor_values_page.dart index b3483772..f847afa7 100644 --- a/open_wearable/lib/widgets/sensors/values/sensor_values_page.dart +++ b/open_wearable/lib/widgets/sensors/values/sensor_values_page.dart @@ -144,7 +144,10 @@ class _SensorValuesPageState extends State in wearable.requireCapability().sensors) { _sensorDataProvider.putIfAbsent( (wearable, sensor), - () => SensorDataProvider(sensor: sensor), + () => SensorDataProvider( + wearable: wearable, + sensor: sensor, + ), ); } } diff --git a/open_wearable/pubspec.lock b/open_wearable/pubspec.lock index be71e2bd..0243cabb 100644 --- a/open_wearable/pubspec.lock +++ b/open_wearable/pubspec.lock @@ -500,10 +500,10 @@ packages: dependency: transitive description: name: matcher - sha256: "12956d0ad8390bbcc63ca2e1469c0619946ccb52809807067a7020d57e647aa6" + sha256: dc0b7dc7651697ea4ff3e69ef44b0407ea32c487a39fff6a4004fa585e901861 url: "https://pub.dev" source: hosted - version: "0.12.18" + version: "0.12.19" material_color_utilities: dependency: transitive description: @@ -945,10 +945,10 @@ packages: dependency: transitive description: name: test_api - sha256: "19a78f63e83d3a61f00826d09bc2f60e191bf3504183c001262be6ac75589fb8" + sha256: "8161c84903fd860b26bfdefb7963b3f0b68fee7adea0f59ef805ecca346f0c7a" url: "https://pub.dev" source: hosted - version: "0.7.8" + version: "0.7.10" tuple: dependency: transitive description: