Files
core-data-ensembles-mirror/Framework/Source/General/CDEAsynchronousTaskQueue.m

213 lines
6.5 KiB
Objective-C

//
// CDEAsynchronousTaskQueue.m
// Ensembles
//
// Created by Drew McCormack on 4/13/13.
// Copyright (c) 2013 Drew McCormack. All rights reserved.
//
#import "CDEAsynchronousTaskQueue.h"
// Private class extension: makes the two properties below writable inside this file.
// (Presumably they are exposed read-only in CDEAsynchronousTaskQueue.h -- header not visible here.)
@interface CDEAsynchronousTaskQueue ()

// Progress indicator; reset in -start and updated each time -startNextTask runs.
@property (atomic, assign, readwrite) NSUInteger numberOfTasksCompleted;

// Rule that decides whether the queue stops after a task reports success or failure.
@property (atomic, assign, readwrite) CDETaskQueueTerminationPolicy terminationPolicy;

@end
// Runs an ordered list of asynchronous task blocks one at a time. Each task is handed a
// callback block; invoking the callback either advances to the next task or ends the queue,
// depending on the termination policy. Task scheduling and completion are funnelled onto
// the main thread.
@implementation CDEAsynchronousTaskQueue {
    NSEnumerator *taskEnumerator;       // Walks `tasks`; created in -start
    CDECompletionBlock completion;      // Called (if non-nil) from -finish, then released
    NSMutableArray *errors;             // One entry per task callback: the NSError, or NSNull if none was reported
    BOOL isExecuting, isFinished;       // Backing state for -isExecuting / -isFinished, guarded by @synchronized(self)
    NSUInteger numberOfTasksStarted;    // How many tasks have been handed their callback; main thread only
}

@synthesize tasks = tasks;
@synthesize numberOfTasksCompleted = numberOfTasksCompleted;
@synthesize terminationPolicy = terminationPolicy;

#pragma mark - Initialization

// Primary initializer; every other initializer funnels into this one.
// newTasks       Array of task blocks, run in array order. The array is copied.
// policy         When to stop early (on error, on success, or never).
// newCompletion  Optional block called once from -finish with the resulting error, or nil.
- (instancetype)initWithTasks:(NSArray *)newTasks terminationPolicy:(CDETaskQueueTerminationPolicy)policy completion:(CDECompletionBlock)newCompletion
{
    self = [super init];
    if (self) {
        errors = [NSMutableArray array];
        terminationPolicy = policy;
        tasks = [newTasks copy];
        completion = [newCompletion copy];
    }
    return self;
}

// Convenience initializer using the stop-on-error policy.
- (instancetype)initWithTasks:(NSArray *)newTasks completion:(CDECompletionBlock)newCompletion
{
    return [self initWithTasks:newTasks terminationPolicy:CDETaskQueueTerminationPolicyStopOnError completion:newCompletion];
}

// Builds a queue that runs the same task `count` times.
// Every slot holds `[task copy]`. Copying a block that already lives on the heap (or is a
// global block) returns the very same object, so the slots can all be the identical pointer.
// That is why progress is tracked with a counter rather than by looking a task up in `tasks`.
- (instancetype)initWithTask:(CDEAsynchronousTaskBlock)task repeatCount:(NSUInteger)count terminationPolicy:(CDETaskQueueTerminationPolicy)policy completion:(CDECompletionBlock)newCompletion
{
    NSMutableArray *newTasks = [NSMutableArray array];
    for (NSUInteger i = 0; i < count; i++) {
        [newTasks addObject:[task copy]];
    }
    return [self initWithTasks:newTasks terminationPolicy:policy completion:newCompletion];
}

// Convenience initializer: a single task, run once, with the stop-on-error policy.
- (instancetype)initWithTask:(CDEAsynchronousTaskBlock)task completion:(CDECompletionBlock)newCompletion
{
    return [self initWithTask:task repeatCount:1 terminationPolicy:CDETaskQueueTerminationPolicyStopOnError completion:newCompletion];
}

#pragma mark - Running

// Begins executing the queue. If called off the main thread, the call is re-posted to the
// main thread and returns immediately. Marks the receiver as executing (with KVO
// notifications), resets the progress counters and schedules the first task.
- (void)start
{
    if (![NSThread isMainThread]) {
        [self performSelectorOnMainThread:@selector(start) withObject:nil waitUntilDone:NO];
        return;
    }

    @synchronized (self) {
        [self willChangeValueForKey:@"isFinished"];
        [self willChangeValueForKey:@"isExecuting"];
        isFinished = NO;
        isExecuting = YES;
        [self didChangeValueForKey:@"isExecuting"];
        [self didChangeValueForKey:@"isFinished"];
    }

    self.numberOfTasksCompleted = 0;
    numberOfTasksStarted = 0;
    taskEnumerator = [tasks objectEnumerator];
    [self performSelector:@selector(startNextTask) withObject:nil afterDelay:0.0];
}

// Collapses the recorded `errors` into a single NSError.
// Returns nil when nothing was recorded, the lone entry when there is exactly one, and
// otherwise a CDEErrorCodeMultipleErrors error whose userInfo carries a copy of the whole
// array under the @"errors" key.
- (NSError *)combineErrors
{
    if (errors.count == 0)
        return nil;
    else if (errors.count == 1)
        return errors.lastObject;
    else {
        NSError *multipleErrorsError = [NSError errorWithDomain:CDEErrorDomain code:CDEErrorCodeMultipleErrors userInfo:@{@"errors": [errors copy]}];
        return multipleErrorsError;
    }
}

// Pulls the next task from the enumerator and runs it, or schedules -finish when the queue
// is cancelled or exhausted. Always executes on the main thread; calls from other threads
// are re-posted.
- (void)startNextTask
{
    if (![NSThread isMainThread]) {
        [self performSelectorOnMainThread:@selector(startNextTask) withObject:nil waitUntilDone:NO];
        return;
    }

    @autoreleasepool {
        if (self.isCancelled) {
            [self performSelector:@selector(finish) withObject:nil afterDelay:0.0];
            return;
        }

        // We use delayed performs in some calls, because this method can be invoked from a
        // callback block, and we don't want the block to be released while it is still on
        // the stack. So we let the stack unwind first.
        CDEAsynchronousTaskBlock block = [taskEnumerator nextObject];
        if (!block) {
            // Nothing left to run: report full progress and wrap up.
            self.numberOfTasksCompleted = tasks.count;
            [self performSelector:@selector(finish) withObject:nil afterDelay:0.0];
            return;
        }

        // Every task dispatched before this one has called back, so the number already
        // started equals the number completed. A counter is used instead of
        // -indexOfObject:, which returns the FIRST matching index and therefore reports
        // the wrong progress when the same block object occurs more than once in `tasks`
        // (and costs a linear search per task).
        self.numberOfTasksCompleted = numberOfTasksStarted;
        numberOfTasksStarted++;

        // Callback handed to the task. It records the outcome and then either ends the
        // queue or moves on. Note that -performSelector:withObject:afterDelay: schedules
        // on the run loop of whichever thread invokes this callback.
        CDEAsynchronousTaskCallbackBlock next = [^(NSError *error, BOOL stop) {
            BOOL shouldStop = NO;
            if (error && terminationPolicy == CDETaskQueueTerminationPolicyStopOnError) shouldStop = YES;
            if (!error && terminationPolicy == CDETaskQueueTerminationPolicyStopOnSuccess) shouldStop = YES;

            // NSNull stands in for "no error" so that every callback leaves one entry.
            [errors addObject:(error ? : [NSNull null])];

            // The task itself can request termination regardless of policy.
            if (stop) shouldStop = YES;

            if (shouldStop) {
                [self performSelector:@selector(finish) withObject:nil afterDelay:0.0];
            }
            else {
                [self performSelector:@selector(startNextTask) withObject:nil afterDelay:0.0];
            }
        } copy];

        block(next);
    }
}

// Ends the queue: works out the final error according to the termination policy, calls the
// completion block, flips the KVO state to finished and releases the tasks and completion
// block. Always executes on the main thread; calls from other threads are re-posted.
- (void)finish
{
    if (![NSThread isMainThread]) {
        [self performSelectorOnMainThread:@selector(finish) withObject:nil waitUntilDone:NO];
        return;
    }

    NSError *error = nil;

    // The last recorded outcome; an NSNull placeholder means the last task reported no error.
    NSError *lastError = errors.lastObject;
    if ((id)lastError == [NSNull null]) lastError = nil;

    if (self.isCancelled) {
        // Cancellation overrides whatever the tasks reported.
        error = [NSError errorWithDomain:CDEErrorDomain code:CDEErrorCodeCancelled userInfo:nil];
    }
    else {
        switch (terminationPolicy) {
            case CDETaskQueueTerminationPolicyStopOnError:
                error = lastError;
                break;

            case CDETaskQueueTerminationPolicyStopOnSuccess:
                error = lastError ? [self combineErrors] : nil; // If succeeded, don't return any errors
                break;

            case CDETaskQueueTerminationPolicyCompleteAll:
            {
                // Only report an error if at least one task actually failed.
                // NOTE(review): -combineErrors works on the unfiltered `errors` array, so
                // the combined error's @"errors" entry can contain NSNull placeholders for
                // tasks that succeeded -- confirm whether that is intended.
                NSMutableArray *nonNull = [errors mutableCopy];
                [nonNull removeObject:[NSNull null]];
                error = nonNull.count > 0 ? [self combineErrors] : nil;
            }
                break;

            default:
                @throw [NSException exceptionWithName:CDEException reason:@"Invalid policy" userInfo:nil];
                break;
        }
    }

    if (completion) completion(error);

    @synchronized (self) {
        [self willChangeValueForKey:@"isFinished"];
        [self willChangeValueForKey:@"isExecuting"];
        isFinished = YES;
        isExecuting = NO;
        [self didChangeValueForKey:@"isExecuting"];
        [self didChangeValueForKey:@"isFinished"];
    }

    // Drop the blocks (and anything they captured) now that the queue is done.
    tasks = nil;
    completion = NULL;
}

#pragma mark - State

// Always YES.
- (BOOL)isConcurrent
{
    return YES;
}

// YES between -start and -finish.
- (BOOL)isExecuting
{
    @synchronized (self) {
        return isExecuting;
    }
}

// YES once -finish has run.
- (BOOL)isFinished
{
    @synchronized (self) {
        return isFinished;
    }
}

// Number of tasks currently held. This is 0 after -finish, which releases the task array.
- (NSUInteger)numberOfTasks
{
    return tasks.count;
}

@end