aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/jobs.c228
-rw-r--r--src/jobs.h20
2 files changed, 80 insertions, 168 deletions
diff --git a/src/jobs.c b/src/jobs.c
index 8f51cc99..8725d00e 100644
--- a/src/jobs.c
+++ b/src/jobs.c
@@ -44,7 +44,7 @@
#define EVENT_TIMEOUT_CHILD ((uint64_t)10000)
/** Internal shortcut for callback */
-typedef void (*job_cb_t)(int, void*, void *, void*);
+typedef void (*job_cb_t)(int, void*);
/** Description of a pending job */
struct job
@@ -52,9 +52,7 @@ struct job
struct job *next; /**< link to the next job enqueued */
void *group; /**< group of the request */
job_cb_t callback; /**< processing callback */
- void *arg1; /**< first arg */
- void *arg2; /**< second arg */
- void *arg3; /**< third arg */
+ void *arg; /**< argument */
int timeout; /**< timeout in second for processing the request */
unsigned blocked: 1; /**< is an other request blocking this one ? */
unsigned dropped: 1; /**< is removed ? */
@@ -87,10 +85,16 @@ struct thread
*/
struct sync
{
- void (*callback)(int, void*); /**< the synchrnous callback */
- void *arg; /**< the argument of the callback */
+ struct thread thread; /**< thread loop data */
+ union {
+ void (*callback)(int, void*); /**< the synchronous callback */
+ void (*enter)(int signum, void *closure, struct jobloop *jobloop);
+ /**< the entering synchronous routine */
+ };
+ void *arg; /**< the argument of the callback */
};
+
/* synchronisation of threads */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
@@ -116,18 +120,14 @@ static struct job *free_jobs;
* @param group the group of the job
* @param timeout the timeout of the job (0 if none)
* @param callback the function that achieves the job
- * @param arg1 the first argument of the callback
- * @param arg2 the second argument of the callback
- * @param arg3 the third argument of the callback
+ * @param arg the argument of the callback
* @return the created job unblock or NULL when no more memory
*/
static struct job *job_create(
void *group,
int timeout,
job_cb_t callback,
- void *arg1,
- void *arg2,
- void *arg3)
+ void *arg)
{
struct job *job;
@@ -149,9 +149,7 @@ static struct job *job_create(
job->group = group;
job->timeout = timeout;
job->callback = callback;
- job->arg1 = arg1;
- job->arg2 = arg2;
- job->arg3 = arg3;
+ job->arg = arg;
job->blocked = 0;
job->dropped = 0;
end:
@@ -246,22 +244,6 @@ static inline void job_release(struct job *job)
}
/**
- * Monitored normal callback for a job.
- * This function is called by the monitor
- * to run the job when the safe environment
- * is set.
- * @param signum 0 on normal flow or the number
- * of the signal that interrupted the normal
- * flow
- * @param arg the job to run
- */
-static void job_call(int signum, void *arg)
-{
- struct job *job = arg;
- job->callback(signum, job->arg1, job->arg2, job->arg3);
-}
-
-/**
* Monitored cancel callback for a job.
* This function is called by the monitor
* to cancel the job when the safe environment
@@ -273,7 +255,8 @@ static void job_call(int signum, void *arg)
*/
static void job_cancel(int signum, void *arg)
{
- job_call(SIGABRT, arg);
+ struct job *job = arg;
+ job->callback(SIGABRT, job->arg);
}
/**
@@ -338,7 +321,7 @@ static void thread_run(volatile struct thread *me)
/* run the job */
pthread_mutex_unlock(&mutex);
- sig_monitor(job->timeout, job_call, job);
+ sig_monitor(job->timeout, job->callback, job->arg);
pthread_mutex_lock(&mutex);
/* release the run job */
@@ -422,29 +405,7 @@ static int start_one_thread()
}
/**
- * Queues a new asynchronous job represented by 'callback'
- * for the 'group' and the 'timeout'.
- * Jobs are queued FIFO and are possibly executed in parallel
- * concurrently except for job of the same group that are
- * executed sequentially in FIFO order.
- * @param group The group of the job or NULL when no group.
- * @param timeout The maximum execution time in seconds of the job
- * or 0 for unlimited time.
- * @param callback The function to execute for achieving the job.
- * Its first parameter is either 0 on normal flow
- * or the signal number that broke the normal flow.
- * @return 0 in case of success or -1 in case of error
- */
-int jobs_queue0(
- void *group,
- int timeout,
- void (*callback)(int signum))
-{
- return jobs_queue3(group, timeout, (job_cb_t)callback, NULL, NULL, NULL);
-}
-
-/**
- * Queues a new asynchronous job represented by 'callback' and 'arg1'
+ * Queues a new asynchronous job represented by 'callback' and 'arg'
* for the 'group' and the 'timeout'.
* Jobs are queued FIFO and are possibly executed in parallel
* concurrently except for job of the same group that are
@@ -466,64 +427,6 @@ int jobs_queue(
void (*callback)(int, void*),
void *arg)
{
- return jobs_queue3(group, timeout, (job_cb_t)callback, arg, NULL, NULL);
-}
-
-/**
- * Queues a new asynchronous job represented by 'callback' and 'arg[12]'
- * for the 'group' and the 'timeout'.
- * Jobs are queued FIFO and are possibly executed in parallel
- * concurrently except for job of the same group that are
- * executed sequentially in FIFO order.
- * @param group The group of the job or NULL when no group.
- * @param timeout The maximum execution time in seconds of the job
- * or 0 for unlimited time.
- * @param callback The function to execute for achieving the job.
- * Its first parameter is either 0 on normal flow
- * or the signal number that broke the normal flow.
- * The remaining parameters are the parameters 'arg[12]'
- * given here.
- * @param arg1 The second argument for 'callback'
- * @param arg2 The third argument for 'callback'
- * @return 0 in case of success or -1 in case of error
- */
-int jobs_queue2(
- void *group,
- int timeout,
- void (*callback)(int, void*, void*),
- void *arg1,
- void *arg2)
-{
- return jobs_queue3(group, timeout, (job_cb_t)callback, arg1, arg2, NULL);
-}
-
-/**
- * Queues a new asynchronous job represented by 'callback' and 'arg[123]'
- * for the 'group' and the 'timeout'.
- * Jobs are queued FIFO and are possibly executed in parallel
- * concurrently except for job of the same group that are
- * executed sequentially in FIFO order.
- * @param group The group of the job or NULL when no group.
- * @param timeout The maximum execution time in seconds of the job
- * or 0 for unlimited time.
- * @param callback The function to execute for achieving the job.
- * Its first parameter is either 0 on normal flow
- * or the signal number that broke the normal flow.
- * The remaining parameters are the parameters 'arg[123]'
- * given here.
- * @param arg1 The second argument for 'callback'
- * @param arg2 The third argument for 'callback'
- * @param arg3 The forth argument for 'callback'
- * @return 0 in case of success or -1 in case of error
- */
-int jobs_queue3(
- void *group,
- int timeout,
- void (*callback)(int, void*, void *, void*),
- void *arg1,
- void *arg2,
- void *arg3)
-{
const char *info;
struct job *job;
int rc;
@@ -531,7 +434,7 @@ int jobs_queue3(
pthread_mutex_lock(&mutex);
/* allocates the job */
- job = job_create(group, timeout, callback, arg1, arg2, arg3);
+ job = job_create(group, timeout, callback, arg);
if (!job) {
errno = ENOMEM;
info = "out of memory";
@@ -574,35 +477,45 @@ error:
}
/**
- * Enter a synchronisation point: activates the job given by 'callback'
- * and 'closure' using 'group' and 'timeout' to control sequencing and
- * execution time.
- * @param group the group for sequencing jobs
- * @param timeout the time in seconds allocated to the job
- * @param callback the callback that will handle the job.
- * it receives 3 parameters: 'signum' that will be 0
- * on normal flow or the catched signal number in case
- * of interrupted flow, the context 'closure' as given and
- * a 'jobloop' reference that must be used when the job is
- * terminated to unlock the current execution flow.
- * @param closure the context completion closure for the callback
- * @return 0 on success or -1 in case of error
+ * Internal helper function for 'jobs_enter'.
+ * @see jobs_enter, jobs_leave
*/
-int jobs_enter(
+static void enter_cb(int signum, void *closure)
+{
+ struct sync *sync = closure;
+ sync->enter(signum, sync->arg, (void*)&sync->thread);
+}
+
+/**
+ * Internal helper function for 'jobs_call'.
+ * @see jobs_call
+ */
+static void call_cb(int signum, void *closure)
+{
+ struct sync *sync = closure;
+ sync->callback(signum, sync->arg);
+ jobs_leave((void*)&sync->thread);
+}
+
+/**
+ * Internal helper for synchronous jobs. It enters
+ * a new thread loop for evaluating the given job
+ * as recorded by the couple 'sync_cb' and 'sync'.
+ * @see jobs_call, jobs_enter, jobs_leave
+ */
+static int do_sync(
void *group,
int timeout,
- void (*callback)(int signum, void *closure, struct jobloop *jobloop),
- void *closure
+ void (*sync_cb)(int signum, void *closure),
+ struct sync *sync
)
{
-
struct job *job;
- struct thread me;
pthread_mutex_lock(&mutex);
/* allocates the job */
- job = job_create(group, timeout, (job_cb_t)callback, closure, &me, NULL);
+ job = job_create(group, timeout, sync_cb, sync);
if (!job) {
ERROR("out of memory");
errno = ENOMEM;
@@ -614,12 +527,41 @@ int jobs_enter(
job_add(job);
/* run until stopped */
- thread_run(&me);
+ thread_run(&sync->thread);
pthread_mutex_unlock(&mutex);
return 0;
}
/**
+ * Enter a synchronisation point: activates the job given by 'callback'
+ * and 'closure' using 'group' and 'timeout' to control sequencing and
+ * execution time.
+ * @param group the group for sequencing jobs
+ * @param timeout the time in seconds allocated to the job
+ * @param callback the callback that will handle the job.
+ * it receives 3 parameters: 'signum' that will be 0
+ * on normal flow or the catched signal number in case
+ * of interrupted flow, the context 'closure' as given and
+ * a 'jobloop' reference that must be used when the job is
+ * terminated to unlock the current execution flow.
+ * @param arg the argument to the callback
+ * @return 0 on success or -1 in case of error
+ */
+int jobs_enter(
+ void *group,
+ int timeout,
+ void (*callback)(int signum, void *closure, struct jobloop *jobloop),
+ void *closure
+)
+{
+ struct sync sync;
+
+ sync.enter = callback;
+ sync.arg = closure;
+ return do_sync(group, timeout, enter_cb, &sync);
+}
+
+/**
* Unlocks the execution flow designed by 'jobloop'.
* @param jobloop indication of the flow to unlock
* @return 0 in case of success of -1 on error
@@ -644,17 +586,6 @@ int jobs_leave(struct jobloop *jobloop)
}
/**
- * Internal helper function for 'jobs_call'.
- * @see jobs_call, jobs_enter, jobs_leave
- */
-static void call_cb(int signum, void *closure, struct jobloop *jobloop)
-{
- struct sync *sync = closure;
- sync->callback(signum, sync->arg);
- jobs_leave(jobloop);
-}
-
-/**
* Calls synchronously the job represented by 'callback' and 'arg1'
* for the 'group' and the 'timeout' and waits for its completion.
* @param group The group of the job or NULL when no group.
@@ -678,7 +609,8 @@ int jobs_call(
sync.callback = callback;
sync.arg = arg;
- return jobs_enter(group, timeout, call_cb, &sync);
+
+ return do_sync(group, timeout, call_cb, &sync);
}
/**
@@ -801,7 +733,7 @@ int jobs_start(int allowed_count, int start_count, int waiter_count, void (*star
}
/* queue the start job */
- job = job_create(NULL, 0, (job_cb_t)start, NULL, NULL, NULL);
+ job = job_create(NULL, 0, (job_cb_t)start, NULL);
if (!job) {
ERROR("out of memory");
errno = ENOMEM;
diff --git a/src/jobs.h b/src/jobs.h
index a98b27ad..49f45dd5 100644
--- a/src/jobs.h
+++ b/src/jobs.h
@@ -20,32 +20,12 @@
struct sd_event;
struct jobloop;
-extern int jobs_queue0(
- void *group,
- int timeout,
- void (*callback)(int signum));
-
extern int jobs_queue(
void *group,
int timeout,
void (*callback)(int signum, void* arg),
void *arg);
-extern int jobs_queue2(
- void *group,
- int timeout,
- void (*callback)(int signum, void* arg1, void *arg2),
- void *arg1,
- void *arg2);
-
-extern int jobs_queue3(
- void *group,
- int timeout,
- void (*callback)(int signum, void* arg1, void *arg2, void *arg3),
- void *arg1,
- void *arg2,
- void *arg3);
-
extern int jobs_enter(
void *group,
int timeout,