summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/harvester-apidef.json11
-rw-r--r--src/harvester.c48
-rw-r--r--src/plugins/influxdb-reader.c125
-rw-r--r--src/plugins/influxdb-writer.c116
-rw-r--r--src/plugins/influxdb.c110
-rw-r--r--src/plugins/influxdb.h27
-rw-r--r--src/plugins/tsdb.h13
8 files changed, 328 insertions, 124 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4217668..c83a11b 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -25,6 +25,8 @@ PROJECT_TARGET_ADD(harvester)
${TARGET_NAME}.c
plugins/tsdb.c
plugins/influxdb.c
+ plugins/influxdb-writer.c
+ plugins/influxdb-reader.c
)
set(OPENAPI_DEF "harvester-apidef" CACHE STRING "name and path to the JSON API definition without extension")
diff --git a/src/harvester-apidef.json b/src/harvester-apidef.json
index 3ae3dee..ff51a38 100644
--- a/src/harvester-apidef.json
+++ b/src/harvester-apidef.json
@@ -130,7 +130,10 @@
"in": "query",
"name": "port",
"required": false,
- "schema": { "type": "integer" }
+ "schema": { "oneOf": [
+ {"type": "integer"},
+ {"type": "string"}
+ ]}
},
{
"in": "query",
@@ -158,6 +161,12 @@
},
{
"in": "query",
+ "name": "priority",
+ "required": false,
+ "schema": { "type": "string" }
+ },
+ {
+ "in": "query",
"name": "identity",
"required": false,
"schema": { "type": "string" }
diff --git a/src/harvester.c b/src/harvester.c
index 945f6e1..80bbf37 100644
--- a/src/harvester.c
+++ b/src/harvester.c
@@ -18,7 +18,6 @@
#define _GNU_SOURCE
#include "harvester.h"
#include "harvester-apidef.h"
-#include <pthread.h>
#include "plugins/tsdb.h"
#include "curl-wrap.h"
@@ -26,41 +25,43 @@
#define ERROR -1
-CURL* (*tsdb_write)(const char* host, int port, json_object *metric);
-CURL* (*tsdb_read)(const char* host, int port, json_object *metric);
-void (*curl_cb)(void *closure, int status, CURL *curl, const char *result, size_t size);
-pthread_mutex_t db_mutex;
+CURL* (*tsdb_write)(const char* host, const char *port, json_object *metric);
+void (*write_curl_cb)(void *closure, int status, CURL *curl, const char *result, size_t size);
-int do_write(struct afb_req req, const char* host, int port, json_object *metric)
+struct reader_args r_args = {NULL, NULL};
+
+int do_write(struct afb_req req, const char* host, const char *port, json_object *metric)
{
CURL *curl_request;
curl_request = tsdb_write(host, port, metric);
-
- pthread_mutex_lock(&db_mutex);
- curl_wrap_do(curl_request, curl_cb, &req);
- pthread_mutex_unlock(&db_mutex);
+ curl_wrap_do(curl_request, write_curl_cb, &req);
return 0;
}
void write(struct afb_req req)
{
- int port = -1;
+ const char *port = NULL;
const char *host = NULL;
json_object *req_args = afb_req_json(req),
+ *portJ = NULL,
*metric = NULL;
- if(wrap_json_unpack(req_args, "{s?s,s?i,so!}",
+ if(wrap_json_unpack(req_args, "{s?s,s?o,so!}",
"host", &host,
- "port", &port,
+ "port", &portJ,
"metric", &metric) || ! metric)
afb_req_fail(req, "Failed", "Error processing arguments. Miss metric\
- JSON object or malformed");
- else if(do_write(req, host, port, metric))
- afb_req_fail(req, "Failed", "Error processing metric JSON object.\
- Malformed !");
+JSON object or malformed");
+ else {
+ port = json_object_is_type(portJ, json_type_null) ?
+ NULL : json_object_to_json_string(portJ);
+ if(do_write(req, host, port, metric))
+ afb_req_fail(req, "Failed", "Error processing metric JSON object.\
+Malformed !");
+ }
}
void auth(struct afb_req request)
@@ -72,22 +73,21 @@ void auth(struct afb_req request)
int init()
{
int tsdb_available = 0;
+
if(curl_global_init(CURL_GLOBAL_DEFAULT) != 0) {
AFB_ERROR("Something went wrong initiliazing libcurl. Abort");
return ERROR;
}
- if(pthread_mutex_init(&db_mutex, NULL) != 0) {
- AFB_ERROR("Something went wrong initiliazing mutex. Abort");
- return ERROR;
- }
-
tsdb_available = db_ping();
switch (tsdb_available) {
case INFLUX:
tsdb_write = influxdb_write;
- tsdb_read = influxdb_read;
- curl_cb = influxdb_cb;
+ write_curl_cb = influxdb_write_curl_cb;
+ if(influxdb_reader(&r_args) != 0) {
+ AFB_ERROR("Problem initiating reader timer. Abort");
+ return ERROR;
+ }
break;
default:
AFB_ERROR("No Time Series Database found. Abort");
diff --git a/src/plugins/influxdb-reader.c b/src/plugins/influxdb-reader.c
new file mode 100644
index 0000000..bb56c93
--- /dev/null
+++ b/src/plugins/influxdb-reader.c
@@ -0,0 +1,125 @@
+/*
+ * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
+ * Author "Romain Forlot" <romain.forlot@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fcntl.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <systemd/sd-event.h>
+
+#include "influxdb.h"
+
+CURL *make_curl_query_get(const char *url)
+{
+ CURL *curl;
+ char *args[5];
+ char *last_ts[30];
+
+ int fd_last_read = afb_daemon_rootdir_open_locale("last_db_read", O_CREAT, NULL);
+ if (fd_last_read < 0)
+ return NULL;
+
+ read(fd_last_read, last_ts, sizeof(last_ts));
+
+ args[0] = "epoch";
+ args[1] = "ns";
+ args[2] = "q";
+ args[3] = "SELECT * FROM /^.*$/";
+ args[4] = NULL;
+
+ curl = curl_wrap_prepare_get(url, NULL, (const char * const*)args);
+
+ return curl;
+}
+
+int unpack_metric_from_db(json_object *metric)
+{
+ const char *name;
+ json_object *columns = NULL, *values = NULL;
+
+ wrap_json_unpack(metric, "{ss, so, so!}",
+ "name", &name,
+ "columns", &columns,
+ "values", &values);
+}
+
+int unpack_series(json_object *series)
+{
+ size_t length_series = json_object_array_length(series);
+
+}
+
+void forward_to_garner(const char *result, size_t size)
+{
+ int id = 0;
+
+ json_object *series = NULL,
+ *db_dump = json_tokener_parse(result);
+ wrap_json_unpack(db_dump, "{s[{si,so}]}",
+ "id", &id,
+ "series", &series);
+
+ unpack_series(series);
+
+}
+
+void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
+{
+ long rep_code = curl_wrap_response_code_get(curl);
+ switch(rep_code) {
+ case 204:
+ AFB_DEBUG("Read correctly done");
+ forward_to_garner(result, size);
+ break;
+ case 400:
+ AFB_ERROR("Unacceptable request. %s", result);
+ break;
+ case 401:
+ AFB_ERROR("Invalid authentication. %s", result);
+ break;
+ default:
+ AFB_ERROR("Unexptected behavior. %s", result);
+ break;
+ }
+}
+
+int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata)
+{
+ CURL *curl;
+ struct reader_args *a = NULL;
+ if(userdata)
+ a = (struct reader_args*)userdata;
+ else
+ return -1;
+
+ char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
+
+ make_url(url, sizeof(url), a->host, a->port, "query");
+ curl = make_curl_query_get(url);
+ curl_wrap_do(curl, influxdb_read_curl_cb, NULL);
+
+ return 0;
+}
+
+int influxdb_reader(void *args)
+{
+ uint64_t usec;
+ struct sd_event_source *evtSource;
+
+ /* Set a cyclic cb call each 1s to call the read callback */
+ sd_event_now(afb_daemon_get_event_loop(), CLOCK_MONOTONIC, &usec);
+ return sd_event_add_time(afb_daemon_get_event_loop(), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, args);
+}
diff --git a/src/plugins/influxdb-writer.c b/src/plugins/influxdb-writer.c
new file mode 100644
index 0000000..3c46b57
--- /dev/null
+++ b/src/plugins/influxdb-writer.c
@@ -0,0 +1,116 @@
+/*
+ * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
+ * Author "Romain Forlot" <romain.forlot@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <string.h>
+#include "influxdb.h"
+
+size_t format_write_args(char *query, const char *name, const char *source, const char *unit, const char *identity, json_object *jv, uint64_t timestamp)
+{
+ char *ts;
+ memset(query, 0, URL_MAXIMUM_LENGTH);
+
+ strncat(query, name, strlen(name));
+ if (source) {
+ concatenate(query, source, ",");
+ }
+ if (unit) {
+ concatenate(query, unit, ",");
+ }
+ if (identity) {
+ concatenate(query, identity, ",");
+ }
+
+ concatenate(query, "value", " ");
+ concatenate(query, json_object_to_json_string(jv), "=");
+ asprintf(&ts, "%lu", timestamp);
+ concatenate(query, ts, " ");
+
+ return strlen(query);
+}
+
+CURL *make_curl_write_post(const char *url, json_object *metric)
+{
+ CURL *curl;
+ json_object *jv = NULL;
+ uint64_t timestamp = 0;
+ const char *name = NULL,
+ *source = NULL,
+ *unit = NULL,
+ *identity = NULL;
+
+ size_t lpd = json_object_is_type(metric, json_type_array) ?
+ json_object_array_length(metric) + 1 : 2;
+
+ char **post_data;
+ post_data = malloc(lpd);
+
+ char write[URL_MAXIMUM_LENGTH];
+ bzero(write, URL_MAXIMUM_LENGTH);
+
+ if(unpack_metric_from_binding(metric, &name, &source, &unit, &identity, &jv, &timestamp)) {
+ AFB_ERROR("ERROR unpacking metric. %s", json_object_to_json_string(metric));
+ curl = NULL;
+ }
+ else {
+ for(long int i = --lpd; i >= 0; i--) {
+ format_write_args(write, name, source, unit, identity, jv, timestamp);
+ post_data[i] = i == lpd ? NULL : write;
+ }
+ curl = curl_wrap_prepare_post(url, NULL, 1, (const char * const*)post_data);
+ }
+ free(post_data);
+
+ return curl;
+}
+
+void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
+{
+ struct afb_req *req = (struct afb_req*)closure;
+ long rep_code = curl_wrap_response_code_get(curl);
+ switch(rep_code) {
+ case 204:
+ AFB_DEBUG("Request correctly written");
+ afb_req_success(*req, NULL, "Request has been successfully writen");
+ break;
+ case 400:
+ afb_req_fail(*req, "Bad request", result);
+ break;
+ case 401:
+ afb_req_fail(*req, "Unauthorized access", result);
+ break;
+ case 404:
+ afb_req_fail(*req, "Not found", result);
+ AFB_NOTICE("Attempt to create the DB '"DEFAULT_DB"'");
+ create_database();
+ break;
+ case 500:
+ afb_req_fail_f(*req, "Timeout", "Overloaded server: %s", result);
+ break;
+ default:
+ afb_req_fail(*req, "Failure", "Unexpected behavior.");
+ break;
+ }
+}
+
+CURL *influxdb_write(const char* host, const char *port, json_object *metric)
+{
+ char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
+ make_url(url, sizeof(url), host, port, "write");
+ return make_curl_write_post(url, metric);
+}
diff --git a/src/plugins/influxdb.c b/src/plugins/influxdb.c
index 17c140d..8a8a112 100644
--- a/src/plugins/influxdb.c
+++ b/src/plugins/influxdb.c
@@ -16,8 +16,10 @@
*/
#define _GNU_SOURCE
-#include <string.h>
+#include <fcntl.h>
#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
#include "tsdb.h"
#include "wrap-json.h"
@@ -49,36 +51,7 @@ int create_database()
return ret;
}
-void influxdb_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
-{
- struct afb_req *req = (struct afb_req*)closure;
- long rep_code = curl_wrap_response_code_get(curl);
- switch(rep_code) {
- case 204:
- AFB_DEBUG("Request correctly written");
- afb_req_success(*req, NULL, "Request has been successfully writen");
- break;
- case 400:
- afb_req_fail(*req, "Bad request", result);
- break;
- case 401:
- afb_req_fail(*req, "Unauthorized access", result);
- break;
- case 404:
- afb_req_fail(*req, "Not found", result);
- AFB_NOTICE("Attempt to create the DB '"DEFAULT_DB"'");
- create_database();
- break;
- case 500:
- afb_req_fail_f(*req, "Timeout", "Overloaded server: %s", result);
- break;
- default:
- afb_req_fail(*req, "Failure", "Unexpected behavior.");
- break;
- }
-}
-
-int unpack_metric(json_object *m, const char **name, const char **source, const char **unit, const char **identity, json_object **jv, uint64_t *timestamp)
+int unpack_metric_from_binding(json_object *m, const char **name, const char **source, const char **unit, const char **identity, json_object **jv, uint64_t *timestamp)
{
if (wrap_json_unpack(m, "{ss,s?s,s?s,s?s,so,sI!}",
"name", name,
@@ -103,75 +76,20 @@ void concatenate(char* dest, const char* source, const char *sep)
strncat(dest, source, strlen(source));
}
-char *format_write_query(char *query, const char *name, const char *source, const char *unit, const char *identity, json_object *jv, uint64_t timestamp)
+size_t make_url(char *url, size_t l_url, const char *host, const char *port, const char *endpoint)
{
- char *ts;
- memset(query, 0, URL_MAXIMUM_LENGTH);
-
- strncat(query, name, strlen(name));
- if (source) {
- concatenate(query, source, ",");
- }
- if (unit) {
- concatenate(query, unit, ",");
- }
- if (identity) {
- concatenate(query, identity, ",");
- }
-
- concatenate(query, "value", " ");
- concatenate(query, json_object_to_json_string(jv), "=");
- asprintf(&ts, "%lu", timestamp);
- concatenate(query, ts, " ");
-
- return query;
-}
-
-CURL *make_curl_write_post(const char *url, struct json_object *metric)
-{
- CURL *curl;
- const char *name = NULL,
- *source = NULL,
- *unit = NULL,
- *identity = NULL;
-
- size_t lpd = json_object_is_type(metric, json_type_array) ?
- json_object_array_length(metric) + 1 : 2;
-
- char **post_data;
- post_data = malloc(lpd);
-
- char query[URL_MAXIMUM_LENGTH];
- bzero(query, URL_MAXIMUM_LENGTH);
-
- json_object *jv = NULL;
- uint64_t timestamp = 0;
-
- if(unpack_metric(metric, &name, &source, &unit, &identity, &jv, &timestamp)) {
- AFB_ERROR("ERROR unpacking metric. %s", json_object_to_json_string(metric));
- curl = NULL;
- }
- else {
- for(long unsigned int i = lpd; i != 0; i--) {
- format_write_query(query, name, source, unit, identity, jv, timestamp);
- post_data[i] = i == lpd ? NULL : query;
- }
- curl = curl_wrap_prepare_post(url, NULL, 1, (const char * const*)post_data);
- }
-
- return curl;
-}
-
-CURL *influxdb_write(const char* host, int port, json_object *metric)
-{
- char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
- memset(url, 0, sizeof(url));
+ bzero(url, l_url);
/* Handle default host and port */
host = host ? host : DEFAULT_DBHOST;
- port = port > 0 ? port : atoi(DEFAULT_DBPORT);
+ port = port ? port : DEFAULT_DBPORT;
strncat(url, host, strlen(host));
- strncat(url, ":"DEFAULT_DBPORT"/write?db="DEFAULT_DB, strlen(":"DEFAULT_DBPORT"/write?db="DEFAULT_DB));
- return make_curl_write_post(url, metric);
+ strncat(url, ":", 1);
+ strncat(url, port, strlen(port));
+ strncat(url, "/", 1);
+ strncat(url, endpoint, strlen(endpoint));
+ strncat(url, "?db="DEFAULT_DB, strlen("?db="DEFAULT_DB));
+
+ return strlen(url);
}
diff --git a/src/plugins/influxdb.h b/src/plugins/influxdb.h
new file mode 100644
index 0000000..6c775b7
--- /dev/null
+++ b/src/plugins/influxdb.h
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
+ * Author "Romain Forlot" <romain.forlot@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "wrap-json.h"
+#include "tsdb.h"
+
+int create_database();
+
+int unpack_metric_from_binding(json_object *m, const char **name, const char **source, const char **unit, const char **identity, json_object **jv, uint64_t *timestamp);
+
+void concatenate(char* dest, const char* source, const char *sep);
+
+size_t make_url(char *url, size_t l_url, const char *host, const char *port, const char *endpoint);
diff --git a/src/plugins/tsdb.h b/src/plugins/tsdb.h
index 527f538..d7d6c2f 100644
--- a/src/plugins/tsdb.h
+++ b/src/plugins/tsdb.h
@@ -32,7 +32,14 @@ enum db_available {
OPENTSDB = 4
};
-CURL *influxdb_write(const char* host, int port, json_object *metric);
-CURL *influxdb_read(const char* host, int port, json_object *query);
-void influxdb_cb(void *closure, int status, CURL *curl, const char *result, size_t size);
+struct reader_args {
+ const char *host;
+ const char *port;
+};
+
+CURL *influxdb_write(const char* host, const char *port, json_object *metric);
+void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size);
+
+int influxdb_reader(void *args);
+
int db_ping();