如何实现NSStream的超时/等待以有效地使方法同步



我有一个蓝牙连接附件的输入流和输出流

我想实现以下目标:

将数据写入outputStream等待,直到inputStream OR上接收到数据,直到10秒过去如果inputStream数据到达返回数据否则返回零

我试着这样实现:

- (APDUResponse *)sendCommandAndWaitForResponse:(NSData *)request {
APDUResponse * result;
if (!deviceIsBusy && request != Nil) {
deviceIsBusy = YES;
timedOut = NO;
responseReceived = NO;
if ([[mySes outputStream] hasSpaceAvailable]) {
[NSThread detachNewThreadSelector:@selector(startTimeout) toTarget:self withObject:nil];
[[mySes outputStream] write:[request bytes] maxLength:[request length]];
while (!timedOut && !responseReceived) {
sleep(2);
NSLog(@"tick");
}
if (responseReceived && response !=nil) {
result = response;
response = nil;
}
[myTimer invalidate];
myTimer = nil;
}
}
deviceIsBusy = NO;
return result;
}
- (void) startTimeout {
NSLog(@"start Timeout");
myTimer = [NSTimer timerWithTimeInterval:10.0 target:self selector:@selector(timerFireMethod:) userInfo:nil repeats:YES];
[[NSRunLoop currentRunLoop] addTimer:myTimer forMode:NSRunLoopCommonModes];
}
- (void)timerFireMethod:(NSTimer *)timer {
NSLog(@"fired");
timedOut = YES;
}
- (void)stream:(NSStream*)stream handleEvent:(NSStreamEvent)streamEvent
{
switch (streamEvent)
{
case NSStreamEventHasBytesAvailable:
// Process the incoming stream data.
if(stream == [mySes inputStream])
{
uint8_t buf[1024];
unsigned int len = 0;
len = [[mySes inputStream] read:buf maxLength:1024];
if(len) {
_data = [[NSMutableData alloc] init];
[_data appendBytes:(const void *)buf length:len];
NSLog(@"Response: %@", [_data description]);
response = [[APDUResponse alloc] initWithData:_data];
responseReceived = YES;
} else {
NSLog(@"no buffer!");
}
}
break;
... //code not relevant 
}
}

因此,理论上是让一个NSTimer在一个单独的线程上运行,该线程在触发时会设置一个布尔值,然后如果接收到数据,也让handleEvent委托方法设置另一个布尔。在该方法中,我们有一个while循环,当设置了其中一个bool时,该循环将停止。

我遇到的问题是,在"超时场景"中,没有调用timerFireMethod。我的直觉是,我实际上没有在单独的线程上正确设置计时器。

有人能看到这里出了什么问题,或者建议更好地实现上面的要求吗?

为了对固有的异步问题强制使用不合适的同步方法,请使方法sendCommandAndWaitForResponse异步

可以将"流写入"任务封装到异步操作/任务/方法中。例如,您可能会以具有以下接口的NSOperation的并发子类结束:

typedef void (^DataToStreamCopier_completion_t)(id result);
@interface DataToStreamCopier : NSOperation
- (id) initWithData:(NSData*)sourceData
destinationStream:(NSOutputStream*)destinationStream
completion:(DataToStreamCopier_completion_t)completionHandler;
@property (nonatomic) NSThread* workerThread;
@property (nonatomic, copy) NSString* runLoopMode;
@property (atomic, readonly) long long totalBytesCopied;

// NSOperation
- (void) start;
- (void) cancel;
@property (nonatomic, readonly) BOOL isCancelled;
@property (nonatomic, readonly) BOOL isExecuting;
@property (nonatomic, readonly) BOOL isFinished;
@end

您可以使用cancel方法实现"超时"功能。

方法sendCommandAndWaitForResponse:与完成处理程序异步

- (void)sendCommand:(NSData *)request 
completion:(DataToStreamCopier_completion_t)completionHandler
{
DataToStreamCopier* op = [DataToStreamCopier initWithData:request 
destinationStream:self.outputStream 
completion:completionHandler];
[op start];
// setup timeout with block:  ^{ [op cancel]; }
...
}

用法:

[self sendCommand:request completion:^(id result) {
if ([result isKindOfClass[NSError error]]) {
NSLog(@"Error: %@", error);
}
else {
// execute on a certain execution context (main thread) if required:
dispatch_async(dispatch_get_main_queue(), ^{
APDUResponse* response = result;
...    
});
}
}];

注意事项:

不幸的是,通过使用运行循环的底层任务正确地实现并发NSOperation子类并不是应该的那么简单。会出现微妙的并发问题,迫使您使用同步原语,如锁或调度队列,以及其他一些技巧来使其真正可靠。

幸运的是,将任何Run Loop任务包装到并发的NSOperation子类中基本上需要相同的"锅炉板"代码。因此,一旦你有了通用的解决方案,编码工作就是从"模板"中复制/过去,然后根据你的特定目的定制代码。

替代解决方案:

严格地说,如果您不打算将许多这样的任务放入NSOperationQueue中,那么您甚至不需要NSOperation的子类。并发操作可以简单地通过向其发送start方法来启动-不需要NSOperationQueue。然后,不使用NSOperation的子类可以使您自己的实现更简单,因为子类化NSOperation本身也有其微妙之处。

然而,您实际上需要一个"操作对象"来包装驱动NSStream对象的Run Loop,因为实现需要保持状态,而这在简单的异步方法中是无法实现的。

因此,您可以使用任何自定义类,该类可以被视为异步操作,具有startcancel方法,并具有在底层任务完成时通知调用站点的机制。

还有比完成处理程序更强大的方法来通知调用站点。例如:promise或futures(参见wiki文章futures and promise)。

假设您使用Promise实现了自己的"异步操作"类,以此作为通知呼叫站点的手段,例如:

@interface WriteDataToStreamOperation : AsyncOperation
- (void) start;
- (void) cancel;
@property (nonatomic, readonly) BOOL isCancelled;
@property (nonatomic, readonly) BOOL isExecuting;
@property (nonatomic, readonly) BOOL isFinished;
@property (nonatomic, readonly) Promise* promise;
@end

你最初的问题看起来会更加"同步"——尽管它是异步的:

您的sendCommand方法变为:

注意:假设Promise类的某个实现:

- (Promise*) sendCommand:(NSData *)command {
WriteDataToStreamOperation* op = 
[[WriteDataToStreamOperation alloc] initWithData:command 
outputStream:self.outputStream];
[op start];
Promise* promise = op.promise;
[promise setTimeout:100]; // time out after 100 seconds
return promise;
}

注意:promise设置了"超时"。这基本上是注册一个定时器和一个处理程序。如果计时器在promise被底层任务解析之前触发,则计时器块将解析promise并返回超时错误。如何实现(以及IF)这取决于Promise库。(这里,我假设我是RXPromise库的作者。其他实现也可能实现这样的功能)。

用法:

[self sendCommand:request].then(^id(APDUResponse* response) {
// do something with the response
...
return  ...;  // returns the result of the handler
}, 
^id(NSError*error) {
// A Stream error or a timeout error
NSLog(@"Error: %@", error);
return nil;  // returns nothing
});

替代用途:

您可以用其他方式设置超时。现在,假设我们没有在sendCommand:方法中设置超时。

我们可以在"外部"设置超时:

Promise* promise = [self sendCommand:request];
[promise setTimeout:100];
promise.then(^id(APDUResponse* response) {
// do something with the response
...
return  ...;  // returns the result of the handler
}, 
^id(NSError*error) {
// A Stream error or a timeout error
NSLog(@"Error: %@", error);
return nil;  // returns nothing
});

使异步方法同步

通常,您不需要也不应该在应用程序代码中将异步方法"转换"为某种同步方法。这总是导致次优和低效的代码,不必要地消耗系统资源,比如线程。

尽管如此,你可能想在有意义的单元测试中这样做:

单元测试中"同步"异步方法的示例

在测试实现时,您经常希望"等待"(yes同步)结果。事实上,您的底层任务实际上是在Run Loop上执行的,可能是在您想要等待结果的同一个线程上,这并不能使解决方案变得更简单。

然而,您可以使用使用runLoopWait方法的RXPromise库轻松实现这一点,该方法有效地进入运行循环并在那里等待承诺得到解决:

-(void) testSendingCommandShouldReturnResponseBeforeTimeout10 {
Promise* promise = [self sendCommand:request];
[promise setTimeout:10];
[promise.then(^id(APDUResponse* response) {
// do something with the response
XCTAssertNotNil(response);            
return  ...;  // returns the result of the handler
}, 
^id(NSError*error) {
// A Stream error or a timeout error
XCTestFail(@"failed with error: %@", error);
return nil;  // returns nothing
}) runLoopWait];  // "wait" on the run loop
}

在这里,方法runLoopWait将进入一个运行循环,等待promise被解析,或者是由于超时错误,或者是当底层任务解析了promise时。promise不会阻塞主线程,也不会轮询运行循环。当承诺得到解决时,它将离开运行循环。其他运行循环事件将照常处理。

注意:您可以安全地从主线程调用testSendingCommandShouldReturnResponseBeforeTimeout10而不阻塞它。这是绝对必要的,因为您的Stream委托方法也可能在主线程上执行!

单元测试库中通常还有其他方法,它们提供了类似于在进入运行循环时"等待"异步方法或操作的结果的功能。

不建议使用其他方法"等待"异步方法或操作的最终结果。这些方法通常会将方法分派给一个专用线程,然后阻塞它,直到结果可用。

有用的资源

类似操作的类的代码片段(在Gist上),该类使用Promises将流复制到另一个流中:RXStreamToStreamCopier

相关内容

  • 没有找到相关文章