aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/influxdb-reader.c
blob: bb56c936c177bb4f521904f5ad4a5c31f5f018a6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/*
 * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
 * Author "Romain Forlot" <romain.forlot@iot.bzh>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *	 http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
#include <systemd/sd-event.h>

#include "influxdb.h"

CURL *make_curl_query_get(const char *url)
{
	CURL *curl;
	char *args[5];
	char *last_ts[30];

	int fd_last_read = afb_daemon_rootdir_open_locale("last_db_read", O_CREAT, NULL);
	if (fd_last_read < 0)
		return NULL;

	read(fd_last_read, last_ts, sizeof(last_ts));

	args[0] = "epoch";
	args[1] = "ns";
	args[2] = "q";
	args[3] = "SELECT * FROM /^.*$/";
	args[4] = NULL;

	curl = curl_wrap_prepare_get(url, NULL, (const char * const*)args);

	return curl;
}

int unpack_metric_from_db(json_object *metric)
{
	const char *name;
	json_object *columns = NULL, *values = NULL;

	wrap_json_unpack(metric, "{ss, so, so!}",
					 "name", &name,
					 "columns", &columns,
					 "values", &values);
}

int unpack_series(json_object *series)
{
	size_t length_series = json_object_array_length(series);

}

void forward_to_garner(const char *result, size_t size)
{
	int id = 0;

	json_object *series = NULL,
				*db_dump = json_tokener_parse(result);
	wrap_json_unpack(db_dump, "{s[{si,so}]}",
					 "id", &id,
					 "series", &series);

	unpack_series(series);

}

void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
{
	long rep_code = curl_wrap_response_code_get(curl);
	switch(rep_code) {
		case 204:
			AFB_DEBUG("Read correctly done");
			forward_to_garner(result, size);
			break;
		case 400:
			AFB_ERROR("Unacceptable request. %s", result);
			break;
		case 401:
			AFB_ERROR("Invalid authentication. %s", result);
			break;
		default:
			AFB_ERROR("Unexptected behavior. %s", result);
			break;
	}
}

int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata)
{
	CURL *curl;
	struct reader_args *a = NULL;
	if(userdata)
		a = (struct reader_args*)userdata;
	else
		return -1;

	char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */

	make_url(url, sizeof(url), a->host, a->port, "query");
	curl = make_curl_query_get(url);
	curl_wrap_do(curl, influxdb_read_curl_cb, NULL);

	return 0;
}

int influxdb_reader(void *args)
{
	uint64_t usec;
	struct sd_event_source *evtSource;

	/* Set a cyclic cb call each 1s to call the read callback */
	sd_event_now(afb_daemon_get_event_loop(), CLOCK_MONOTONIC, &usec);
	return sd_event_add_time(afb_daemon_get_event_loop(), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, args);
}