summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2019-02-15 20:49:54 +0100
committerJosé Bollo <jose.bollo@iot.bzh>2019-04-02 09:58:49 +0200
commit901a38c28bf3fe7cc3e58e3fad36190fbae585be (patch)
treefe2f0ed7fd1153a9a5a9006628f8864877763724
parent50deefa0f08b88b99748abd57560222744d2f8db (diff)
jobs: Refactor exiting jobs
The new termination function can allow the restart because it doesn't abort the waiting jobs. So after calling 'jobs_exit', all threads stop. The function 'job_start' returns. The threads that are in blocking state, i.e. in a call to 'jobs_enter' or 'jobs_call' are stopped. An error status -1 with errno=EINTR is returned in that case. But before returning, that function calls the exit handler if any. Change-Id: I85a4b1976b09b18804eb681af940531ae5ace6c3 Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r--src/jobs.c115
-rw-r--r--src/jobs.h3
-rw-r--r--src/main-afb-daemon.c19
3 files changed, 63 insertions, 74 deletions
diff --git a/src/jobs.c b/src/jobs.c
index 83ca2ed4..936c6f1d 100644
--- a/src/jobs.c
+++ b/src/jobs.c
@@ -68,6 +68,7 @@ struct thread
pthread_t tid; /**< the thread id */
volatile unsigned stop: 1; /**< stop requested */
volatile unsigned waits: 1; /**< is waiting? */
+ volatile unsigned leaved: 1; /**< was leaved? */
};
/**
@@ -84,7 +85,6 @@ struct sync
void *arg; /**< the argument of the callback */
};
-
/* synchronisation of threads */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
@@ -106,6 +106,8 @@ static struct job *free_jobs;
/* event loop */
static struct evmgr *evmgr;
+static void (*exit_handler)();
+
/**
* Create a new job with the given parameters
* @param group the group of the job
@@ -236,6 +238,7 @@ static inline void job_release(struct job *job)
* flow, isn't used
* @param arg the job to run
*/
+__attribute__((unused))
static void job_cancel(int signum, void *arg)
{
struct job *job = arg;
@@ -319,6 +322,7 @@ static void thread_enter(volatile struct thread *me)
me->tid = pthread_self();
me->stop = 0;
me->waits = 0;
+ me->leaved = 0;
me->nholder = 0;
me->upper = current_thread;
me->next = threads;
@@ -485,13 +489,15 @@ static int start_one_thread()
* The remaining parameter is the parameter 'arg1'
* given here.
* @param arg The second argument for 'callback'
+ * @param start Allow to start a thread if not zero
* @return 0 in case of success or -1 in case of error
*/
-int jobs_queue(
+static int queue_job(
const void *group,
int timeout,
void (*callback)(int, void*),
- void *arg)
+ void *arg,
+ int start)
{
struct job *job;
int rc;
@@ -511,7 +517,7 @@ int jobs_queue(
}
/* start a thread if needed */
- if (running == started && started < allowed) {
+ if (start && running == started && started < allowed) {
/* all threads are busy and a new can be started */
rc = start_one_thread();
if (rc < 0 && started == 0) {
@@ -537,6 +543,32 @@ error:
}
/**
+ * Queues a new asynchronous job represented by 'callback' and 'arg'
+ * for the 'group' and the 'timeout'.
+ * Jobs are queued FIFO and are possibly executed in parallel
+ * concurrently except for job of the same group that are
+ * executed sequentially in FIFO order.
+ * @param group The group of the job or NULL when no group.
+ * @param timeout The maximum execution time in seconds of the job
+ * or 0 for unlimited time.
+ * @param callback The function to execute for achieving the job.
+ * Its first parameter is either 0 on normal flow
+ * or the signal number that broke the normal flow.
+ * The remaining parameter is the parameter 'arg1'
+ * given here.
+ * @param arg The second argument for 'callback'
+ * @return 0 in case of success or -1 in case of error
+ */
+int jobs_queue(
+ const void *group,
+ int timeout,
+ void (*callback)(int, void*),
+ void *arg)
+{
+ return queue_job(group, timeout, callback, arg, 1);
+}
+
+/**
* Internal helper function for 'jobs_enter'.
* @see jobs_enter, jobs_leave
*/
@@ -590,7 +622,10 @@ static int do_sync(
else
thread_run_external(&sync->thread);
pthread_mutex_unlock(&mutex);
- return 0;
+ if (sync->thread.leaved)
+ return 0;
+ errno = EINTR;
+ return -1;
}
/**
@@ -638,6 +673,7 @@ int jobs_leave(struct jobloop *jobloop)
if (!t) {
errno = EINVAL;
} else {
+ t->leaved = 1;
t->stop = 1;
if (t->waits)
pthread_cond_broadcast(&cond);
@@ -776,44 +812,23 @@ int jobs_start(int allowed_count, int start_count, int waiter_count, void (*star
rc = 0;
error:
pthread_mutex_unlock(&mutex);
+ if (exit_handler)
+ exit_handler();
return rc;
}
/**
- * Terminate all the threads and cancel all pending jobs.
+ * Exit jobs threads and call handler if not NULL.
*/
-void jobs_terminate()
+void jobs_exit(void (*handler)())
{
- struct job *job, *head, *tail;
- pthread_t me, *others;
struct thread *t;
- int count;
-
- /* how am i? */
- me = pthread_self();
/* request all threads to stop */
pthread_mutex_lock(&mutex);
- allowed = 0;
-
- /* count the number of threads */
- count = 0;
- t = threads;
- while (t) {
- if (!t->upper && !pthread_equal(t->tid, me))
- count++;
- t = t->next;
- }
- /* fill the array of threads */
- others = alloca(count * sizeof *others);
- count = 0;
- t = threads;
- while (t) {
- if (!t->upper && !pthread_equal(t->tid, me))
- others[count++] = t->tid;
- t = t->next;
- }
+ /* set the handler */
+ exit_handler = handler;
/* stops the threads */
t = threads;
@@ -824,41 +839,7 @@ void jobs_terminate()
/* wait the threads */
pthread_cond_broadcast(&cond);
- pthread_mutex_unlock(&mutex);
- while (count)
- pthread_join(others[--count], NULL);
- pthread_mutex_lock(&mutex);
- /* cancel pending jobs of other threads */
- remains = 0;
- head = first_job;
- first_job = NULL;
- tail = NULL;
- while (head) {
- /* unlink the job */
- job = head;
- head = job->next;
-
- /* search if job is stacked for current */
- t = current_thread;
- while (t && t->job != job)
- t = t->upper;
- if (t) {
- /* yes, relink it at end */
- if (tail)
- tail->next = job;
- else
- first_job = job;
- tail = job;
- job->next = NULL;
- } else {
- /* no cancel the job */
- pthread_mutex_unlock(&mutex);
- sig_monitor(0, job_cancel, job);
- free(job);
- pthread_mutex_lock(&mutex);
- }
- }
+ /* leave */
pthread_mutex_unlock(&mutex);
}
-
diff --git a/src/jobs.h b/src/jobs.h
index a99c9622..4b0fa8bb 100644
--- a/src/jobs.h
+++ b/src/jobs.h
@@ -39,8 +39,6 @@ extern int jobs_call(
void (*callback)(int, void*),
void *arg);
-extern void jobs_terminate();
-
extern int jobs_start(
int allowed_count,
int start_count,
@@ -50,3 +48,4 @@ extern int jobs_start(
extern void jobs_acquire_event_manager();
+extern void jobs_exit(void (*handler)());
diff --git a/src/main-afb-daemon.c b/src/main-afb-daemon.c
index 4ef4b280..c2a53513 100644
--- a/src/main-afb-daemon.c
+++ b/src/main-afb-daemon.c
@@ -399,6 +399,17 @@ static struct afb_hsrv *start_http_server()
| execute_command
+--------------------------------------------------------- */
+static void wait_child_and_exit()
+{
+ pid_t pidchld = childpid;
+
+ childpid = 0;
+ if (!SELF_PGROUP)
+ killpg(pidchld, SIGKILL);
+ waitpid(pidchld, NULL, 0);
+ exit(0);
+}
+
static void on_sigchld(int signum, siginfo_t *info, void *uctx)
{
if (info->si_pid == childpid) {
@@ -406,11 +417,9 @@ static void on_sigchld(int signum, siginfo_t *info, void *uctx)
case CLD_EXITED:
case CLD_KILLED:
case CLD_DUMPED:
- childpid = 0;
- if (!SELF_PGROUP)
- killpg(info->si_pid, SIGKILL);
- waitpid(info->si_pid, NULL, 0);
- exit(0);
+ jobs_exit(wait_child_and_exit);
+ default:
+ break;
}
}
}