aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2016-04-27 21:25:54 +0200
committerJosé Bollo <jose.bollo@iot.bzh>2016-04-27 21:30:01 +0200
commitf262b0f726ac0577f40525038b779185f144873f (patch)
treec6c2e3664cb6082fcb74e7ef26e430ce2371f4fa
parent34acb0f8d191593c9761e027424f13ae42831133 (diff)
first add of asynchonous handling
Change-Id: Id9159d33937dc23342d32892f77998fb8cef0000 Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r--include/afb-req-itf.h1
-rw-r--r--plugins/afm-main-plugin/afm-main-plugin.c83
-rw-r--r--src/afb-hreq.c12
-rw-r--r--src/afb-hreq.h6
-rw-r--r--src/afb-hsrv.c93
-rw-r--r--src/afb-hsrv.h1
6 files changed, 140 insertions, 56 deletions
diff --git a/include/afb-req-itf.h b/include/afb-req-itf.h
index d4ade2be..df133f5c 100644
--- a/include/afb-req-itf.h
+++ b/include/afb-req-itf.h
@@ -44,6 +44,7 @@ struct afb_req {
void *req_closure;
void *ctx_closure;
};
+
static inline struct afb_arg afb_req_get(struct afb_req req, const char *name)
{
return req.itf->get(req.req_closure, name);
diff --git a/plugins/afm-main-plugin/afm-main-plugin.c b/plugins/afm-main-plugin/afm-main-plugin.c
index 553cffc3..21e27e82 100644
--- a/plugins/afm-main-plugin/afm-main-plugin.c
+++ b/plugins/afm-main-plugin/afm-main-plugin.c
@@ -51,6 +51,22 @@ static struct afb_evmgr evmgr;
static struct jbus *jbus;
+struct memo
+{
+ struct afb_req request;
+ const char *method;
+};
+
+static struct memo *make_memo(struct afb_req request, const char *method)
+{
+ struct memo *memo = malloc(sizeof *memo);
+ if (memo != NULL) {
+ memo->request = request;
+ memo->method = method;
+ }
+ return memo;
+}
+
static void application_list_changed(const char *data, void *closure)
{
afb_evmgr_push(evmgr, "application-list-changed", NULL);
@@ -78,49 +94,70 @@ static struct json_object *embed(const char *tag, struct json_object *obj)
return result;
}
-static void embed_call_void(struct afb_req request, const char *method)
+static void embed_call_void_callback(int status, struct json_object *obj, struct memo *memo)
{
- struct json_object *obj = jbus_call_sj_sync(jbus, method, "true");
if (interface->verbosity)
- fprintf(stderr, "(afm-main-plugin) %s(true) -> %s\n", method,
+ fprintf(stderr, "(afm-main-plugin) %s(true) -> %s\n", memo->method,
obj ? json_object_to_json_string(obj) : "NULL");
if (obj == NULL) {
- afb_req_fail(request, "failed", "framework daemon failure");
- return;
+ afb_req_fail(memo->request, "failed", "framework daemon failure");
+ } else {
+ obj = json_object_get(obj);
+ obj = embed(memo->method, obj);
+ if (obj == NULL) {
+ afb_req_fail(memo->request, "failed", "framework daemon failure");
+ } else {
+ afb_req_success(memo->request, obj, NULL);
+ }
}
- obj = json_object_get(obj);
- obj = embed(method, obj);
+ free(memo);
+}
+
+static void embed_call_void(struct afb_req request, const char *method)
+{
+ struct memo *memo = make_memo(request, method);
+ if (memo == NULL)
+ afb_req_fail(request, "failed", "out of memory");
+ else if (jbus_call_sj(jbus, method, "true", (void*)embed_call_void_callback, memo) < 0) {
+ afb_req_fail(request, "failed", "dbus failure");
+ free(memo);
+ }
+}
+
+static void call_appid_callback(int status, struct json_object *obj, struct memo *memo)
+{
+ if (interface->verbosity)
+ fprintf(stderr, "(afm-main-plugin) %s -> %s\n", memo->method,
+ obj ? json_object_to_json_string(obj) : "NULL");
if (obj == NULL) {
- afb_req_fail(request, "failed", "framework daemon failure");
- return;
+ afb_req_fail(memo->request, "failed", "framework daemon failure");
+ } else {
+ obj = json_object_get(obj);
+ afb_req_success(memo->request, obj, NULL);
}
- afb_req_success(request, obj, NULL);
+ free(memo);
}
static void call_appid(struct afb_req request, const char *method)
{
- struct json_object *obj;
+ struct memo *memo;
char *sid;
const char *id = afb_req_value(request, _id_);
if (id == NULL) {
afb_req_fail(request, "bad-request", "missing 'id'");
return;
}
- if (asprintf(&sid, "\"%s\"", id) <= 0) {
+ memo = make_memo(request, method);
+ if (asprintf(&sid, "\"%s\"", id) <= 0 || memo == NULL) {
afb_req_fail(request, "server-error", "out of memory");
+ free(memo);
return;
}
- obj = jbus_call_sj_sync(jbus, method, sid);
- if (interface->verbosity)
- fprintf(stderr, "(afm-main-plugin) %s(%s) -> %s\n", method, sid,
- obj ? json_object_to_json_string(obj) : "NULL");
- free(sid);
- if (obj == NULL) {
- afb_req_fail(request, "failed", "framework daemon failure");
- return;
+ if (jbus_call_sj(jbus, method, sid, (void*)call_appid_callback, memo) < 0) {
+ afb_req_fail(request, "failed", "dbus failure");
+ free(memo);
}
- obj = json_object_get(obj);
- afb_req_success(request, obj, NULL);
+ free(sid);
}
static void call_runid(struct afb_req request, const char *method)
@@ -143,7 +180,6 @@ static void call_runid(struct afb_req request, const char *method)
afb_req_success(request, obj, NULL);
}
-
/************************** entries ******************************/
static void runnables(struct afb_req request)
@@ -250,7 +286,6 @@ static void install(struct afb_req request)
obj = jbus_call_sj_sync(jbus, _install_, query);
if (interface->verbosity)
-;
fprintf(stderr, "(afm-main-plugin) install(%s) -> %s\n", query,
obj ? json_object_to_json_string(obj) : "NULL");
free(query);
diff --git a/src/afb-hreq.c b/src/afb-hreq.c
index c90b2372..6b7d64be 100644
--- a/src/afb-hreq.c
+++ b/src/afb-hreq.c
@@ -152,6 +152,10 @@ static void afb_hreq_reply_v(struct afb_hreq *hreq, unsigned status, struct MHD_
{
char *cookie;
const char *k, *v;
+
+ if (hreq->replied != 0)
+ return;
+
k = va_arg(args, const char *);
while (k != NULL) {
v = va_arg(args, const char *);
@@ -164,6 +168,14 @@ static void afb_hreq_reply_v(struct afb_hreq *hreq, unsigned status, struct MHD_
}
MHD_queue_response(hreq->connection, status, response);
MHD_destroy_response(response);
+
+ hreq->replied = 1;
+ if (hreq->suspended != 0) {
+ extern void run_micro_httpd(struct afb_hsrv *hsrv);
+ MHD_resume_connection (hreq->connection);
+ hreq->suspended = 0;
+ run_micro_httpd(hreq->hsrv);
+ }
}
void afb_hreq_reply(struct afb_hreq *hreq, unsigned status, struct MHD_Response *response, ...)
diff --git a/src/afb-hreq.h b/src/afb-hreq.h
index 76d1d430..a8df015e 100644
--- a/src/afb-hreq.h
+++ b/src/afb-hreq.h
@@ -20,11 +20,17 @@
struct AFB_clientCtx;
struct json_object;
struct hreq_data;
+struct afb_hsrv;
struct afb_hreq {
+ struct afb_hsrv *hsrv;
const char *cacheTimeout;
struct MHD_Connection *connection;
int method;
+ int reqid;
+ int scanned;
+ int suspended;
+ int replied;
const char *version;
const char *url;
size_t lenurl;
diff --git a/src/afb-hsrv.c b/src/afb-hsrv.c
index f200a960..a8338251 100644
--- a/src/afb-hsrv.c
+++ b/src/afb-hsrv.c
@@ -59,9 +59,11 @@ struct afb_hsrv {
struct hsrv_handler *handlers;
struct MHD_Daemon *httpd;
struct upoll *upoll;
+ int in_run;
char *cache_to;
};
+static int global_reqids = 0;
static void reply_error(struct MHD_Connection *connection, unsigned int status)
{
@@ -107,25 +109,34 @@ static int access_handler(
hsrv = cls;
hreq = *recordreq;
if (hreq == NULL) {
- /* create the request */
- hreq = calloc(1, sizeof *hreq);
- if (hreq == NULL)
- goto internal_error;
- *recordreq = hreq;
-
/* get the method */
method = get_method(methodstr);
method &= afb_method_get | afb_method_post;
- if (method == afb_method_none)
- goto bad_request;
+ if (method == afb_method_none) {
+ reply_error(connection, MHD_HTTP_BAD_REQUEST);
+ return MHD_YES;
+ }
+
+ /* create the request */
+ hreq = calloc(1, sizeof *hreq);
+ if (hreq == NULL) {
+ reply_error(connection, MHD_HTTP_INTERNAL_SERVER_ERROR);
+ return MHD_YES;
+ }
/* init the request */
+ hreq->hsrv = hsrv;
hreq->cacheTimeout = hsrv->cache_to;
+ hreq->reqid = ++global_reqids;
+ hreq->scanned = 0;
+ hreq->suspended = 0;
+ hreq->replied = 0;
hreq->connection = connection;
hreq->method = method;
hreq->version = version;
hreq->tail = hreq->url = url;
hreq->lentail = hreq->lenurl = strlen(url);
+ *recordreq = hreq;
/* init the post processing */
if (method == afb_method_post) {
@@ -136,10 +147,10 @@ static int access_handler(
} else if (strcasestr(type, FORM_CONTENT) != NULL) {
hreq->postform = MHD_create_post_processor (connection, 65500, postproc, hreq);
if (hreq->postform == NULL)
- goto internal_error;
+ afb_hreq_reply_error(hreq, MHD_HTTP_INTERNAL_SERVER_ERROR);
return MHD_YES;
} else if (strcasestr(type, JSON_CONTENT) == NULL) {
- reply_error(connection, MHD_HTTP_UNSUPPORTED_MEDIA_TYPE);
+ afb_hreq_reply_error(hreq, MHD_HTTP_UNSUPPORTED_MEDIA_TYPE);
return MHD_YES;
}
}
@@ -148,30 +159,50 @@ static int access_handler(
/* process further data */
if (*upload_data_size) {
if (hreq->postform != NULL) {
- if (!MHD_post_process (hreq->postform, upload_data, *upload_data_size))
- goto internal_error;
+ if (!MHD_post_process (hreq->postform, upload_data, *upload_data_size)) {
+ afb_hreq_reply_error(hreq, MHD_HTTP_INTERNAL_SERVER_ERROR);
+ return MHD_YES;
+ }
} else {
- if (!afb_hreq_post_add(hreq, "", upload_data, *upload_data_size))
- goto internal_error;
+ if (!afb_hreq_post_add(hreq, "", upload_data, *upload_data_size)) {
+ afb_hreq_reply_error(hreq, MHD_HTTP_INTERNAL_SERVER_ERROR);
+ return MHD_YES;
+ }
}
*upload_data_size = 0;
- return MHD_YES;
+ return MHD_YES;
}
/* flush the data */
if (hreq->postform != NULL) {
rc = MHD_destroy_post_processor(hreq->postform);
hreq->postform = NULL;
- if (rc == MHD_NO)
- goto bad_request;
+ if (rc == MHD_NO) {
+ afb_hreq_reply_error(hreq, MHD_HTTP_BAD_REQUEST);
+ return MHD_YES;
+ }
+ }
+
+ if (hreq->scanned != 0) {
+ if (hreq->replied == 0 && hreq->suspended == 0) {
+ MHD_suspend_connection (connection);
+ hreq->suspended = 1;
+ }
+ return MHD_YES;
}
/* search an handler for the request */
+ hreq->scanned = 1;
iter = hsrv->handlers;
while (iter) {
if (afb_hreq_unprefix(hreq, iter->prefix, iter->length)) {
- if (iter->handler(hreq, iter->data))
+ if (iter->handler(hreq, iter->data)) {
+ if (hreq->replied == 0 && hreq->suspended == 0) {
+ MHD_suspend_connection (connection);
+ hreq->suspended = 1;
+ }
return MHD_YES;
+ }
hreq->tail = hreq->url;
hreq->lentail = hreq->lenurl;
}
@@ -181,14 +212,6 @@ static int access_handler(
/* no handler */
afb_hreq_reply_error(hreq, MHD_HTTP_NOT_FOUND);
return MHD_YES;
-
-bad_request:
- reply_error(connection, MHD_HTTP_BAD_REQUEST);
- return MHD_YES;
-
-internal_error:
- reply_error(connection, MHD_HTTP_INTERNAL_SERVER_ERROR);
- return MHD_YES;
}
/* Because of POST call multiple time requestApi we need to free POST handle here */
@@ -203,11 +226,19 @@ static void end_handler(void *cls, struct MHD_Connection *connection, void **rec
afb_hreq_free(hreq);
}
-static void handle_epoll_readable(struct afb_hsrv *hsrv)
+void run_micro_httpd(struct afb_hsrv *hsrv)
{
- upoll_on_readable(hsrv->upoll, NULL);
- MHD_run(hsrv->httpd);
- upoll_on_readable(hsrv->upoll, (void*)handle_epoll_readable);
+ if (hsrv->in_run != 0)
+ hsrv->in_run = 2;
+ else {
+ upoll_on_readable(hsrv->upoll, NULL);
+ do {
+ hsrv->in_run = 1;
+ MHD_run(hsrv->httpd);
+ } while(hsrv->in_run == 2);
+ hsrv->in_run = 0;
+ upoll_on_readable(hsrv->upoll, (void*)run_micro_httpd);
+ }
};
static int new_client_handler(void *cls, const struct sockaddr *addr, socklen_t addrlen)
@@ -360,7 +391,7 @@ int afb_hsrv_start(struct afb_hsrv *hsrv, uint16_t port, unsigned int connection
fprintf(stderr, "Error: connection to upoll of httpd failed");
return 0;
}
- upoll_on_readable(upoll, (void*)handle_epoll_readable);
+ upoll_on_readable(upoll, (void*)run_micro_httpd);
hsrv->httpd = httpd;
hsrv->upoll = upoll;
diff --git a/src/afb-hsrv.h b/src/afb-hsrv.h
index b422df47..b328df15 100644
--- a/src/afb-hsrv.h
+++ b/src/afb-hsrv.h
@@ -24,7 +24,6 @@ struct afb_hreq;
extern struct afb_hsrv *afb_hsrv_create();
extern void afb_hsrv_put(struct afb_hsrv *hsrv);
-
extern void afb_hsrv_stop(struct afb_hsrv *hsrv);
extern int afb_hsrv_start(struct afb_hsrv *hsrv, uint16_t port, unsigned int connection_timeout);
extern int afb_hsrv_set_cache_timeout(struct afb_hsrv *hsrv, int duration);