summaryrefslogtreecommitdiffstats
path: root/src/harvester.c
blob: 945f6e15dd2e559da7badcf8e97d90fea59bdb67 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/*
 * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
 * Author "Romain Forlot" <romain.forlot@iot.bzh>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *	 http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define _GNU_SOURCE
#include "harvester.h"
#include "harvester-apidef.h"
#include <pthread.h>

#include "plugins/tsdb.h"
#include "curl-wrap.h"
#include "wrap-json.h"

#define ERROR -1

CURL* (*tsdb_write)(const char* host, int port, json_object *metric);
CURL* (*tsdb_read)(const char* host, int port, json_object *metric);
void (*curl_cb)(void *closure, int status, CURL *curl, const char *result, size_t size);
pthread_mutex_t db_mutex;

int do_write(struct afb_req req, const char* host, int port, json_object *metric)
{
	CURL *curl_request;

	curl_request = tsdb_write(host, port, metric);

	pthread_mutex_lock(&db_mutex);
	curl_wrap_do(curl_request, curl_cb, &req);
	pthread_mutex_unlock(&db_mutex);

	return 0;
}

void write(struct afb_req req)
{
	int port = -1;
	const char *host = NULL;

	json_object *req_args = afb_req_json(req),
				*metric = NULL;

	if(wrap_json_unpack(req_args, "{s?s,s?i,so!}",
								"host", &host,
								"port", &port,
								"metric", &metric) || ! metric)
		afb_req_fail(req, "Failed", "Error processing arguments. Miss metric\
		JSON object or malformed");
	else if(do_write(req, host, port, metric))
		afb_req_fail(req, "Failed", "Error processing metric JSON object.\
		Malformed !");
}

void auth(struct afb_req request)
{
	afb_req_session_set_LOA(request, 1);
	afb_req_success(request, NULL, NULL);
}

int init()
{
	int tsdb_available = 0;
	if(curl_global_init(CURL_GLOBAL_DEFAULT) != 0) {
		AFB_ERROR("Something went wrong initiliazing libcurl. Abort");
		return ERROR;
	}

	if(pthread_mutex_init(&db_mutex, NULL) != 0) {
		AFB_ERROR("Something went wrong initiliazing mutex. Abort");
		return ERROR;
	}

	tsdb_available = db_ping();
	switch (tsdb_available) {
		case INFLUX:
			tsdb_write = influxdb_write;
			tsdb_read = influxdb_read;
			curl_cb = influxdb_cb;
			break;
		default:
			AFB_ERROR("No Time Series Database found. Abort");
			return ERROR;
			break;
	}

	return 0;
}
x/uaccess.h> #include <linux/usb.h> #include <linux/mutex.h> #include <linux/spinlock.h> /* Define these values to match your devices */ #define USB_MOCCA_VENDOR_ID 0x184f #define USB_MOCCA_PRODUCT_ID_1 0x0007 #define USB_MOCCA_PRODUCT_ID_2 0x0008 #define USB_MOCCA_PRODUCT_ID_3 0x0009 #define USB_MOCCA_PRODUCT_ID_4 0x0010 #define USB_MOCCA_PRODUCT_ID_5 0x0011 #define USB_MOCCA_PRODUCT_ID_6 0x0012 /* table of devices that work with this driver */ static const struct usb_device_id mocca_table[] = { { USB_DEVICE(USB_MOCCA_VENDOR_ID, USB_MOCCA_PRODUCT_ID_1) }, { USB_DEVICE(USB_MOCCA_VENDOR_ID, USB_MOCCA_PRODUCT_ID_2) }, { USB_DEVICE(USB_MOCCA_VENDOR_ID, USB_MOCCA_PRODUCT_ID_3) }, { USB_DEVICE(USB_MOCCA_VENDOR_ID, USB_MOCCA_PRODUCT_ID_4) }, { USB_DEVICE(USB_MOCCA_VENDOR_ID, USB_MOCCA_PRODUCT_ID_5) }, { USB_DEVICE(USB_MOCCA_VENDOR_ID, USB_MOCCA_PRODUCT_ID_6) }, { } /* Terminating entry */ }; MODULE_DEVICE_TABLE (usb, mocca_table); /* Get a minor range for your devices from the usb maintainer */ #define USB_MOCCA_MINOR_BASE 221 /*TODO: change this minor number */ /* our private defines. if this grows any larger, use your own .h file */ #define MAX_TRANSFER ( PAGE_SIZE - 512 ) #define WRITES_IN_FLIGHT 8 /* Structure to hold all of our device specific stuff */ struct mocca_skel { struct usb_device * udev; /* the usb device for this device */ struct usb_interface * interface; /* the interface for this device */ struct semaphore limit_sem; /* limiting the number of writes in progress */ unsigned char * bulk_in_buffer; /* the buffer to receive data */ size_t bulk_in_size; /* the size of the receive buffer */ __u8 bulk_in_endpointAddr; /* the address of the bulk in endpoint */ __u8 bulk_out_endpointAddr; /* the address of the bulk out endpoint */ struct kref kref; }; #define to_mocca_dev(d) container_of(d, struct mocca_skel, kref) static struct usb_driver mocca_driver; static spinlock_t spinLock; static void mocca_delete(struct kref *kref) { struct mocca_skel *dev = to_mocca_dev(kref); usb_put_dev(dev->udev); kfree (dev->bulk_in_buffer); kfree (dev); } static int mocca_open(struct inode *inode, struct file *file) { struct mocca_skel *dev; struct usb_interface *interface; int subminor; int retval = 0; subminor = iminor(inode); interface = usb_find_interface(&mocca_driver, subminor); if (!interface) { printk(KERN_ERR "%s - error, can't find device for minor %d", __FUNCTION__, subminor); retval = -ENODEV; goto exit; } dev = usb_get_intfdata(interface); if (!dev) { retval = -ENODEV; goto exit; } /* increment our usage count for the device */ kref_get(&dev->kref); /* save our object in the file's private structure */ file->private_data = dev; exit: return retval; } static int mocca_release(struct inode *inode, struct file *file) { struct mocca_skel *dev; dev = (struct mocca_skel *)file->private_data; if (dev == NULL) return -ENODEV; /* decrement the count on our device */ kref_put(&dev->kref, mocca_delete); return 0; } static ssize_t mocca_read(struct file *file, char *buffer, size_t count, loff_t *ppos) { struct mocca_skel *dev; int retval = 0; int bytes_read; dev = (struct mocca_skel *)file->private_data; /* do a blocking bulk read to get data from the device */ retval = usb_bulk_msg(dev->udev, usb_rcvbulkpipe(dev->udev, dev->bulk_in_endpointAddr), dev->bulk_in_buffer, min(dev->bulk_in_size, count), &bytes_read, 10000); /* if the read was successful, copy the data to userspace */ if (!retval) { if (copy_to_user(buffer, dev->bulk_in_buffer, bytes_read)) retval = -EFAULT; else retval = bytes_read; } return retval; } static void mocca_write_bulk_callback(struct urb *urb) { struct mocca_skel *dev; dev = (struct mocca_skel *)urb->context; /* sync/async unlink faults aren't errors */ if (urb->status && !(urb->status == -ENOENT || urb->status == -ECONNRESET || urb->status == -ESHUTDOWN)) { printk(KERN_ERR "%s - nonzero write bulk status received: %d", __FUNCTION__, urb->status); } /* free up our allocated buffer */ usb_free_coherent(urb->dev, urb->transfer_buffer_length, urb->transfer_buffer, urb->transfer_dma); up(&dev->limit_sem); } static ssize_t mocca_write(struct file *file, const char *user_buffer, size_t count, loff_t *ppos) { struct mocca_skel *dev; int retval = 0; struct urb *urb = NULL; char *buf = NULL; size_t writesize = min(count, (size_t)MAX_TRANSFER); dev = (struct mocca_skel *)file->private_data; /* verify that we actually have some data to write */ if (count == 0) goto exit; /* limit the number of URBs in flight to stop a user from using up all RAM */ if (down_interruptible(&dev->limit_sem)) { retval = -ERESTARTSYS; goto exit; } /* create a urb, and a buffer for it, and copy the data to the urb */ urb = usb_alloc_urb(0, GFP_KERNEL); if (!urb) { retval = -ENOMEM; goto error; } buf = usb_alloc_coherent(dev->udev, writesize, GFP_KERNEL, &urb->transfer_dma); if (!buf) { retval = -ENOMEM; goto error; } if (copy_from_user(buf, user_buffer, writesize)) { retval = -EFAULT; goto error; } /* initialize the urb properly */ usb_fill_bulk_urb(urb, dev->udev, usb_sndbulkpipe(dev->udev, dev->bulk_out_endpointAddr), buf, writesize, mocca_write_bulk_callback, dev); urb->transfer_flags |= URB_NO_TRANSFER_DMA_MAP; /* send the data out the bulk port */ retval = usb_submit_urb(urb, GFP_KERNEL); if (retval) { printk(KERN_ERR "%s - failed submitting write urb, error %d", __FUNCTION__, retval); goto error; } /* release our reference to this urb, the USB core will eventually free it entirely */ usb_free_urb(urb); exit: return writesize; error: usb_free_coherent(dev->udev, writesize, buf, urb->transfer_dma); usb_free_urb(urb); up(&dev->limit_sem); return retval; } static struct file_operations mocca_fops = { .owner = THIS_MODULE, .read = mocca_read, .write = mocca_write, .open = mocca_open, .release = mocca_release, }; /* * usb class driver info in order to get a minor number from the usb core, * and to have the device registered with the driver core */ static struct usb_class_driver mocca_class = { .name = "mocca%d", .fops = &mocca_fops, .minor_base = USB_MOCCA_MINOR_BASE, }; static int mocca_probe(struct usb_interface *interface, const struct usb_device_id *id) { struct mocca_skel *dev = NULL; struct usb_host_interface *iface_desc; struct usb_endpoint_descriptor *endpoint; size_t buffer_size; int i; int retval = -ENOMEM; spin_lock_init(&spinLock); /* allocate memory for our device state and initialize it */ dev = kzalloc(sizeof(*dev), GFP_KERNEL); if (dev == NULL) { printk(KERN_ERR "Out of memory"); goto error; } kref_init(&dev->kref); sema_init(&dev->limit_sem, WRITES_IN_FLIGHT); dev->udev = usb_get_dev(interface_to_usbdev(interface)); dev->interface = interface; /* set up the endpoint information */ /* use only the first bulk-in and bulk-out endpoints */ iface_desc = interface->cur_altsetting; for (i = 0; i < iface_desc->desc.bNumEndpoints; ++i) { endpoint = &iface_desc->endpoint[i].desc; if (!dev->bulk_in_endpointAddr && ((endpoint->bEndpointAddress & USB_ENDPOINT_DIR_MASK) == USB_DIR_IN) && ((endpoint->bmAttributes & USB_ENDPOINT_XFERTYPE_MASK) == USB_ENDPOINT_XFER_BULK)) { /* we found a bulk in endpoint */ buffer_size = le16_to_cpu(endpoint->wMaxPacketSize); dev->bulk_in_size = buffer_size; dev->bulk_in_endpointAddr = endpoint->bEndpointAddress; dev->bulk_in_buffer = kmalloc(buffer_size, GFP_KERNEL); if (!dev->bulk_in_buffer) { printk(KERN_ERR "Could not allocate bulk_in_buffer"); goto error; } printk(KERN_INFO "Found bulk-in pipe, adress: %x, max packet size: %d", endpoint->bEndpointAddress, endpoint->wMaxPacketSize); } if (!dev->bulk_out_endpointAddr && ((endpoint->bEndpointAddress & USB_ENDPOINT_DIR_MASK) == USB_DIR_OUT) && ((endpoint->bmAttributes & USB_ENDPOINT_XFERTYPE_MASK) == USB_ENDPOINT_XFER_BULK)) { /* we found a bulk out endpoint */ dev->bulk_out_endpointAddr = endpoint->bEndpointAddress; printk(KERN_INFO "Found bulk-out pipe, adress: %x, max packet size: %d", endpoint->bEndpointAddress, endpoint->wMaxPacketSize); } } if (!(dev->bulk_in_endpointAddr && dev->bulk_out_endpointAddr)) { printk(KERN_ERR "Could not find both bulk-in and bulk-out endpoints"); goto error; } /* save our data pointer in this interface device */ usb_set_intfdata(interface, dev); /* we can register the device now, as it is ready */ retval = usb_register_dev(interface, &mocca_class); if (retval) { /* something prevented us from registering this driver */ printk(KERN_ERR "Not able to get a minor for this device."); usb_set_intfdata(interface, NULL); goto error; } /* let the user know what node this device is now attached to */ dev_info(&interface->dev, "K2L MOCCA device now attached to mocca%d", interface->minor); return 0; error: if (dev) kref_put(&dev->kref, mocca_delete); return retval; } static void mocca_disconnect(struct usb_interface *interface) { struct mocca_skel *dev; int minor = interface->minor; /* prevent mocca_open() from racing mocca_disconnect() */ spin_lock(&spinLock); dev = usb_get_intfdata(interface); usb_set_intfdata(interface, NULL); /* give back our minor */ usb_deregister_dev(interface, &mocca_class); spin_unlock(&spinLock); /* decrement our usage count */ kref_put(&dev->kref, mocca_delete); dev_info(&interface->dev, "K2L MOCCA device #%d now disconnected", minor); } static struct usb_driver mocca_driver = { .name = "k2l-mocca", .probe = mocca_probe, .disconnect = mocca_disconnect, .id_table = mocca_table, }; static int __init usb_mocca_init(void) { int result; /* register this driver with the USB subsystem */ result = usb_register(&mocca_driver); if (result) printk(KERN_ERR "usb_register failed. Error number %d", result); return result; } static void __exit usb_mocca_exit(void) { /* deregister this driver with the USB subsystem */ usb_deregister(&mocca_driver); } module_init (usb_mocca_init); module_exit (usb_mocca_exit); MODULE_LICENSE("GPL");