MainQueue basic implementation

This commit is contained in:
2014-03-14 20:38:10 +01:00
parent be7cc7d6f9
commit c809b076bd
4 changed files with 228 additions and 7 deletions

View File

@@ -119,7 +119,7 @@ void WDOperationRelease(WDOperation *operation);
WDOperationQueue *WDOperationQueueRetain(WDOperationQueue *queue);
/*!
* @fn void WDOperationRelease(WDOperation *operation)
* @fn void WDOperationQueueRelease(WDOperationQueue *queue)
* @brief Decrements the retain count of a WDOperationQueue.
* @ingroup wd
* @param[in] queue the queue to release.
@@ -252,6 +252,31 @@ void WDOperationQueueSetName(WDOperationQueue *queue, const char *name);
* @returns The name of the operation queue.
*/
const char * WDOperationQueueGetName(WDOperationQueue *queue);
/*!
* @fn WDOperationQueue *WDOperationQueueMainQueue()
* @brief Returns the operation queue associated with the main thread.
* @ingroup wd
* @details The returned queue executes operations on the main thread.
* @returns The default operation queue bound to the main thread.
* @warning You should never call @ref WDOperationQueueRelease with the main queue, doing so will cause the application to crash.
*/
WDOperationQueue *WDOperationQueueMainQueue();
/*!
* @fn void WDOperationQueueMainQueueLoop()
* @brief Start the main queues loop.
* @ingroup wd
* @details This function normally never returns. If you want to dispatch operations back to the main queue then you should probably do something like this
* ~~~~~~~~~~
* int main(int argc, char *argv[]) {
* ...
* return WDOperationQueueMainQueueLoop();
* }
* ~~~~~~~~~~
* @warning You should never use @ref WDOperationQueueMainQueue to dispatch operations back to the main queue if this function was not never called.
*/
int WDOperationQueueMainQueueLoop();
#ifdef _cplusplus
}

View File

@@ -26,12 +26,13 @@
#define WDOperationQueueResultSuccess 0
#define WDOperationQueueResultFailure 1
static void __initMainQueue() __attribute__((constructor));
void WDOperationDealloc(void *block) __attribute__((visibility("internal")));
void WDOperationQueueDealloc(void *queue) __attribute__((visibility("internal")));
void *WDOperationQueueThreadF(void *args) __attribute__((visibility("internal")));
WDOperation *WDOperationQueuePopOperation(WDOperationQueue *restrict queue) __attribute__((visibility("internal")));
void WDOperationQueuePopAndPerform(WDOperationQueue *restrict queue) __attribute__((visibility("internal")));
void WDOperationPerform(WDOperation *restrict block) __attribute__((visibility("internal")));
/*!
@@ -43,6 +44,10 @@ struct _wd_operation_t {
wd_operation_f queuef; /*!< the operation's function */
void *argument; /*!< the operation's argument */
WDOperationQueue *queue; /*!< the associated queue that launched this operation */
struct _wd_operation_guard_t {
pthread_mutex_t mutex;
pthread_cond_t condition;
} guard; /*!< the data used for thread safety */
struct _wd_operation_wait_t {
pthread_mutex_t mutex;
pthread_cond_t condition;
@@ -82,13 +87,27 @@ struct _wd_operation_queue_t {
unsigned int stop:1; /*!< indicates whether the queue should stop and not to shcedule any further operations for execution */
unsigned int suspend:1; /*!< indicates whether the queue is suspended */
} flags; /*!< the flags of the operations */
};
static struct _wd_operation_queue_t __mainQueue;
/* Operation Queue */
static void __initMainQueue() {
__mainQueue = (struct _wd_operation_queue_t){
{ 0 , 0 }, /* operations */
"WDOperationQueue Main Queue", /* name */
pthread_self(), /* thread */
NULL, /* executingOperation */
{ PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER }, /* guard */
{ PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER }, /* suspend */
{ 0, 0 } /* flags */
};
TAILQ_INIT(&__mainQueue.operations);
}
WDOperationQueue *WDOperationQueueAllocate(void) {
WDOperationQueue *queue = MEMORY_MANAGEMENT_ALLOC(sizeof(WDOperationQueue));
if ( queue == NULL ) return errno = ENOMEM, (WDOperationQueue *)NULL;
@@ -182,8 +201,6 @@ void *WDOperationQueueThreadF(void *args) {
WDOperationQueuePopAndPerform(queue);
}
/* If any operaitons are still in the queue then WDOperationQueueDealloc() will take care of them */
return (void *)NULL;
}
@@ -199,6 +216,17 @@ int WDOperationQueueAddOperation(WDOperationQueue *restrict queue, WDOperation *
/* If the queue is stoped then it should not accept new operations */
if (queue->flags.stop) return pthread_mutex_unlock(&(queue->guard.mutex)), errno = EINVAL, -WDOperationQueueResultFailure;
/* If the queu is already on another queue */
pthread_mutex_lock(&operation->guard.mutex);
if (operation->queue) return pthread_mutex_unlock(&(queue->guard.mutex)), pthread_mutex_unlock(&operation->guard.mutex), errno = EINVAL, -WDOperationQueueResultFailure;
pthread_mutex_unlock(&operation->guard.mutex);
/* if it was already executed */
pthread_mutex_unlock(&operation->wait.mutex);
if (operation->flags.finished)
return pthread_mutex_unlock(&(queue->guard.mutex)), pthread_mutex_unlock(&operation->wait.mutex), errno = EINVAL, -WDOperationQueueResultFailure;
pthread_mutex_unlock(&operation->wait.mutex);
int wasEmpty = TAILQ_EMPTY(&queue->operations);
struct _list_item *item = MEMORY_MANAGEMENT_ALLOC(sizeof(struct _list_item));
if ( item == NULL ) return pthread_mutex_unlock(&(queue->guard.mutex)), errno = ENOMEM, -WDOperationQueueResultFailure;
@@ -206,7 +234,9 @@ int WDOperationQueueAddOperation(WDOperationQueue *restrict queue, WDOperation *
/* Add the operation to the queue */
item->operation = retain(operation);
TAILQ_INSERT_TAIL(&(queue->operations), item, items);
pthread_mutex_lock(&operation->guard.mutex);
operation->queue = queue;
pthread_mutex_unlock(&operation->guard.mutex);
/* Inform that the queue is no more empty */
if (wasEmpty) pthread_cond_signal(&queue->guard.condition);
@@ -217,6 +247,8 @@ int WDOperationQueueAddOperation(WDOperationQueue *restrict queue, WDOperation *
void WDOperationQueueSuspend(WDOperationQueue *restrict queue, int choice) {
if (NULL == queue) return;
/* Cannot suspend the main queue */
if (queue == &__mainQueue) return;
if (choice < 0) return;
pthread_mutex_lock(&queue->suspend.mutex);
@@ -314,6 +346,8 @@ WDOperation *WDOperationCreate(const wd_operation_f function, void *restrict arg
operation->argument = retain((void *)argument);
pthread_mutex_init(&operation->wait.mutex, NULL);
pthread_cond_init(&operation->wait.condition, NULL);
pthread_mutex_init(&operation->guard.mutex, NULL);
pthread_cond_init(&operation->guard.condition, NULL);
MEMORY_MANAGEMENT_ATTRIBUTE_SET_DEALLOC_FUNCTION(operation, WDOperationDealloc);
return operation;
@@ -324,6 +358,9 @@ void WDOperationDealloc(void *_operation) {
WDOperation *operation = _operation;
pthread_mutex_destroy(&operation->wait.mutex);
pthread_cond_destroy(&operation->wait.condition);
pthread_mutex_destroy(&operation->guard.mutex);
pthread_cond_destroy(&operation->guard.condition);
release((void *)operation->argument);
}
@@ -341,6 +378,7 @@ void WDOperationPerform(WDOperation *restrict operation) {
if ( operation == NULL ) return;
if ( operation->queuef == NULL ) return;
pthread_mutex_lock(&operation->guard.mutex);
/* If the operation was not canceled */
if (!operation->flags.canceled) {
/* Indicate that it is executing */
@@ -348,13 +386,18 @@ void WDOperationPerform(WDOperation *restrict operation) {
/* Associate the operation to the operation queue's executing operation */
operation->queue->executingOperation = retain(operation);
/* Execute the operation with its argument */
pthread_mutex_unlock(&operation->guard.mutex);
operation->queuef(operation, (void *)operation->argument);
pthread_mutex_lock(&operation->guard.mutex);
/* Indicate that the operation is not executing any more */
operation->flags.executing = 0;
/* Disassociate the operation from the queue */
operation->queue->executingOperation = NULL;
release(operation);
operation->queue = NULL;
}
pthread_mutex_unlock(&operation->guard.mutex);
pthread_mutex_lock(&operation->wait.mutex);
@@ -363,17 +406,21 @@ void WDOperationPerform(WDOperation *restrict operation) {
/* Inform any one waiting in WDOperationWaitUntilFinished() call */
pthread_cond_broadcast(&operation->wait.condition);
pthread_mutex_unlock(&operation->wait.mutex);
operation->queue = NULL;
}
WDOperationQueue *WDOperationCurrentOperationQueue(WDOperation *operation) {
if (operation == NULL) return errno = EINVAL, NULL;
return operation->queue;
pthread_mutex_lock(&operation->guard.mutex);
WDOperationQueue *queue = operation->queue;
pthread_mutex_unlock(&operation->guard.mutex);
return queue;
}
void WDOperationCancel(WDOperation *operation) {
if (NULL == operation) { errno = EINVAL; return; }
pthread_mutex_lock(&operation->guard.mutex);
operation->flags.canceled = 1;
pthread_mutex_unlock(&operation->guard.mutex);
}
wd_operation_flags_t WDOperationGetFlags(WDOperation *operation) {
@@ -393,3 +440,12 @@ void WDOperationWaitUntilFinished(WDOperation *operation) {
}
WDOperationQueue *WDOperationQueueMainQueue() {
return &__mainQueue;
}
int WDOperationQueueMainQueueLoop() {
WDOperationQueueThreadF(&__mainQueue);
return WDOperationQueueResultSuccess;
}

51
test/testMainRunLoop.c Normal file
View File

@@ -0,0 +1,51 @@
//
// testMainRunLoop.c
// workdipatcher
//
// Created by George Boumis on 1/2/14.
// Copyright (c) 2014 George Boumis <developer.george.boumis@gmail.com>. All rights reserved.
//
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include "operationQueue.h"
#include <memory_management/memory_management.h>
void opf(WDOperation *operation, void *arg);
void opmain(WDOperation *operation, void *arg);
int main(int argc, char *argv[]) {
char *backgroundOperationArgument = "backgroundOperationArgument";
WDOperationQueue *operationQueue = WDOperationQueueAllocate();
WDOperation *backgroundOperation = WDOperationCreate(opf, backgroundOperationArgument);
WDOperationQueueAddOperation(operationQueue, backgroundOperation);
WDOperationQueueAddOperation(WDOperationQueueMainQueue(), backgroundOperation);
WDOperationQueueMainQueueLoop();
release(operationQueue);
return 0;
}
void opf(WDOperation *operation, void *arg) {
char *argument = (char *)arg;
WDOperationQueue *queue = WDOperationCurrentOperationQueue(operation);
printf("The background operation is executing in \"%s\" with argument \"%s\"\n", WDOperationQueueGetName(queue), argument);
sleep(1);
char *mainString = MEMORY_MANAGEMENT_ALLOC(strlen("main argument")+1);
sprintf(mainString, "%s", "main argument");
WDOperation *mainOperation = WDOperationCreate(opmain, mainString);
WDOperationQueue *mainQueue = WDOperationQueueMainQueue();
WDOperationQueueAddOperation(mainQueue, mainOperation);
release(mainOperation);
}
void opmain(WDOperation *operation, void *arg) {
char *argument = arg;
WDOperationQueue *queue = WDOperationCurrentOperationQueue(operation);
printf("The main operation is executing in \"%s\" with argument \"%s\"\n", WDOperationQueueGetName(queue), argument);
release(argument);
}

View File

@@ -11,6 +11,8 @@
DE07E7181858DFE400AE3A9B /* testQueue.c in Sources */ = {isa = PBXBuildFile; fileRef = DE07E7171858DFE400AE3A9B /* testQueue.c */; };
DE07E71B1858DFF200AE3A9B /* libworkdipatcher.a in Frameworks */ = {isa = PBXBuildFile; fileRef = DE63F639184AAC0A00CF2A52 /* libworkdipatcher.a */; };
DE63F64E184AAD7500CF2A52 /* libmemorymanagement.a in Frameworks */ = {isa = PBXBuildFile; fileRef = DE63F64B184AAD5900CF2A52 /* libmemorymanagement.a */; };
DEE46169189D274600E1D3E0 /* libworkdipatcher.a in Frameworks */ = {isa = PBXBuildFile; fileRef = DE63F639184AAC0A00CF2A52 /* libworkdipatcher.a */; };
DEE4616F189D275700E1D3E0 /* testMainRunLoop.c in Sources */ = {isa = PBXBuildFile; fileRef = DEE46161189D271000E1D3E0 /* testMainRunLoop.c */; };
/* End PBXBuildFile section */
/* Begin PBXContainerItemProxy section */
@@ -42,6 +44,13 @@
remoteGlobalIDString = DE14C30E184B7055008D6559;
remoteInfo = testMemoryManagement;
};
DEE46165189D274600E1D3E0 /* PBXContainerItemProxy */ = {
isa = PBXContainerItemProxy;
containerPortal = DE63F631184AAC0A00CF2A52 /* Project object */;
proxyType = 1;
remoteGlobalIDString = DE63F638184AAC0A00CF2A52;
remoteInfo = workdipatcher;
};
/* End PBXContainerItemProxy section */
/* Begin PBXCopyFilesBuildPhase section */
@@ -54,6 +63,15 @@
);
runOnlyForDeploymentPostprocessing = 1;
};
DEE4616A189D274600E1D3E0 /* CopyFiles */ = {
isa = PBXCopyFilesBuildPhase;
buildActionMask = 2147483647;
dstPath = /usr/share/man/man1/;
dstSubfolderSpec = 0;
files = (
);
runOnlyForDeploymentPostprocessing = 1;
};
/* End PBXCopyFilesBuildPhase section */
/* Begin PBXFileReference section */
@@ -63,6 +81,8 @@
DE63F639184AAC0A00CF2A52 /* libworkdipatcher.a */ = {isa = PBXFileReference; explicitFileType = archive.ar; includeInIndex = 0; path = libworkdipatcher.a; sourceTree = BUILT_PRODUCTS_DIR; };
DE63F646184AAD5900CF2A52 /* memorymanagement.xcodeproj */ = {isa = PBXFileReference; lastKnownFileType = "wrapper.pb-project"; name = memorymanagement.xcodeproj; path = ../memorymanagement/memorymanagement.xcodeproj; sourceTree = "<group>"; };
DEB887D81858D5B100D149F4 /* operationQueue.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; name = operationQueue.h; path = include/operationQueue.h; sourceTree = "<group>"; };
DEE46161189D271000E1D3E0 /* testMainRunLoop.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; name = testMainRunLoop.c; path = test/testMainRunLoop.c; sourceTree = "<group>"; };
DEE4616E189D274600E1D3E0 /* testQueue copy */ = {isa = PBXFileReference; explicitFileType = "compiled.mach-o.executable"; includeInIndex = 0; path = "testQueue copy"; sourceTree = BUILT_PRODUCTS_DIR; };
/* End PBXFileReference section */
/* Begin PBXFrameworksBuildPhase section */
@@ -82,6 +102,14 @@
);
runOnlyForDeploymentPostprocessing = 0;
};
DEE46168189D274600E1D3E0 /* Frameworks */ = {
isa = PBXFrameworksBuildPhase;
buildActionMask = 2147483647;
files = (
DEE46169189D274600E1D3E0 /* libworkdipatcher.a in Frameworks */,
);
runOnlyForDeploymentPostprocessing = 0;
};
/* End PBXFrameworksBuildPhase section */
/* Begin PBXGroup section */
@@ -89,6 +117,7 @@
isa = PBXGroup;
children = (
DE07E7171858DFE400AE3A9B /* testQueue.c */,
DEE46161189D271000E1D3E0 /* testMainRunLoop.c */,
);
name = test;
sourceTree = "<group>";
@@ -109,6 +138,7 @@
children = (
DE63F639184AAC0A00CF2A52 /* libworkdipatcher.a */,
DE07E70E1858DFCE00AE3A9B /* testQueue */,
DEE4616E189D274600E1D3E0 /* testQueue copy */,
);
name = Products;
sourceTree = "<group>";
@@ -187,6 +217,24 @@
productReference = DE63F639184AAC0A00CF2A52 /* libworkdipatcher.a */;
productType = "com.apple.product-type.library.static";
};
DEE46163189D274600E1D3E0 /* testMainRunLoop */ = {
isa = PBXNativeTarget;
buildConfigurationList = DEE4616B189D274600E1D3E0 /* Build configuration list for PBXNativeTarget "testMainRunLoop" */;
buildPhases = (
DEE46166189D274600E1D3E0 /* Sources */,
DEE46168189D274600E1D3E0 /* Frameworks */,
DEE4616A189D274600E1D3E0 /* CopyFiles */,
);
buildRules = (
);
dependencies = (
DEE46164189D274600E1D3E0 /* PBXTargetDependency */,
);
name = testMainRunLoop;
productName = testQueue;
productReference = DEE4616E189D274600E1D3E0 /* testQueue copy */;
productType = "com.apple.product-type.tool";
};
/* End PBXNativeTarget section */
/* Begin PBXProject section */
@@ -216,6 +264,7 @@
targets = (
DE63F638184AAC0A00CF2A52 /* workdipatcher */,
DE07E70D1858DFCE00AE3A9B /* testQueue */,
DEE46163189D274600E1D3E0 /* testMainRunLoop */,
);
};
/* End PBXProject section */
@@ -254,6 +303,14 @@
);
runOnlyForDeploymentPostprocessing = 0;
};
DEE46166189D274600E1D3E0 /* Sources */ = {
isa = PBXSourcesBuildPhase;
buildActionMask = 2147483647;
files = (
DEE4616F189D275700E1D3E0 /* testMainRunLoop.c in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
};
/* End PBXSourcesBuildPhase section */
/* Begin PBXTargetDependency section */
@@ -267,6 +324,11 @@
name = memorymanagement;
targetProxy = DE63F64C184AAD7100CF2A52 /* PBXContainerItemProxy */;
};
DEE46164189D274600E1D3E0 /* PBXTargetDependency */ = {
isa = PBXTargetDependency;
target = DE63F638184AAC0A00CF2A52 /* workdipatcher */;
targetProxy = DEE46165189D274600E1D3E0 /* PBXContainerItemProxy */;
};
/* End PBXTargetDependency section */
/* Begin XCBuildConfiguration section */
@@ -408,6 +470,24 @@
};
name = Release;
};
DEE4616C189D274600E1D3E0 /* Debug */ = {
isa = XCBuildConfiguration;
buildSettings = {
GCC_PREPROCESSOR_DEFINITIONS = (
"DEBUG=1",
"$(inherited)",
);
PRODUCT_NAME = "testQueue copy";
};
name = Debug;
};
DEE4616D189D274600E1D3E0 /* Release */ = {
isa = XCBuildConfiguration;
buildSettings = {
PRODUCT_NAME = "testQueue copy";
};
name = Release;
};
/* End XCBuildConfiguration section */
/* Begin XCConfigurationList section */
@@ -438,6 +518,15 @@
defaultConfigurationIsVisible = 0;
defaultConfigurationName = Release;
};
DEE4616B189D274600E1D3E0 /* Build configuration list for PBXNativeTarget "testMainRunLoop" */ = {
isa = XCConfigurationList;
buildConfigurations = (
DEE4616C189D274600E1D3E0 /* Debug */,
DEE4616D189D274600E1D3E0 /* Release */,
);
defaultConfigurationIsVisible = 0;
defaultConfigurationName = Release;
};
/* End XCConfigurationList section */
};
rootObject = DE63F631184AAC0A00CF2A52 /* Project object */;