From 947c78887e791596d4a5ec2d1079f8b1a049628b Mon Sep 17 00:00:00 2001 From: takeshi_hoshina Date: Tue, 27 Oct 2020 11:16:21 +0900 Subject: basesystem 0.1 --- .../client/NS_MessageQueue/src/ns_msg_queue.c | 596 +++++++++++++++++++++ 1 file changed, 596 insertions(+) create mode 100644 nsframework/framework_unified/client/NS_MessageQueue/src/ns_msg_queue.c (limited to 'nsframework/framework_unified/client/NS_MessageQueue/src/ns_msg_queue.c') diff --git a/nsframework/framework_unified/client/NS_MessageQueue/src/ns_msg_queue.c b/nsframework/framework_unified/client/NS_MessageQueue/src/ns_msg_queue.c new file mode 100644 index 00000000..19900617 --- /dev/null +++ b/nsframework/framework_unified/client/NS_MessageQueue/src/ns_msg_queue.c @@ -0,0 +1,596 @@ +/* + * @copyright Copyright (c) 2016-2020 TOYOTA MOTOR CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/////////////////////////////////////////////////////////////////////////////// +/// \ingroup tag_NSMessageQueue +/// \brief API Header for Zone Player Service APIs to be used by senders and +/// receivers. +/// +/// APIs to register/unregister notifications and add/remove recievers to the notifications. +/// +/////////////////////////////////////////////////////////////////////////////// + +#include +#include +#include +#include "ns_msg_queue.h" +#include +#include +#include +#include +#include "ns_mq_anomaly.h" + +#include +#include + +const UI_32 DEFAULT_MSG_PRIORITY = 10; +const UI_32 MAX_MESSAGES_STORED_IN_QUEUE = 256; +const UI_32 MAX_SYNC_RESPONSE_STORED_IN_QUEUE = 2; + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Function : TranslateError +// Translates global error variables into FW EFrameworkunifiedStatus +//////////////////////////////////////////////////////////////////////////////////////////////////// +EFrameworkunifiedStatus TranslateError(int error) { + EFrameworkunifiedStatus eStatus = eFrameworkunifiedStatusFail; + + switch (error) { + case EOK: + eStatus = eFrameworkunifiedStatusOK; + break; + case EBUSY: + eStatus = eFrameworkunifiedStatusThreadBusy; + break; + case EDEADLK: + eStatus = eFrameworkunifiedStatusThreadSelfJoin; + break; + case EFAULT: + eStatus = eFrameworkunifiedStatusFault; + break; + case EINVAL: + eStatus = eFrameworkunifiedStatusInvldParam; + break; + case ESRCH: + eStatus = eFrameworkunifiedStatusThreadNotExist; + break; + case EBADF: + eStatus = eFrameworkunifiedStatusErrNoEBADF; + break; + case EAGAIN: + eStatus = eFrameworkunifiedStatusErrNoEAGAIN; + break; + case EINTR: + eStatus = eFrameworkunifiedStatusErrNoEINTR; + break; + case EMSGSIZE: + eStatus = eFrameworkunifiedStatusInvldBufSize; + break; + case ENOTSUP: + eStatus = eFrameworkunifiedStatusNotImplemented; + break; + case EPERM: + eStatus = eFrameworkunifiedStatusAccessError; + break; + default: + eStatus = eFrameworkunifiedStatusFail; + break; + } + + return eStatus; +} + +static UI_8 GetNormalizedMqName(PSTR normalized_mqname , PCSTR name, size_t size) { + if ((NULL != name) && (NULL != normalized_mqname)) { + if (name[0] != '/') { + strlcpy(normalized_mqname, "/", size); + strlcat(normalized_mqname, name, size); + } else { + strlcpy(normalized_mqname, name, size); + } + + return 1; + } else { + return 0; + } +} + +HANDLE OpenReceiverNotBlocked(PCSTR name) { + HANDLE rtnHandle = INVALID_HANDLE; // invalid until made valid. + PSTR normalized_qname = NULL; + SI_32 q_fd = -1; // fd to mqueue + SQhandle *rcvHndl = NULL; + struct mq_attr mqattr; + size_t norm_qname_size = 0; + + // Check for invalid name + if (name == NULL) { + return INVALID_HANDLE; + } + + if (strlen(name) >= MAX_QUEUE_NAME_SIZE) { + return INVALID_HANDLE; + } + + norm_qname_size = strlen(name) + 2; + normalized_qname = (PSTR)malloc(norm_qname_size); + + if (0 != GetNormalizedMqName(normalized_qname, name, norm_qname_size)) { + if (strlen(normalized_qname) > LIMIT_QUEUE_NAME_SIZE) { + free(normalized_qname); + return INVALID_HANDLE; + } + + mqattr.mq_flags = 0; + mqattr.mq_maxmsg = (__syscall_slong_t)MAX_MESSAGES_STORED_IN_QUEUE; + mqattr.mq_msgsize = MAX_QUEUE_MSG_SIZE; + + q_fd = mq_open(normalized_qname, O_RDONLY | O_CREAT | O_NONBLOCK | O_CLOEXEC , 0666, &mqattr); + + if (q_fd != -1) { + rcvHndl = (SQhandle *)malloc(sizeof(SQhandle)); + if (rcvHndl != NULL) { + rcvHndl->check_code = MQ_CHECK_CODE; + rcvHndl->fd = q_fd; + rcvHndl->q_name = normalized_qname; + rcvHndl->q_type = eQTypeReveiver; + + ///////////////////////////////////////// + rcvHndl->threadid = 0; + ///////////////////////////////////////// + + // Set the return handle to rcvHndl + rtnHandle = rcvHndl; + } + } + } + + if (INVALID_HANDLE == rtnHandle) { + // couldn't connect the queue, + // release the memory to normalized queue name + if (normalized_qname != NULL) { + free(normalized_qname); + normalized_qname = NULL; // mb20110108 item 11 + } + } + + return rtnHandle; +} + +static HANDLE openReceiverInternal(PCSTR name, UI_32 mq_maxmsg, UI_32 mq_msgsize, BOOL is_internal) { + HANDLE rtnHandle = INVALID_HANDLE; // invalid until made valid. + PSTR normalized_qname = NULL; + SI_32 q_fd = -1; // fd to mqueue + SQhandle *rcvHndl = NULL; + struct mq_attr mqattr; + size_t norm_qname_size = 0; + + // Check for invalid name + if (name == NULL) { + return INVALID_HANDLE; + } + + if (strlen(name) >= MAX_QUEUE_NAME_SIZE) { + return INVALID_HANDLE; + } + + norm_qname_size = strlen(name) + 2; + normalized_qname = (PSTR)malloc(norm_qname_size); + + if (0 != GetNormalizedMqName(normalized_qname, name, norm_qname_size)) { + int i; + + if (!is_internal && strlen(normalized_qname) > LIMIT_QUEUE_NAME_SIZE) { + free(normalized_qname); + return INVALID_HANDLE; + } + + mqattr.mq_flags = 0; + mqattr.mq_maxmsg = (__syscall_slong_t)mq_maxmsg; + mqattr.mq_msgsize = (__syscall_slong_t)mq_msgsize; + + for (i = 0; i < (sizeof(mq_anomaly_list) / sizeof(mq_anomaly_list[0])); i++) { + if (strcmp(normalized_qname, mq_anomaly_list[i].name) == 0) { + mqattr.mq_maxmsg = (__syscall_slong_t)mq_anomaly_list[i].maxMsg; + break; + } + } + + + q_fd = mq_open(normalized_qname, O_RDONLY | O_CREAT | O_CLOEXEC, 0666, &mqattr); + + if (q_fd != -1) { + rcvHndl = (SQhandle *)malloc(sizeof(SQhandle)); + if (rcvHndl != NULL) { + rcvHndl->check_code = MQ_CHECK_CODE; + rcvHndl->fd = q_fd; + rcvHndl->q_name = normalized_qname; + rcvHndl->q_type = eQTypeReveiver; + + ///////////////////////////////////////// + rcvHndl->threadid = 0; + ///////////////////////////////////////// + + // Set the return handle to rcvHndl + rtnHandle = rcvHndl; + } + + } + } + + if (INVALID_HANDLE == rtnHandle) { + // couldn't connect the queue, + // release the memory to normalized queue name + if (normalized_qname != NULL) { + free(normalized_qname); + normalized_qname = NULL; // mb20110108 item 11 + } + } + + return rtnHandle; +} + +HANDLE OpenReceiver(PCSTR name) { + return openReceiverInternal(name, MAX_MESSAGES_STORED_IN_QUEUE, MAX_QUEUE_MSG_SIZE, FALSE); +} + +HANDLE openSyncReceiver(PCSTR name) { + return openReceiverInternal(name, MAX_SYNC_RESPONSE_STORED_IN_QUEUE , MAX_QUEUE_MSG_SIZE, TRUE); +} + +static HANDLE openSenderInternal(PCSTR name, UI_32 mq_maxmsg, UI_32 mq_msgsize, BOOL is_internal, BOOL zero_copy) { + HANDLE rtnHandle = INVALID_HANDLE; // invalid until made valid. + SI_32 q_fd = -1; // fd to queue + SQhandle *sndHndl = NULL; + PSTR normalized_qname = NULL; + size_t norm_qname_size = 0; + + struct mq_attr mqattr; + + // Check for invalid name + if (name == NULL) { + return rtnHandle; + } + + if (strlen(name) >= MAX_QUEUE_NAME_SIZE) { + return rtnHandle; + } + + norm_qname_size = strlen(name) + 2; + normalized_qname = (PSTR)malloc(norm_qname_size); + + if (NULL == normalized_qname) { + return rtnHandle; + } + + if (0 != GetNormalizedMqName(normalized_qname, name, norm_qname_size)) { + int i; + + + if (!is_internal && strlen(normalized_qname) > LIMIT_QUEUE_NAME_SIZE) { + free(normalized_qname); + return INVALID_HANDLE; + } + + mqattr.mq_flags = 0; + mqattr.mq_maxmsg = (__syscall_slong_t)mq_maxmsg; + mqattr.mq_msgsize = (__syscall_slong_t)mq_msgsize; + + for (i = 0; i < (sizeof(mq_anomaly_list) / sizeof(mq_anomaly_list[0])); i++) { + if (strcmp(normalized_qname, mq_anomaly_list[i].name) == 0) { + mqattr.mq_maxmsg = (__syscall_slong_t)mq_anomaly_list[i].maxMsg; + break; + } + } + + q_fd = mq_open(normalized_qname, O_WRONLY | O_CREAT | O_NONBLOCK | O_CLOEXEC, 0666, &mqattr); + + if (q_fd != -1) { + sndHndl = (SQhandle *)malloc(sizeof(SQhandle)); + if (NULL != sndHndl) { + sndHndl->check_code = MQ_CHECK_CODE; + sndHndl->fd = q_fd; + sndHndl->q_name = normalized_qname; + sndHndl->q_type = eQTypeSender; + + sndHndl->threadid = 0; + + + // Set the return handle to sndHndl + rtnHandle = sndHndl; + + if (zero_copy) { + sndHndl->sendbuf = malloc(MAX_QUEUE_MSG_SIZE); + if (sndHndl->sendbuf == NULL) { + // LCOV_EXCL_START 5: malloc's error case + AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert + free(sndHndl); + rtnHandle = INVALID_HANDLE; + // LCOV_EXCL_STOP + } + } else { + sndHndl->sendbuf = NULL; + } + } + } + } + + if (INVALID_HANDLE == rtnHandle) { + // couldn't connect the queue, + // release the memory to normalized queue name + if (normalized_qname != NULL) { + free(normalized_qname); + normalized_qname = NULL; // mb20110108 item 11 + } + if (q_fd != -1) { + mq_close(q_fd); + } + } + + return rtnHandle; // invalid until made valid. +} + +HANDLE OpenSender(PCSTR name) { + return openSenderInternal(name, MAX_MESSAGES_STORED_IN_QUEUE, MAX_QUEUE_MSG_SIZE, FALSE, FALSE); +} + + +HANDLE OpenSenderChild(PCSTR name, pthread_t threadid) { + HANDLE h = OpenSender(name); + if (INVALID_HANDLE != h) { + SQhandle *sq = (SQhandle *)h; + sq->threadid = threadid; + } + return h; +} + + +HANDLE openSyncSender(PCSTR name) { + return openSenderInternal(name, MAX_SYNC_RESPONSE_STORED_IN_QUEUE, MAX_QUEUE_MSG_SIZE, TRUE, FALSE); +} + +HANDLE openSenderZc(PCSTR name) { + return openSenderInternal(name, MAX_MESSAGES_STORED_IN_QUEUE, MAX_QUEUE_MSG_SIZE, FALSE, TRUE); +} + + +EFrameworkunifiedStatus JoinChild(HANDLE hChildApp) { + // mb20110108 Code re-structured per comments 24 & 25 + if (mqCheckValidHandle(hChildApp)) { + SQhandle *sq = (SQhandle *)hChildApp; + return TranslateError(pthread_join(sq->threadid, NULL)); + } else { + return eFrameworkunifiedStatusFail; + } +} + +EFrameworkunifiedStatus GetChildThreadPriority(HANDLE hChildApp, PSI_32 threadPrio) { + EFrameworkunifiedStatus eStatus = eFrameworkunifiedStatusFail; + // LCOV_EXCL_BR_START 6:GetChildThreadPriority is called by McGetChildThreadPriority, and hChildApp will check there. + if (mqCheckValidHandle(hChildApp)) { + // LCOV_EXCL_BR_STOP + SQhandle *sq = (SQhandle *)hChildApp; + SI_32 schedPolicy; // this is needed according to syntax of pthread_getschedparam. api not available to get prio directly. + struct sched_param schedParam; + eStatus = TranslateError(pthread_getschedparam(sq->threadid, &schedPolicy, &schedParam)); + + *threadPrio = schedParam.sched_priority; + } else { + // LCOV_EXCL_START 6:GetChildThreadPriority is called by McGetChildThreadPriority, and hChildApp will check there. + AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert + eStatus = eFrameworkunifiedStatusInvldHandle; + // LCOV_EXCL_STOP + } + return eStatus; +} + +/// endhack... +///////////////////////////////////////// + +EFrameworkunifiedStatus SendMessage(HANDLE hMessage, UI_32 length, PVOID data) { + return SendMessageWithPriority(hMessage, length, data, eFrameworkunifiedMsgPrioNormal); +} + +EFrameworkunifiedStatus SendMessageWithPriority(HANDLE hMessage, UI_32 length, PVOID data, EFrameworkunifiedMessagePriorties priority) { + // define a Q handle structure + SQhandle *sndHndl = NULL; + + // mb20110108 Added per comments 27 & 28 + if (length > 0 && NULL == data) { + return eFrameworkunifiedStatusInvldBuf; + } + + // check handle for null case... + if (mqCheckValidHandle(hMessage) == FALSE) { + return eFrameworkunifiedStatusInvldHandle; + } + + sndHndl = (SQhandle *)hMessage; + + // check to see if this is a sender handle + if (sndHndl->q_type != eQTypeSender) { + return eFrameworkunifiedStatusInvldHndlType; + } + + if (-1 == mq_send((mqd_t)sndHndl->fd, (PCSTR)data, (size_t)length, (unsigned int)priority)) { + if (errno == EAGAIN) { + return eFrameworkunifiedStatusMsgQFull; + } else { + return TranslateError(errno); + } + } + return eFrameworkunifiedStatusOK; +} + +SI_32 ReceiveMessage(HANDLE hMessage, UI_32 length, PVOID data) { + // define a Q handle structure + SQhandle *rcvHndl = NULL; + + // check handle for null case... + if (mqCheckValidHandle(hMessage) == FALSE) { + errno = ENODATA; + return -1; + } + + rcvHndl = (SQhandle *)hMessage; + + // check to see if this is a receiver handle + if (rcvHndl->q_type != eQTypeReveiver) { + errno = ENODATA; + return -1; + } + + + return (SI_32)mq_receive((mqd_t)rcvHndl->fd, (char *)data, (size_t)length, NULL); +} + +EFrameworkunifiedStatus CloseReceiver(HANDLE handle) { + SQhandle *rcvHndl = NULL; + SI_32 q_fd; + + // check handle for null case... + if (mqCheckValidHandle(handle) == FALSE) { + return eFrameworkunifiedStatusInvldHandle; + } + + rcvHndl = (SQhandle *)handle; + + // check to see if this is a receiver handle + if (rcvHndl->q_type != eQTypeReveiver) { + return eFrameworkunifiedStatusInvldHndlType; + } + + rcvHndl->check_code = 0; + q_fd = rcvHndl->fd; + + if (NULL != rcvHndl->q_name) { + free(rcvHndl->q_name); // remove the memory to the name + rcvHndl->q_name = NULL; + } + free((void *)handle); // remove handle.. now.. + handle = INVALID_HANDLE; + + return ((-1 == mq_close((mqd_t)q_fd)) ? eFrameworkunifiedStatusInvldHandle : eFrameworkunifiedStatusOK); +} + +EFrameworkunifiedStatus CloseSender(HANDLE handle) { + SQhandle *sndHndl = NULL; + SI_32 q_fd; + + // check handle for null case... + if (mqCheckValidHandle(handle) == FALSE) { + return eFrameworkunifiedStatusInvldHandle; + } + + sndHndl = (SQhandle *)handle; + + // check to see if this is a sender handle + if (sndHndl->q_type != eQTypeSender) { + return eFrameworkunifiedStatusInvldHndlType; + } + + sndHndl->check_code = 0; + q_fd = sndHndl->fd; // copy the fd, need it to close the queues.... + + if (NULL != sndHndl->q_name) { + free(sndHndl->q_name); // remove the memory to the name + sndHndl->q_name = NULL; + } + if (NULL != sndHndl->sendbuf) { + free(sndHndl->sendbuf); + sndHndl->sendbuf = NULL; + } + free((void *)handle); // remove handle.. now.. + handle = INVALID_HANDLE; // invalidate handle so user doesn't reuse... + + return (-1 == mq_close((mqd_t)q_fd)) ? eFrameworkunifiedStatusInvldHandle : eFrameworkunifiedStatusOK; +} + +static UI_8 IsMessageAvailable(SI_32 fd) { + struct mq_attr sMqStatus; + if (-1 == mq_getattr(fd, &sMqStatus)) { + // Error Detected. + return 0; + } else { + if (0 < sMqStatus.mq_curmsgs) { + return 1; + } else { + return 0; + } + } +} + +void Flush(HANDLE hMessage) { + SQhandle *rcvHndl = NULL; + + if (mqCheckValidHandle(hMessage)) { + rcvHndl = (SQhandle *)hMessage; + + if (rcvHndl->q_type != eQTypeReveiver) { + return; + } + + // check there is anything on the queue before going into the loop.... + if (IsMessageAvailable(rcvHndl->fd)) { + CHAR l_pReceiveBuffer_o[MAX_QUEUE_MSG_SIZE]; + + // read till there isn't anything on the queue. + while (IsMessageAvailable(rcvHndl->fd)) { + mq_receive((mqd_t)rcvHndl->fd, l_pReceiveBuffer_o, (size_t)MAX_QUEUE_MSG_SIZE, NULL); + } + } + } +} + +EQType GetQueueType(HANDLE hMessage) { + EQType qType = eQTypeInvld; + + if (mqCheckValidHandle(hMessage)) { + SQhandle *handle = (SQhandle *)hMessage; + qType = handle->q_type; + } + + return qType; +} + +PCSTR GetQueueName(HANDLE hMessage) { + PCSTR name = NULL; + + if (mqCheckValidHandle(hMessage)) { + SQhandle *hMsgQ = (SQhandle *)hMessage; + + if (hMsgQ->q_type == eQTypeSender || + hMsgQ->q_type == eQTypeReveiver) { + name = hMsgQ->q_name; + } + } + + return name; +} + +int GetQueueFD(HANDLE hMessage) { + int fd = -1; + + if (mqCheckValidHandle(hMessage)) { + SQhandle *hMsgQ = (SQhandle *)hMessage; + + if (hMsgQ->q_type == eQTypeSender || + hMsgQ->q_type == eQTypeReveiver) { + fd = hMsgQ->fd; + } + } + + return fd; +} -- cgit 1.2.3-korg