summaryrefslogtreecommitdiffstats
path: root/nsframework/framework_unified/client/NS_MessageQueue/src/ns_msg_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'nsframework/framework_unified/client/NS_MessageQueue/src/ns_msg_queue.c')
-rw-r--r--nsframework/framework_unified/client/NS_MessageQueue/src/ns_msg_queue.c596
1 files changed, 596 insertions, 0 deletions
diff --git a/nsframework/framework_unified/client/NS_MessageQueue/src/ns_msg_queue.c b/nsframework/framework_unified/client/NS_MessageQueue/src/ns_msg_queue.c
new file mode 100644
index 00000000..19900617
--- /dev/null
+++ b/nsframework/framework_unified/client/NS_MessageQueue/src/ns_msg_queue.c
@@ -0,0 +1,596 @@
+/*
+ * @copyright Copyright (c) 2016-2020 TOYOTA MOTOR CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+///////////////////////////////////////////////////////////////////////////////
+/// \ingroup tag_NSMessageQueue
+/// \brief API Header for Zone Player Service APIs to be used by senders and
+/// receivers.
+///
+/// APIs to register/unregister notifications and add/remove recievers to the notifications.
+///
+///////////////////////////////////////////////////////////////////////////////
+
+#include <unistd.h>
+#include <stdio.h>
+#include <string.h>
+#include "ns_msg_queue.h"
+#include <malloc.h>
+#include <mqueue.h>
+#include <ns_mq_internal.h>
+#include <errno.h>
+#include "ns_mq_anomaly.h"
+
+#include <other_service/strlcpy.h>
+#include <other_service/strlcat.h>
+
+const UI_32 DEFAULT_MSG_PRIORITY = 10;
+const UI_32 MAX_MESSAGES_STORED_IN_QUEUE = 256;
+const UI_32 MAX_SYNC_RESPONSE_STORED_IN_QUEUE = 2;
+
+////////////////////////////////////////////////////////////////////////////////////////////////////
+// Function : TranslateError
+// Translates global error variables into FW EFrameworkunifiedStatus
+////////////////////////////////////////////////////////////////////////////////////////////////////
+EFrameworkunifiedStatus TranslateError(int error) {
+ EFrameworkunifiedStatus eStatus = eFrameworkunifiedStatusFail;
+
+ switch (error) {
+ case EOK:
+ eStatus = eFrameworkunifiedStatusOK;
+ break;
+ case EBUSY:
+ eStatus = eFrameworkunifiedStatusThreadBusy;
+ break;
+ case EDEADLK:
+ eStatus = eFrameworkunifiedStatusThreadSelfJoin;
+ break;
+ case EFAULT:
+ eStatus = eFrameworkunifiedStatusFault;
+ break;
+ case EINVAL:
+ eStatus = eFrameworkunifiedStatusInvldParam;
+ break;
+ case ESRCH:
+ eStatus = eFrameworkunifiedStatusThreadNotExist;
+ break;
+ case EBADF:
+ eStatus = eFrameworkunifiedStatusErrNoEBADF;
+ break;
+ case EAGAIN:
+ eStatus = eFrameworkunifiedStatusErrNoEAGAIN;
+ break;
+ case EINTR:
+ eStatus = eFrameworkunifiedStatusErrNoEINTR;
+ break;
+ case EMSGSIZE:
+ eStatus = eFrameworkunifiedStatusInvldBufSize;
+ break;
+ case ENOTSUP:
+ eStatus = eFrameworkunifiedStatusNotImplemented;
+ break;
+ case EPERM:
+ eStatus = eFrameworkunifiedStatusAccessError;
+ break;
+ default:
+ eStatus = eFrameworkunifiedStatusFail;
+ break;
+ }
+
+ return eStatus;
+}
+
+static UI_8 GetNormalizedMqName(PSTR normalized_mqname , PCSTR name, size_t size) {
+ if ((NULL != name) && (NULL != normalized_mqname)) {
+ if (name[0] != '/') {
+ strlcpy(normalized_mqname, "/", size);
+ strlcat(normalized_mqname, name, size);
+ } else {
+ strlcpy(normalized_mqname, name, size);
+ }
+
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+HANDLE OpenReceiverNotBlocked(PCSTR name) {
+ HANDLE rtnHandle = INVALID_HANDLE; // invalid until made valid.
+ PSTR normalized_qname = NULL;
+ SI_32 q_fd = -1; // fd to mqueue
+ SQhandle *rcvHndl = NULL;
+ struct mq_attr mqattr;
+ size_t norm_qname_size = 0;
+
+ // Check for invalid name
+ if (name == NULL) {
+ return INVALID_HANDLE;
+ }
+
+ if (strlen(name) >= MAX_QUEUE_NAME_SIZE) {
+ return INVALID_HANDLE;
+ }
+
+ norm_qname_size = strlen(name) + 2;
+ normalized_qname = (PSTR)malloc(norm_qname_size);
+
+ if (0 != GetNormalizedMqName(normalized_qname, name, norm_qname_size)) {
+ if (strlen(normalized_qname) > LIMIT_QUEUE_NAME_SIZE) {
+ free(normalized_qname);
+ return INVALID_HANDLE;
+ }
+
+ mqattr.mq_flags = 0;
+ mqattr.mq_maxmsg = (__syscall_slong_t)MAX_MESSAGES_STORED_IN_QUEUE;
+ mqattr.mq_msgsize = MAX_QUEUE_MSG_SIZE;
+
+ q_fd = mq_open(normalized_qname, O_RDONLY | O_CREAT | O_NONBLOCK | O_CLOEXEC , 0666, &mqattr);
+
+ if (q_fd != -1) {
+ rcvHndl = (SQhandle *)malloc(sizeof(SQhandle));
+ if (rcvHndl != NULL) {
+ rcvHndl->check_code = MQ_CHECK_CODE;
+ rcvHndl->fd = q_fd;
+ rcvHndl->q_name = normalized_qname;
+ rcvHndl->q_type = eQTypeReveiver;
+
+ /////////////////////////////////////////
+ rcvHndl->threadid = 0;
+ /////////////////////////////////////////
+
+ // Set the return handle to rcvHndl
+ rtnHandle = rcvHndl;
+ }
+ }
+ }
+
+ if (INVALID_HANDLE == rtnHandle) {
+ // couldn't connect the queue,
+ // release the memory to normalized queue name
+ if (normalized_qname != NULL) {
+ free(normalized_qname);
+ normalized_qname = NULL; // mb20110108 item 11
+ }
+ }
+
+ return rtnHandle;
+}
+
+static HANDLE openReceiverInternal(PCSTR name, UI_32 mq_maxmsg, UI_32 mq_msgsize, BOOL is_internal) {
+ HANDLE rtnHandle = INVALID_HANDLE; // invalid until made valid.
+ PSTR normalized_qname = NULL;
+ SI_32 q_fd = -1; // fd to mqueue
+ SQhandle *rcvHndl = NULL;
+ struct mq_attr mqattr;
+ size_t norm_qname_size = 0;
+
+ // Check for invalid name
+ if (name == NULL) {
+ return INVALID_HANDLE;
+ }
+
+ if (strlen(name) >= MAX_QUEUE_NAME_SIZE) {
+ return INVALID_HANDLE;
+ }
+
+ norm_qname_size = strlen(name) + 2;
+ normalized_qname = (PSTR)malloc(norm_qname_size);
+
+ if (0 != GetNormalizedMqName(normalized_qname, name, norm_qname_size)) {
+ int i;
+
+ if (!is_internal && strlen(normalized_qname) > LIMIT_QUEUE_NAME_SIZE) {
+ free(normalized_qname);
+ return INVALID_HANDLE;
+ }
+
+ mqattr.mq_flags = 0;
+ mqattr.mq_maxmsg = (__syscall_slong_t)mq_maxmsg;
+ mqattr.mq_msgsize = (__syscall_slong_t)mq_msgsize;
+
+ for (i = 0; i < (sizeof(mq_anomaly_list) / sizeof(mq_anomaly_list[0])); i++) {
+ if (strcmp(normalized_qname, mq_anomaly_list[i].name) == 0) {
+ mqattr.mq_maxmsg = (__syscall_slong_t)mq_anomaly_list[i].maxMsg;
+ break;
+ }
+ }
+
+
+ q_fd = mq_open(normalized_qname, O_RDONLY | O_CREAT | O_CLOEXEC, 0666, &mqattr);
+
+ if (q_fd != -1) {
+ rcvHndl = (SQhandle *)malloc(sizeof(SQhandle));
+ if (rcvHndl != NULL) {
+ rcvHndl->check_code = MQ_CHECK_CODE;
+ rcvHndl->fd = q_fd;
+ rcvHndl->q_name = normalized_qname;
+ rcvHndl->q_type = eQTypeReveiver;
+
+ /////////////////////////////////////////
+ rcvHndl->threadid = 0;
+ /////////////////////////////////////////
+
+ // Set the return handle to rcvHndl
+ rtnHandle = rcvHndl;
+ }
+
+ }
+ }
+
+ if (INVALID_HANDLE == rtnHandle) {
+ // couldn't connect the queue,
+ // release the memory to normalized queue name
+ if (normalized_qname != NULL) {
+ free(normalized_qname);
+ normalized_qname = NULL; // mb20110108 item 11
+ }
+ }
+
+ return rtnHandle;
+}
+
+HANDLE OpenReceiver(PCSTR name) {
+ return openReceiverInternal(name, MAX_MESSAGES_STORED_IN_QUEUE, MAX_QUEUE_MSG_SIZE, FALSE);
+}
+
+HANDLE openSyncReceiver(PCSTR name) {
+ return openReceiverInternal(name, MAX_SYNC_RESPONSE_STORED_IN_QUEUE , MAX_QUEUE_MSG_SIZE, TRUE);
+}
+
+static HANDLE openSenderInternal(PCSTR name, UI_32 mq_maxmsg, UI_32 mq_msgsize, BOOL is_internal, BOOL zero_copy) {
+ HANDLE rtnHandle = INVALID_HANDLE; // invalid until made valid.
+ SI_32 q_fd = -1; // fd to queue
+ SQhandle *sndHndl = NULL;
+ PSTR normalized_qname = NULL;
+ size_t norm_qname_size = 0;
+
+ struct mq_attr mqattr;
+
+ // Check for invalid name
+ if (name == NULL) {
+ return rtnHandle;
+ }
+
+ if (strlen(name) >= MAX_QUEUE_NAME_SIZE) {
+ return rtnHandle;
+ }
+
+ norm_qname_size = strlen(name) + 2;
+ normalized_qname = (PSTR)malloc(norm_qname_size);
+
+ if (NULL == normalized_qname) {
+ return rtnHandle;
+ }
+
+ if (0 != GetNormalizedMqName(normalized_qname, name, norm_qname_size)) {
+ int i;
+
+
+ if (!is_internal && strlen(normalized_qname) > LIMIT_QUEUE_NAME_SIZE) {
+ free(normalized_qname);
+ return INVALID_HANDLE;
+ }
+
+ mqattr.mq_flags = 0;
+ mqattr.mq_maxmsg = (__syscall_slong_t)mq_maxmsg;
+ mqattr.mq_msgsize = (__syscall_slong_t)mq_msgsize;
+
+ for (i = 0; i < (sizeof(mq_anomaly_list) / sizeof(mq_anomaly_list[0])); i++) {
+ if (strcmp(normalized_qname, mq_anomaly_list[i].name) == 0) {
+ mqattr.mq_maxmsg = (__syscall_slong_t)mq_anomaly_list[i].maxMsg;
+ break;
+ }
+ }
+
+ q_fd = mq_open(normalized_qname, O_WRONLY | O_CREAT | O_NONBLOCK | O_CLOEXEC, 0666, &mqattr);
+
+ if (q_fd != -1) {
+ sndHndl = (SQhandle *)malloc(sizeof(SQhandle));
+ if (NULL != sndHndl) {
+ sndHndl->check_code = MQ_CHECK_CODE;
+ sndHndl->fd = q_fd;
+ sndHndl->q_name = normalized_qname;
+ sndHndl->q_type = eQTypeSender;
+
+ sndHndl->threadid = 0;
+
+
+ // Set the return handle to sndHndl
+ rtnHandle = sndHndl;
+
+ if (zero_copy) {
+ sndHndl->sendbuf = malloc(MAX_QUEUE_MSG_SIZE);
+ if (sndHndl->sendbuf == NULL) {
+ // LCOV_EXCL_START 5: malloc's error case
+ AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert
+ free(sndHndl);
+ rtnHandle = INVALID_HANDLE;
+ // LCOV_EXCL_STOP
+ }
+ } else {
+ sndHndl->sendbuf = NULL;
+ }
+ }
+ }
+ }
+
+ if (INVALID_HANDLE == rtnHandle) {
+ // couldn't connect the queue,
+ // release the memory to normalized queue name
+ if (normalized_qname != NULL) {
+ free(normalized_qname);
+ normalized_qname = NULL; // mb20110108 item 11
+ }
+ if (q_fd != -1) {
+ mq_close(q_fd);
+ }
+ }
+
+ return rtnHandle; // invalid until made valid.
+}
+
+HANDLE OpenSender(PCSTR name) {
+ return openSenderInternal(name, MAX_MESSAGES_STORED_IN_QUEUE, MAX_QUEUE_MSG_SIZE, FALSE, FALSE);
+}
+
+
+HANDLE OpenSenderChild(PCSTR name, pthread_t threadid) {
+ HANDLE h = OpenSender(name);
+ if (INVALID_HANDLE != h) {
+ SQhandle *sq = (SQhandle *)h;
+ sq->threadid = threadid;
+ }
+ return h;
+}
+
+
+HANDLE openSyncSender(PCSTR name) {
+ return openSenderInternal(name, MAX_SYNC_RESPONSE_STORED_IN_QUEUE, MAX_QUEUE_MSG_SIZE, TRUE, FALSE);
+}
+
+HANDLE openSenderZc(PCSTR name) {
+ return openSenderInternal(name, MAX_MESSAGES_STORED_IN_QUEUE, MAX_QUEUE_MSG_SIZE, FALSE, TRUE);
+}
+
+
+EFrameworkunifiedStatus JoinChild(HANDLE hChildApp) {
+ // mb20110108 Code re-structured per comments 24 & 25
+ if (mqCheckValidHandle(hChildApp)) {
+ SQhandle *sq = (SQhandle *)hChildApp;
+ return TranslateError(pthread_join(sq->threadid, NULL));
+ } else {
+ return eFrameworkunifiedStatusFail;
+ }
+}
+
+EFrameworkunifiedStatus GetChildThreadPriority(HANDLE hChildApp, PSI_32 threadPrio) {
+ EFrameworkunifiedStatus eStatus = eFrameworkunifiedStatusFail;
+ // LCOV_EXCL_BR_START 6:GetChildThreadPriority is called by McGetChildThreadPriority, and hChildApp will check there.
+ if (mqCheckValidHandle(hChildApp)) {
+ // LCOV_EXCL_BR_STOP
+ SQhandle *sq = (SQhandle *)hChildApp;
+ SI_32 schedPolicy; // this is needed according to syntax of pthread_getschedparam. api not available to get prio directly.
+ struct sched_param schedParam;
+ eStatus = TranslateError(pthread_getschedparam(sq->threadid, &schedPolicy, &schedParam));
+
+ *threadPrio = schedParam.sched_priority;
+ } else {
+ // LCOV_EXCL_START 6:GetChildThreadPriority is called by McGetChildThreadPriority, and hChildApp will check there.
+ AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert
+ eStatus = eFrameworkunifiedStatusInvldHandle;
+ // LCOV_EXCL_STOP
+ }
+ return eStatus;
+}
+
+/// endhack...
+/////////////////////////////////////////
+
+EFrameworkunifiedStatus SendMessage(HANDLE hMessage, UI_32 length, PVOID data) {
+ return SendMessageWithPriority(hMessage, length, data, eFrameworkunifiedMsgPrioNormal);
+}
+
+EFrameworkunifiedStatus SendMessageWithPriority(HANDLE hMessage, UI_32 length, PVOID data, EFrameworkunifiedMessagePriorties priority) {
+ // define a Q handle structure
+ SQhandle *sndHndl = NULL;
+
+ // mb20110108 Added per comments 27 & 28
+ if (length > 0 && NULL == data) {
+ return eFrameworkunifiedStatusInvldBuf;
+ }
+
+ // check handle for null case...
+ if (mqCheckValidHandle(hMessage) == FALSE) {
+ return eFrameworkunifiedStatusInvldHandle;
+ }
+
+ sndHndl = (SQhandle *)hMessage;
+
+ // check to see if this is a sender handle
+ if (sndHndl->q_type != eQTypeSender) {
+ return eFrameworkunifiedStatusInvldHndlType;
+ }
+
+ if (-1 == mq_send((mqd_t)sndHndl->fd, (PCSTR)data, (size_t)length, (unsigned int)priority)) {
+ if (errno == EAGAIN) {
+ return eFrameworkunifiedStatusMsgQFull;
+ } else {
+ return TranslateError(errno);
+ }
+ }
+ return eFrameworkunifiedStatusOK;
+}
+
+SI_32 ReceiveMessage(HANDLE hMessage, UI_32 length, PVOID data) {
+ // define a Q handle structure
+ SQhandle *rcvHndl = NULL;
+
+ // check handle for null case...
+ if (mqCheckValidHandle(hMessage) == FALSE) {
+ errno = ENODATA;
+ return -1;
+ }
+
+ rcvHndl = (SQhandle *)hMessage;
+
+ // check to see if this is a receiver handle
+ if (rcvHndl->q_type != eQTypeReveiver) {
+ errno = ENODATA;
+ return -1;
+ }
+
+
+ return (SI_32)mq_receive((mqd_t)rcvHndl->fd, (char *)data, (size_t)length, NULL);
+}
+
+EFrameworkunifiedStatus CloseReceiver(HANDLE handle) {
+ SQhandle *rcvHndl = NULL;
+ SI_32 q_fd;
+
+ // check handle for null case...
+ if (mqCheckValidHandle(handle) == FALSE) {
+ return eFrameworkunifiedStatusInvldHandle;
+ }
+
+ rcvHndl = (SQhandle *)handle;
+
+ // check to see if this is a receiver handle
+ if (rcvHndl->q_type != eQTypeReveiver) {
+ return eFrameworkunifiedStatusInvldHndlType;
+ }
+
+ rcvHndl->check_code = 0;
+ q_fd = rcvHndl->fd;
+
+ if (NULL != rcvHndl->q_name) {
+ free(rcvHndl->q_name); // remove the memory to the name
+ rcvHndl->q_name = NULL;
+ }
+ free((void *)handle); // remove handle.. now..
+ handle = INVALID_HANDLE;
+
+ return ((-1 == mq_close((mqd_t)q_fd)) ? eFrameworkunifiedStatusInvldHandle : eFrameworkunifiedStatusOK);
+}
+
+EFrameworkunifiedStatus CloseSender(HANDLE handle) {
+ SQhandle *sndHndl = NULL;
+ SI_32 q_fd;
+
+ // check handle for null case...
+ if (mqCheckValidHandle(handle) == FALSE) {
+ return eFrameworkunifiedStatusInvldHandle;
+ }
+
+ sndHndl = (SQhandle *)handle;
+
+ // check to see if this is a sender handle
+ if (sndHndl->q_type != eQTypeSender) {
+ return eFrameworkunifiedStatusInvldHndlType;
+ }
+
+ sndHndl->check_code = 0;
+ q_fd = sndHndl->fd; // copy the fd, need it to close the queues....
+
+ if (NULL != sndHndl->q_name) {
+ free(sndHndl->q_name); // remove the memory to the name
+ sndHndl->q_name = NULL;
+ }
+ if (NULL != sndHndl->sendbuf) {
+ free(sndHndl->sendbuf);
+ sndHndl->sendbuf = NULL;
+ }
+ free((void *)handle); // remove handle.. now..
+ handle = INVALID_HANDLE; // invalidate handle so user doesn't reuse...
+
+ return (-1 == mq_close((mqd_t)q_fd)) ? eFrameworkunifiedStatusInvldHandle : eFrameworkunifiedStatusOK;
+}
+
+static UI_8 IsMessageAvailable(SI_32 fd) {
+ struct mq_attr sMqStatus;
+ if (-1 == mq_getattr(fd, &sMqStatus)) {
+ // Error Detected.
+ return 0;
+ } else {
+ if (0 < sMqStatus.mq_curmsgs) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+}
+
+void Flush(HANDLE hMessage) {
+ SQhandle *rcvHndl = NULL;
+
+ if (mqCheckValidHandle(hMessage)) {
+ rcvHndl = (SQhandle *)hMessage;
+
+ if (rcvHndl->q_type != eQTypeReveiver) {
+ return;
+ }
+
+ // check there is anything on the queue before going into the loop....
+ if (IsMessageAvailable(rcvHndl->fd)) {
+ CHAR l_pReceiveBuffer_o[MAX_QUEUE_MSG_SIZE];
+
+ // read till there isn't anything on the queue.
+ while (IsMessageAvailable(rcvHndl->fd)) {
+ mq_receive((mqd_t)rcvHndl->fd, l_pReceiveBuffer_o, (size_t)MAX_QUEUE_MSG_SIZE, NULL);
+ }
+ }
+ }
+}
+
+EQType GetQueueType(HANDLE hMessage) {
+ EQType qType = eQTypeInvld;
+
+ if (mqCheckValidHandle(hMessage)) {
+ SQhandle *handle = (SQhandle *)hMessage;
+ qType = handle->q_type;
+ }
+
+ return qType;
+}
+
+PCSTR GetQueueName(HANDLE hMessage) {
+ PCSTR name = NULL;
+
+ if (mqCheckValidHandle(hMessage)) {
+ SQhandle *hMsgQ = (SQhandle *)hMessage;
+
+ if (hMsgQ->q_type == eQTypeSender ||
+ hMsgQ->q_type == eQTypeReveiver) {
+ name = hMsgQ->q_name;
+ }
+ }
+
+ return name;
+}
+
+int GetQueueFD(HANDLE hMessage) {
+ int fd = -1;
+
+ if (mqCheckValidHandle(hMessage)) {
+ SQhandle *hMsgQ = (SQhandle *)hMessage;
+
+ if (hMsgQ->q_type == eQTypeSender ||
+ hMsgQ->q_type == eQTypeReveiver) {
+ fd = hMsgQ->fd;
+ }
+ }
+
+ return fd;
+}