summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/CMakeLists.txt12
-rw-r--r--src/jobs.c166
-rw-r--r--src/jobs.h12
3 files changed, 136 insertions, 54 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index ee0adb44..abc81c8b 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -26,12 +26,12 @@ ADD_DEFINITIONS(-DINFER_EXTENSION)
############################################################################
# TODO: improve below setting by using config file
-option(WITH_SIG_MONITOR_DUMPSTACK "activate dump stack on error" ON)
-option(WITH_SIG_MONITOR_SIGNALS "activate handling of signals" ON)
-option(WITH_SIG_MONITOR_FOR_CALL "activate monitoring of calls" ON)
-option(WITH_SIG_MONITOR_TIMERS "activate monitoring of call expiration" ON)
-option(WITH_AFB_HOOK "include hooking" ON)
-option(WITH_AFB_TRACE "include monitoring trace" ON)
+option(WITH_SIG_MONITOR_DUMPSTACK "Activate dump stack on error" ON)
+option(WITH_SIG_MONITOR_SIGNALS "Activate handling of signals" ON)
+option(WITH_SIG_MONITOR_FOR_CALL "Activate monitoring of calls" ON)
+option(WITH_SIG_MONITOR_TIMERS "Activate monitoring of call expiration" ON)
+option(WITH_AFB_HOOK "Include hooking" ON)
+option(WITH_AFB_TRACE "Include monitoring trace" ON)
option(WITH_SUPERVISOR "Activates installation of supervisor" OFF)
option(WITH_DBUS_TRANSPARENCY "Allows API transparency over DBUS" OFF)
option(WITH_LEGACY_BINDING_V1 "Includes the legacy Binding API version 1" OFF)
diff --git a/src/jobs.c b/src/jobs.c
index a518766b..a9773aa5 100644
--- a/src/jobs.c
+++ b/src/jobs.c
@@ -40,11 +40,17 @@
#define EVENT_TIMEOUT_TOP ((uint64_t)-1)
#define EVENT_TIMEOUT_CHILD ((uint64_t)10000)
-struct thread;
-
/** Internal shortcut for callback */
typedef void (*job_cb_t)(int, void*);
+/** starting mode for jobs */
+enum start_mode
+{
+ Start_Default, /**< Start a thread if more than one jobs is pending */
+ Start_Urgent, /**< Always start a thread */
+ Start_Lazy /**< Never start a thread */
+};
+
/** Description of a pending job */
struct job
{
@@ -89,19 +95,22 @@ struct sync
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-/* count allowed, started and running threads */
-static int allowed = 0; /** allowed count of threads */
-static int started = 0; /** started count of threads */
-static int running = 0; /** running count of threads */
-static int remains = 0; /** allowed count of waiting jobs */
+/* counts for threads */
+static int allowed_thread_count = 0; /** allowed count of threads */
+static int started_thread_count = 0; /** started count of threads */
+static int busy_thread_count = 0; /** count of busy threads */
/* list of threads */
static struct thread *threads;
static _Thread_local struct thread *current_thread;
+/* counts for jobs */
+static int remaining_job_count = 0; /** count of job that can be created */
+static int allowed_job_count = 0; /** allowed count of pending jobs */
+
/* queue of pending jobs */
-static struct job *first_job;
-static struct job *free_jobs;
+static struct job *first_pending_job;
+static struct job *first_free_job;
/* event loop */
static struct evmgr *evmgr;
@@ -125,9 +134,9 @@ static struct job *job_create(
struct job *job;
/* try recyle existing job */
- job = free_jobs;
+ job = first_free_job;
if (job)
- free_jobs = job->next;
+ first_free_job = job->next;
else {
/* allocation without blocking */
pthread_mutex_unlock(&mutex);
@@ -165,8 +174,8 @@ static void job_add(struct job *job)
job->next = NULL;
/* search end and blockers */
- pjob = &first_job;
- ijob = first_job;
+ pjob = &first_pending_job;
+ ijob = first_pending_job;
while (ijob) {
if (group && ijob->group == group)
job->blocked = 1;
@@ -176,7 +185,7 @@ static void job_add(struct job *job)
/* queue the jobs */
*pjob = job;
- remains--;
+ remaining_job_count--;
}
/**
@@ -185,11 +194,11 @@ static void job_add(struct job *job)
*/
static inline struct job *job_get()
{
- struct job *job = first_job;
+ struct job *job = first_pending_job;
while (job && job->blocked)
job = job->next;
if (job)
- remains++;
+ remaining_job_count++;
return job;
}
@@ -205,8 +214,8 @@ static inline void job_release(struct job *job)
const void *group;
/* first unqueue the job */
- pjob = &first_job;
- ijob = first_job;
+ pjob = &first_pending_job;
+ ijob = first_pending_job;
while (ijob != job) {
pjob = &ijob->next;
ijob = ijob->next;
@@ -224,8 +233,8 @@ static inline void job_release(struct job *job)
}
/* recycle the job */
- job->next = free_jobs;
- free_jobs = job;
+ job->next = first_free_job;
+ first_free_job = job;
}
/**
@@ -395,13 +404,13 @@ static void thread_run_internal(volatile struct thread *me)
pthread_mutex_lock(&mutex);
} else {
/* no job and no event loop */
- running--;
- if (!running)
+ busy_thread_count--;
+ if (!busy_thread_count)
ERROR("Entering job deep sleep! Check your bindings.");
me->waits = 1;
pthread_cond_wait(&cond, &mutex);
me->waits = 0;
- running++;
+ busy_thread_count++;
}
}
/* cleanup */
@@ -435,13 +444,13 @@ static void thread_main()
{
struct thread me;
- running++;
- started++;
+ busy_thread_count++;
+ started_thread_count++;
sig_monitor_init_timeouts();
thread_run_internal(&me);
sig_monitor_clean_timeouts();
- started--;
- running--;
+ started_thread_count--;
+ busy_thread_count--;
}
/**
@@ -490,7 +499,7 @@ static int start_one_thread()
* The remaining parameter is the parameter 'arg1'
* given here.
* @param arg The second argument for 'callback'
- * @param start Allow to start a thread if not zero
+ * @param start The start mode for threads
* @return 0 in case of success or -1 in case of error
*/
static int queue_job(
@@ -498,30 +507,33 @@ static int queue_job(
int timeout,
void (*callback)(int, void*),
void *arg,
- int start)
+ enum start_mode start_mode)
{
struct job *job;
int rc;
pthread_mutex_lock(&mutex);
- /* allocates the job */
- job = job_create(group, timeout, callback, arg);
- if (!job)
- goto error;
-
/* check availability */
- if (remains <= 0) {
+ if (remaining_job_count <= 0) {
ERROR("can't process job with threads: too many jobs");
errno = EBUSY;
- goto error2;
+ goto error;
}
+ /* allocates the job */
+ job = job_create(group, timeout, callback, arg);
+ if (!job)
+ goto error;
+
/* start a thread if needed */
- if (start && running == started && started < allowed) {
+ if (start_mode != Start_Lazy
+ && busy_thread_count == started_thread_count
+ && (start_mode == Start_Urgent || remaining_job_count + started_thread_count < allowed_job_count)
+ && started_thread_count < allowed_thread_count) {
/* all threads are busy and a new can be started */
rc = start_one_thread();
- if (rc < 0 && started == 0) {
+ if (rc < 0 && started_thread_count == 0) {
ERROR("can't start initial thread: %m");
goto error2;
}
@@ -536,8 +548,8 @@ static int queue_job(
return 0;
error2:
- job->next = free_jobs;
- free_jobs = job;
+ job->next = first_free_job;
+ first_free_job = job;
error:
pthread_mutex_unlock(&mutex);
return -1;
@@ -566,7 +578,59 @@ int jobs_queue(
void (*callback)(int, void*),
void *arg)
{
- return queue_job(group, timeout, callback, arg, 1);
+ return queue_job(group, timeout, callback, arg, Start_Default);
+}
+
+/**
+ * Queues lazyly a new asynchronous job represented by 'callback' and 'arg'
+ * for the 'group' and the 'timeout'.
+ * Jobs are queued FIFO and are possibly executed in parallel
+ * concurrently except for job of the same group that are
+ * executed sequentially in FIFO order.
+ * @param group The group of the job or NULL when no group.
+ * @param timeout The maximum execution time in seconds of the job
+ * or 0 for unlimited time.
+ * @param callback The function to execute for achieving the job.
+ * Its first parameter is either 0 on normal flow
+ * or the signal number that broke the normal flow.
+ * The remaining parameter is the parameter 'arg1'
+ * given here.
+ * @param arg The second argument for 'callback'
+ * @return 0 in case of success or -1 in case of error
+ */
+int jobs_queue_lazy(
+ const void *group,
+ int timeout,
+ void (*callback)(int, void*),
+ void *arg)
+{
+ return queue_job(group, timeout, callback, arg, Start_Lazy);
+}
+
+/**
+ * Queues urgently a new asynchronous job represented by 'callback' and 'arg'
+ * for the 'group' and the 'timeout'.
+ * Jobs are queued FIFO and are possibly executed in parallel
+ * concurrently except for job of the same group that are
+ * executed sequentially in FIFO order.
+ * @param group The group of the job or NULL when no group.
+ * @param timeout The maximum execution time in seconds of the job
+ * or 0 for unlimited time.
+ * @param callback The function to execute for achieving the job.
+ * Its first parameter is either 0 on normal flow
+ * or the signal number that broke the normal flow.
+ * The remaining parameter is the parameter 'arg1'
+ * given here.
+ * @param arg The second argument for 'callback'
+ * @return 0 in case of success or -1 in case of error
+ */
+int jobs_queue_urgent(
+ const void *group,
+ int timeout,
+ void (*callback)(int, void*),
+ void *arg)
+{
+ return queue_job(group, timeout, callback, arg, Start_Urgent);
}
/**
@@ -766,7 +830,12 @@ void jobs_acquire_event_manager()
* @param start The start routine to activate (can't be NULL)
* @return 0 in case of success or -1 in case of error.
*/
-int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
+int jobs_start(
+ int allowed_count,
+ int start_count,
+ int waiter_count,
+ void (*start)(int signum, void* arg),
+ void *arg)
{
int rc, launched;
struct job *job;
@@ -780,17 +849,18 @@ int jobs_start(int allowed_count, int start_count, int waiter_count, void (*star
pthread_mutex_lock(&mutex);
/* check whether already running */
- if (current_thread || allowed) {
+ if (current_thread || allowed_thread_count) {
ERROR("thread already started");
errno = EINVAL;
goto error;
}
/* records the allowed count */
- allowed = allowed_count;
- started = 0;
- running = 0;
- remains = waiter_count;
+ allowed_thread_count = allowed_count;
+ started_thread_count = 0;
+ busy_thread_count = 0;
+ remaining_job_count = waiter_count;
+ allowed_job_count = waiter_count;
/* start at least one thread: the current one */
launched = 1;
@@ -838,7 +908,7 @@ void jobs_exit(void (*handler)())
t = t->next;
}
- /* wait the threads */
+ /* wake up the threads */
pthread_cond_broadcast(&cond);
/* leave */
diff --git a/src/jobs.h b/src/jobs.h
index 4b0fa8bb..44304ede 100644
--- a/src/jobs.h
+++ b/src/jobs.h
@@ -25,6 +25,18 @@ extern int jobs_queue(
void (*callback)(int signum, void* arg),
void *arg);
+extern int jobs_queue_lazy(
+ const void *group,
+ int timeout,
+ void (*callback)(int signum, void* arg),
+ void *arg);
+
+extern int jobs_queue_urgent(
+ const void *group,
+ int timeout,
+ void (*callback)(int signum, void* arg),
+ void *arg);
+
extern int jobs_enter(
const void *group,
int timeout,