如何使用StreamTransformer从数据库流创建一个StreamController



我正在尝试将具有数据库数据模型的数据库中的流转换为域数据模型。

把不同的信息拼凑在一起,我很困惑。而在我发现的StreamTransformer示例中,流始终是单个对象而不是列表,我的示例流ObjectBox数据的结果返回Stream<List<PropertyObjectBox>>

然后它似乎缺少了一块拼图,如何从StreamStreamController

那么我要如何修改下面的代码呢?

/// <PropertyObjectBox> data model of Property in ObjectBox database
/// <PropertyModel> data model in data layer (not really needed, I know)
/// <Property> data model in domain layer

@override
// -->> should this return Stream<List<Property>?> or Stream<Property>?
// -->> or something else to comply with handleError?
Stream<List<Property>?> streamOnDeviceProperties() {
Stream<List<PropertyObjectBox>> propObStream = objectbox.propertyBox.query()
.watch(triggerImmediately: true).map((query) => query
// Watching the query produces a Stream<Query<Property>>
// To get the actual data inside a List<Property>, we need to call find() on the query
.find());
//-->> again, PropertyObjectBox or List<PropertyObjectBox>?
var streamTransformer = StreamTransformer<PropertyObjectBox, dynamic>.fromHandlers(
handleData: (PropertyObjectBox data, EventSink sink) {
final propertyModel = PropertyModel.fromObjectbox(data);
final Property property = propertyModel.toDomain();
sink.add(property);
},
handleError: (Object error, StackTrace stacktrace, EventSink sink) {
sink.addError('Something went wrong: $error');
},
handleDone: (EventSink sink) => sink.close(),
);
//-->> next line causes error 'the getter 'stream' isn't defined for the type 'Stream<List<PropertyObjectBox>>'  
var controllerStream = propObStream.stream.transform(streamTransformer);

这应该能奏效:已更新

@override
Stream<List<Property>?> streamOnDeviceProperties() {
Stream<List<PropertyObjectBox>?> propObStream = objectbox.propertyBox.query()
.watch(triggerImmediately: true).map((query) => query.find());
// List<PropertyObjectBox>?
StreamTransformer<List<PropertyObjectBox>?,List<Property>?> streamTransformer = StreamTransformer<List<PropertyObjectBox>?,List<Property>?>.fromHandlers(
handleData: (List<PropertyObjectBox>? data, EventSink sink) {
var newList = data!.map((value) {
final propertyModel = PropertyModel.fromObjectbox(value);
final Property property = propertyModel.toDomain();
return property;
}).toList();
sink.add(newList);
},
handleError: (Object error, StackTrace stacktrace, EventSink sink) {
sink.addError('Something went wrong: $error');
},
handleDone: (EventSink sink) => sink.close(),
);

// if you need a Controller although I don't know why
Stream<List<Property>?> newStream = propObStream.transform(streamTransformer);

final StreamController<List<Property>?> streamController = StreamController<List<Property>?>(
onPause: () => print('Paused'),
onResume: () => print('Resumed'),
onCancel: () => print('Cancelled'),
onListen: () => print('Listens'),
);
streamController.addStream(newStream);
//********************************

return streamTransformer.bind(propObStream);
}

这样访问流:

Stream<List<Property>?> _mystream = streamOnDeviceProperties();

最新更新