aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/low-can.cpp
blob: 18138b5f2208d158faab1ce5f2ffc9eb1ddfd30e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
/*
 * Copyright (C) 2016 "IoT.bzh"
 * Author Romain Forlot <romain.forlot@iot.bzh>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
*/

#define AFB_BINDING_VERSION 2
#include <afb/afb-binding.h>
#include <systemd/sd-event.h>
#include <json-c/json_object.h>
#include <stdbool.h>
#include <string.h>

#include "ctl-plugin.h"
#include "wrap-json.h"

#include "signal-composer.hpp"

extern "C"
{

CTLP_REGISTER("low-can");

typedef struct {
	bool door;
	bool window;
} doorT;

typedef struct {
	doorT front_left;
	doorT front_right;
	doorT rear_left;
	doorT rear_right;
} allDoorsCtxT;

typedef struct {
	struct signalCBT* pluginHandle;
	json_object *subscriptionBatch;
	allDoorsCtxT allDoorsCtx;
} lowCANCtxT;

void setDoor(doorT* aDoor, const char* eventName, int eventStatus)
{
	if(strcasestr(eventName, "door")) {aDoor->door = eventStatus;}
	else if(strcasestr(eventName, "window")) {aDoor->window = eventStatus;}
	else {AFB_WARNING("Unexpected behavior, this '%s' is not a door ! ", eventName);}
}

// Call at initialisation time
CTLP_ONLOAD(plugin, composerHandle)
{
	lowCANCtxT *pluginCtx= (lowCANCtxT*)calloc (1, sizeof(lowCANCtxT));

	pluginCtx->pluginHandle = (struct signalCBT*)composerHandle;
	pluginCtx->subscriptionBatch = json_object_new_array();

	AFB_NOTICE ("Low-can plugin: label='%s' version='%s' info='%s'",
		plugin->label,
		plugin->version,
		plugin->info);

	return (void*)pluginCtx;
}

CTLP_CAPI (subscribeToLow, source, argsJ, eventJ, context) {
	lowCANCtxT *pluginCtx = (lowCANCtxT*)source->context;
	json_object* dependsArrayJ = nullptr, *subscribeArgsJ = nullptr, *subscribeFilterJ = nullptr, *responseJ = nullptr, *filterJ = nullptr;
	const char *id = nullptr, *event = nullptr, *unit = nullptr;
	double frequency = 0;
	int err = 0;

	if(eventJ)
	{
		err = wrap_json_unpack(eventJ, "{ss,s?s,s?o,s?s,s?F,s?o !}",
			"id", &id,
			"event", &event,
			"depends", &dependsArrayJ,
			"unit", &unit,
			"frequency", &frequency,
			"getSignalsArgs", &filterJ);
		if(err)
		{
			AFB_ERROR("Problem to unpack JSON object eventJ: %s",
			json_object_to_json_string(eventJ));
			return err;
		}

		if(frequency > 0 && !filterJ)
			{wrap_json_pack(&subscribeFilterJ, "{sf}", "frequency", frequency);}
		else
			{subscribeFilterJ = filterJ;}

		std::string eventStr = std::string(event);
		std::string lowEvent = eventStr.substr(eventStr.find("/")+1);
		err = wrap_json_pack(&subscribeArgsJ, "{ss, so*}",
		"event", lowEvent.c_str(),
		"filter", subscribeFilterJ);
		if(err)
		{
			AFB_ERROR("Error building subscription query object");
			return err;
		}
		json_object_array_add(pluginCtx->subscriptionBatch, subscribeArgsJ);
	}
	else
	{
		AFB_DEBUG("Calling subscribe with %s", json_object_to_json_string_ext(pluginCtx->subscriptionBatch, JSON_C_TO_STRING_PRETTY));
		err = afb_service_call_sync("low-can", "subscribe", pluginCtx->subscriptionBatch, &responseJ);
		if(err)
			{AFB_ERROR("Subscribe to '%s' responseJ:%s", json_object_to_json_string_ext(pluginCtx->subscriptionBatch, JSON_C_TO_STRING_PRETTY), json_object_to_json_string(responseJ));}
	}

	return err;
}

CTLP_CAPI (isOpen, source, argsJ, eventJ, context) {
	const char *eventName = nullptr;
	int eventStatus;
	uint64_t timestamp;
	lowCANCtxT *pluginCtx=(lowCANCtxT*)source->context;

	int err = wrap_json_unpack(eventJ, "{ss,sb,s?F}",
		"name", &eventName,
		"value", &eventStatus,
		"timestamp", &timestamp);
	if(err)
	{
		AFB_ERROR("Error parsing event %s", json_object_to_json_string(eventJ));
		return -1;
	}

	struct signalValue value = {
		.hasBool = true, .boolVal = eventStatus,
		.hasNum = false, .numVal = 0,
		.hasStr = false, .strVal = std::string()
	};
	if(strcasestr(eventName, "front_left"))
	{
		pluginCtx->pluginHandle->setSignalValue(eventName,(uint64_t)timestamp, value);
		setDoor(&pluginCtx->allDoorsCtx.front_left, eventName, eventStatus);
	}
	else if(strcasestr(eventName, "front_right"))
	{
		pluginCtx->pluginHandle->setSignalValue(eventName,(uint64_t)timestamp, value);
		setDoor(&pluginCtx->allDoorsCtx.front_right, eventName, eventStatus);
	}
	else if(strcasestr(eventName, "rear_left"))
	{
		pluginCtx->pluginHandle->setSignalValue(eventName,(uint64_t)timestamp, value);
		setDoor(&pluginCtx->allDoorsCtx.rear_left, eventName, eventStatus);
	}
	else if(strcasestr(eventName, "rear_right"))
	{
		pluginCtx->pluginHandle->setSignalValue(eventName,(uint64_t)timestamp, value);
		setDoor(&pluginCtx->allDoorsCtx.rear_right, eventName, eventStatus);
	}
	else
	{
		AFB_WARNING("Unexpected behavior, this '%s' is it really a door ! ", json_object_to_json_string(eventJ));
		return -1;
	}

	AFB_DEBUG("This is the situation: source:%s, args:%s, event:%s,\n fld: %s, flw: %s, frd: %s, frw: %s, rld: %s, rlw: %s, rrd: %s, rrw: %s",
	source->label,
	json_object_to_json_string(argsJ),
	json_object_to_json_string(eventJ),
	pluginCtx->allDoorsCtx.front_left.door ? "true":"false",
	pluginCtx->allDoorsCtx.front_left.window ? "true":"false",
	pluginCtx->allDoorsCtx.front_right.door ? "true":"false",
	pluginCtx->allDoorsCtx.front_right.window ? "true":"false",
	pluginCtx->allDoorsCtx.rear_left.door ? "true":"false",
	pluginCtx->allDoorsCtx.rear_left.window ? "true":"false",
	pluginCtx->allDoorsCtx.rear_right.door ? "true":"false",
	pluginCtx->allDoorsCtx.rear_right.window ? "true":"false"
);

	return 0;
}

}
s="o">* userData) { char string[32]; int error; snd_pcm_sframes_t framesIn, framesOut, availIn, availOut; AlsaPcmCopyHandleT *pcmCopyHandle = (AlsaPcmCopyHandleT*) userData; // PCM has was closed if ((revents & EPOLLHUP) != 0) { AFB_ApiNotice(pcmCopyHandle->api, "AlsaPcmReadCB PCM=%s hanghup/disconnected", ALSA_PCM_UID(pcmCopyHandle->pcmIn, string)); goto ExitOnSuccess; } // ignore any non input events if ((revents & EPOLLIN) == 0) { goto ExitOnSuccess; } // retrieve PCM state snd_pcm_state_t pcmState = snd_pcm_state(pcmCopyHandle->pcmIn); // When pause flush remaining frame and wait if (pcmState == SND_PCM_STATE_PAUSED) { framesIn = snd_pcm_readi(pcmCopyHandle->pcmIn, pcmCopyHandle->buffer, pcmCopyHandle->frameCount); AFB_ApiInfo(pcmCopyHandle->api, "AlsaPcmReadCB: paused frame:%ld ignored", framesIn); goto ExitOnSuccess; } // When XRNS append try to restore PCM if (pcmState == SND_PCM_STATE_XRUN) { AFB_ApiNotice(pcmCopyHandle->api, "AlsaPcmReadCB PCM=%s XRUN", ALSA_PCM_UID(pcmCopyHandle->pcmIn, string)); snd_pcm_prepare(pcmCopyHandle->pcmIn); } // when PCM suspending loop until ready to go if (pcmState == SND_PCM_STATE_SUSPENDED) { while (1) { if ((error = snd_pcm_resume(pcmCopyHandle->pcmIn)) < 0) { AFB_ApiNotice(pcmCopyHandle->api, "AlsaPcmReadCB PCM=%s SUSPENDED fail to resume", ALSA_PCM_UID(pcmCopyHandle->pcmIn, string)); sleep(1); // Fulup should be replace with corresponding AFB_timer } else { AFB_ApiNotice(pcmCopyHandle->api, "AlsaPcmReadCB PCM=%s SUSPENDED success to resume", ALSA_PCM_UID(pcmCopyHandle->pcmIn, string)); } } } // do we have waiting frame availIn = snd_pcm_avail_update(pcmCopyHandle->pcmIn); if (availIn <= 0) { goto ExitOnSuccess; } // do we have space to push frame availOut = snd_pcm_avail_update(pcmCopyHandle->pcmOut); if (availOut <= 0) { snd_pcm_prepare(pcmCopyHandle->pcmOut); goto ExitOnSuccess; } // make sure we can push all input frame into output pcm without locking if (availOut < availIn) availIn = availOut; // we get too many data ignore some if (availIn > pcmCopyHandle->frameCount) { availIn = pcmCopyHandle->frameCount; } // effectively read pcmIn and push frame to pcmOut framesIn = snd_pcm_readi(pcmCopyHandle->pcmIn, pcmCopyHandle->buffer, availIn); if (framesIn < 0 || framesIn != availIn) { AFB_ApiNotice(pcmCopyHandle->api, "AlsaPcmReadCB PcmIn=%s UNDERUN framesIn=%ld, availIn %ld, max %d", ALSA_PCM_UID(pcmCopyHandle->pcmIn, string), framesIn, availIn, pcmCopyHandle->frameCount); snd_pcm_prepare(pcmCopyHandle->pcmIn); goto ExitOnSuccess; } // In/Out frames transfer through buffer copy framesOut = snd_pcm_writei(pcmCopyHandle->pcmOut, pcmCopyHandle->buffer, framesIn); //framesOut = snd_pcm_mmap_writei (pcmCopyHandle->pcmOut, pcmCopyHandle->buffer, framesIn); if (framesOut < 0 || framesOut != framesIn) { AFB_ApiNotice(pcmCopyHandle->api, "AlsaPcmReadCB PcmOut=%s UNDERUN frame=%ld / %ld", ALSA_PCM_UID(pcmCopyHandle->pcmOut, string), framesOut ,framesIn); snd_pcm_prepare(pcmCopyHandle->pcmOut); goto ExitOnSuccess; } if (framesIn != framesOut) { AFB_ApiNotice(pcmCopyHandle->api, "AlsaPcmReadCB PCM=%s Loosing frames=%ld", ALSA_PCM_UID(pcmCopyHandle->pcmOut, string), (framesIn - framesOut)); goto ExitOnSuccess; } return 0; // Cannot handle error in callback ExitOnSuccess: return 0; } static void *LoopInThread(void *handle) { AlsaPcmCopyHandleT *pcmCopyHandle = (AlsaPcmCopyHandleT*) handle; int count = 0; int watchdog = MAINLOOP_WATCHDOG * 1000; pcmCopyHandle->tid = (int) syscall(SYS_gettid); AFB_ApiNotice(pcmCopyHandle->api, "LoopInThread:%s/%d Started", pcmCopyHandle->info, pcmCopyHandle->tid); /* loop until end */ for (;;) { int res = sd_event_run(pcmCopyHandle->sdLoop, watchdog); if (res == 0) { AFB_ApiInfo(pcmCopyHandle->api, "LoopInThread:%s/%d Idle count=%d", pcmCopyHandle->info, pcmCopyHandle->tid, count++); continue; } if (res < 0) { AFB_ApiError(pcmCopyHandle->api, "LoopInThread:%s/%d ERROR=%i Exit errno=%s.\n", pcmCopyHandle->info, pcmCopyHandle->tid, res, strerror(res)); break; } } pthread_exit(0); } PUBLIC int AlsaPcmCopy(SoftMixerT *mixer, AlsaStreamAudioT *stream, AlsaPcmCtlT *pcmIn, AlsaPcmCtlT *pcmOut, AlsaPcmHwInfoT * opts) { char string[32]; struct pollfd *pcmInFds; int error; // Fulup need to check https://www.alsa-project.org/alsa-doc/alsa-lib/group___p_c_m___direct.html AlsaDumpPcmInfo(mixer,"PcmIn",pcmIn->handle); AlsaDumpPcmInfo(mixer,"PcmOut",pcmOut->handle); // prepare PCM for capture and replay error = AlsaPcmConf(mixer, pcmIn, opts); if (error) goto OnErrorExit; // input and output should match error = AlsaPcmConf(mixer, pcmOut, opts); if (error) goto OnErrorExit; // Prepare PCM for usage if ((error = snd_pcm_prepare(pcmOut->handle)) < 0) { AFB_ApiError(mixer->api, "AlsaPcmCopy: Fail to prepare PCM=%s error=%s", ALSA_PCM_UID(pcmOut->handle, string), snd_strerror(error)); goto OnErrorExit; }; // Prepare PCM for usage if ((error = snd_pcm_start(pcmIn->handle)) < 0) { AFB_ApiError(mixer->api, "AlsaPcmCopy: Fail to prepare PCM=%s error=%s", ALSA_PCM_UID(pcmIn->handle, string), snd_strerror(error)); goto OnErrorExit; }; AlsaPcmCopyHandleT *cHandle= calloc(1, sizeof(AlsaPcmCopyHandleT)); cHandle = cHandle; cHandle->info = "pcmCpy"; cHandle->pcmIn = pcmIn->handle; cHandle->pcmOut = pcmOut->handle; cHandle->api = mixer->api; cHandle->channels = opts->channels; cHandle->frameSize = opts->channels * opts->sampleSize; cHandle->frameCount = ALSA_BUFFER_FRAMES_COUNT; cHandle->buffer = malloc(cHandle->frameCount * cHandle->frameSize); // get FD poll descriptor for capture PCM int pcmInCount = snd_pcm_poll_descriptors_count(cHandle->pcmIn); if (pcmInCount <= 0) { AFB_ApiError(mixer->api, "AlsaPcmCopy: Fail pcmIn=%s get fds count error=%s", ALSA_PCM_UID(pcmIn->handle, string), snd_strerror(error)); goto OnErrorExit; }; pcmInFds = alloca(sizeof (*pcmInFds) * pcmInCount); if ((error = snd_pcm_poll_descriptors(pcmIn->handle, pcmInFds, pcmInCount)) < 0) { AFB_ApiError(mixer->api, "AlsaPcmCopy: Fail pcmIn=%s get pollfds error=%s", ALSA_PCM_UID(pcmOut->handle, string), snd_strerror(error)); goto OnErrorExit; }; // add poll descriptor to AGL systemd mainloop if ((error = sd_event_new(&cHandle->sdLoop)) < 0) { fprintf(stderr, "LaunchCallRequest: fail pcmin=%s creating a new loop: %s\n", ALSA_PCM_UID(pcmOut->handle, string), strerror(error)); goto OnErrorExit; } for (int idx = 0; idx < pcmInCount; idx++) { if ((error = sd_event_add_io(cHandle->sdLoop, &cHandle->evtsrc, pcmInFds[idx].fd, EPOLLIN, AlsaPcmReadCB, cHandle)) < 0) { AFB_ApiError(mixer->api, "AlsaPcmCopy: Fail pcmIn=%s sd_event_add_io err=%d", ALSA_PCM_UID(pcmIn->handle, string), error); goto OnErrorExit; } } // start a thread with a mainloop to monitor Audio-Agent if ((error = pthread_create(&cHandle->thread, NULL, &LoopInThread, cHandle)) < 0) { AFB_ApiError(mixer->api, "AlsaPcmCopy: Fail create waiting thread pcmIn=%s err=%d", ALSA_PCM_UID(pcmIn->handle, string), error); goto OnErrorExit; } // request a higher priority for each audio stream thread struct sched_param params; params.sched_priority = sched_get_priority_max(SCHED_FIFO); error= pthread_setschedparam(cHandle->thread, SCHED_FIFO, &params); if (error) { AFB_ApiWarning(mixer->api, "AlsaPcmCopy: Fail create increase stream thread priority pcmIn=%s err=%s", ALSA_PCM_UID(pcmIn->handle, string), strerror(error)); } return 0; OnErrorExit: AFB_ApiError(mixer->api, "AlsaPcmCopy: - pcmIn=%s" , ALSA_PCM_UID(pcmIn->handle, string)); AFB_ApiError(mixer->api, "AlsaPcmCopy: - pcmOut=%s", ALSA_PCM_UID(pcmOut->handle, string)); return -1; }