diff options
author | takeshi_hoshina <takeshi_hoshina@mail.toyota.co.jp> | 2020-10-27 11:16:21 +0900 |
---|---|---|
committer | takeshi_hoshina <takeshi_hoshina@mail.toyota.co.jp> | 2020-10-27 11:16:21 +0900 |
commit | 947c78887e791596d4a5ec2d1079f8b1a049628b (patch) | |
tree | 3981e88eb8764d7180722f8466f36b756dc005af /otherservice/rpc_library/library/src/rpc_udp.c | |
parent | 706ad73eb02caf8532deaf5d38995bd258725cb8 (diff) |
basesystem 0.1sandbox/ToshikazuOhiwa/basesystem
Diffstat (limited to 'otherservice/rpc_library/library/src/rpc_udp.c')
-rw-r--r-- | otherservice/rpc_library/library/src/rpc_udp.c | 473 |
1 files changed, 473 insertions, 0 deletions
diff --git a/otherservice/rpc_library/library/src/rpc_udp.c b/otherservice/rpc_library/library/src/rpc_udp.c new file mode 100644 index 00000000..d16569dc --- /dev/null +++ b/otherservice/rpc_library/library/src/rpc_udp.c @@ -0,0 +1,473 @@ +/* + * @copyright Copyright (c) 2016-2019 TOYOTA MOTOR CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file rpc_udp.c + * @brief RPC Library Internal Implementation--UDP Communication + * + */ +/** @addtogroup RPClib_in */ +/** @{ */ +#include <stdio.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/poll.h> +#include <sys/time.h> +#include <netinet/in.h> +#include <string.h> +#include <errno.h> +#include <unistd.h> // for usleep + +#include <fcntl.h> +#include <sys/un.h> + +#include <sys/inotify.h> + +#include <other_service/rpc.h> +#include "rpc_internal.h" + +static /*inline*/ UINT32 +RpcGetSequenceNumber(RpcThreadInfo *th) { + RPC_THREAD_MUTEX_LOCK(th); + UINT32 ret = th->sequence_number; + (th->sequence_number)++; + if (th->sequence_number == RPC_SEQ_NUM_INVALID) { // LCOV_EXCL_BR_LINE 200: overflow check, but test time is too long + AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert + th->sequence_number = RPC_SEQ_NUM_START; // LCOV_EXCL_LINE 200: overflow check, but test time is too long + } + RPC_THREAD_MUTEX_UNLOCK(th); + return ret; +} + +/** + */ +int +RpcReadUdpPacket(const RpcIdInfo *id, UINT8 *buf) { + struct sockaddr_un sa; + socklen_t sa_len = sizeof(sa); + // sa passed to recvfrom does not require initialization + + for(;;) { + + ssize_t ret = recvfrom(RPC_my_sock(id), buf, RPC_UDP_PACKET_SIZE, + 0, (struct sockaddr *)&sa, &sa_len); + + if (ret < 0) { + if (errno == EAGAIN) { // LCOV_EXCL_BR_LINE 5: fail safe for libc recvfrom + //RPC_LOG_PERROR("recvfrom port %d", RPC_port(id)); + return 0; + } else if (errno == EINTR) { // LCOV_EXCL_START 5: fail safe for libc recvfrom + AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert + continue; + } else { + RPC_LOG_PERROR("recvfrom port %d", RPC_port(id)); + return -1; + } // LCOV_EXCL_STOP + } else if (ret == 0) { + RPC_LOG_STATE("*** recvfrom ret 0"); + return 0; + } else { + return (int)ret; + } + AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert + } // LCOV_EXCL_LINE 10: final line +} + +/** Number of sendto retries */ +#define RPC_SENDTO_RETRY 5 + +RUNS_IN_CALLERS_THREAD +RUNS_IN_READING_THREAD +/** + */ +static int +RpcSendUdpPacket(RpcIdInfo *id, + struct sockaddr_un *to, int do_retry, + void *mesg, unsigned int bytes) { + int nretry = 0; + ssize_t ret; + int myerr; + retry: + ret = sendto(RPC_my_sock(id), mesg, bytes, 0, + (struct sockaddr *)to, RPC_SOCKET_ADDR_LEN); + myerr = errno; + + if (ret < 0) { + RPC_LOG_STATE("*** sendto %s ***", to->sun_path + 1); + if (myerr == EAGAIN || (do_retry && myerr==ECONNREFUSED && ++nretry <= RPC_SENDTO_RETRY)) { // LCOV_EXCL_BR_LINE 11: Unexpected branch // NOLINT(readability/nolint) + usleep(100000); +#if defined(RPC_USE_UNIX_AUTOBIND) + RPC_LOG_DEBUG("*** sendto %s ***", to->sun_path + 1); +#else /* AUTOBIND */ + RPC_LOG_DEBUG("**************** sendto retry *********************"); +#endif /* !AUTOBIND */ + goto retry; + } + errno = myerr; +#if defined(RPC_USE_UNIX_AUTOBIND) + if (do_retry) { + RPC_LOG_PERROR("sendto %s", to->sun_path + 1); + } +#endif /* !AUTOBIND */ + return -1; + } else if ((unsigned int)ret != bytes) { + RPC_LOG_STATE("can't send all"); + return -1; + } + return 0; +} + +RUNS_IN_CALLERS_THREAD +RUNS_IN_READING_THREAD +/** + */ +int +RpcSendUdp(RpcIdInfo *id, const RPC_ID receiver, int direction, RPC_packet_type type, const void *mesg, unsigned int bytes) { // LCOV_EXCL_START 8: dead code + AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert + rpc_send_buf sendbuf; + sendbuf.buf = mesg; + sendbuf.bytes = bytes; + + return RpcSendUdp2(id, receiver, direction, type, 1, &sendbuf); +} // LCOV_EXCL_STOP +RUNS_IN_CALLERS_THREAD +RUNS_IN_READING_THREAD +/** UDP packet transmission process + * + * UDP packet format + * @verbatim ++--------------+--------------+ +|*1|*2|*3|*4|*5| PAYLOAD | ++--------------+--------------+ +@endverbatim + * + * *1 to *5 indicate the header. @see RPC_PACKET_HEADER + * + * - *1: [@%-2d] Type of packet + * - @see RPC_packet_type + * - *2: [@%-5x] Source RPC_ID + * - *3: [@%-8x] Sequence number + * - Set the value incremented from 1. This value must be unique for each source thread. + * - In the case of response packets, this field contains the sequence number of the packet from which the response was received. + * - *4: [@%-4u] Size of send data + * - In the case of sending data consists of single packet in PAYLOAD, this field contains number of the packet. + * - For a multipacket, the first packet contains the sum of the PAYLOAD of all packets, + * the last packet contains the PAYLOAD bytes of the packet. + * All other intermediate packets are filled with 0.(With up to two packets in the current constraint, + * there are no intermediate packets.) + * - *5: [@%1d] Position of packet + * - @see rpc_packet_position + */ +int +RpcSendUdp2(struct RpcIdInfo *id, RPC_ID receiver, int direction, + RPC_packet_type type, unsigned int num, rpc_send_buf *sendbuf) { + unsigned char buf[RPC_UDP_PACKET_SIZE]; + UINT32 seq_num = RpcGetSequenceNumber(id->thread_info); + unsigned int bytes = 0; + rpc_send_buf *sendbufp = sendbuf; + int i, do_retry = 1; + for(i = 0 ; i < num ; i++) { + bytes += sendbufp->bytes; + sendbufp++; + } + rpc_assert(bytes <= RPC_UDP_PAYLOAD); // LCOV_EXCL_BR_LINE 6: double check + + struct sockaddr_un to; + memset(&to, 0, sizeof(to)); + to.sun_family = AF_UNIX; +#if defined(RPC_USE_UNIX_AUTOBIND) + if (direction != RPC_SEND_TO_CLIENT) { + RpcSetServerName(to.sun_path, receiver); + if (direction == RPC_SEND_TO_SERVER_NO_RETRY) { + do_retry = 0; + } + } else { + RpcSetClientName(to.sun_path, receiver); + } +#else /* !AUTOBIND */ + rpc_set_socket_name(to.sun_path, rpc_get_port(receiver)); +#endif /* !AUTOBIND */ + + sprintf((char *)buf, RPC_PACKET_HEADER, + (int)type, RPC_my_id(id), seq_num, bytes, + RPC_PACKET_POS_ONEANDONLY); + + unsigned char *bufp = buf + RPC_PACKET_HEADER_LEN; + for(i = 0 ; i < num ; i++) { + memcpy(bufp, sendbuf->buf, sendbuf->bytes); + bufp += sendbuf->bytes; + sendbuf++; + } + if (RpcSendUdpPacket(id, &to, do_retry, + buf, RPC_PACKET_HEADER_LEN + bytes) < 0) { + return -1; + } + return (int)seq_num; +} + +RUNS_IN_READING_THREAD +/** + */ +RPC_Result +RpcSendUdpResponse(RpcIdInfo *id, const RPC_ID receiver, int direction, + RPC_packet_type type, UINT32 seq_num, + char *mesg, UINT32 bytes) { + rpc_assert(bytes <= RPC_MAX_RESPONSE_MESSAGE_SIZE); // LCOV_EXCL_BR_LINE 6: double check + char buf[RPC_PACKET_HEADER_LEN + RPC_MAX_RESPONSE_MESSAGE_SIZE]; + sprintf(buf, RPC_PACKET_HEADER, + (int)type, RPC_my_id(id), seq_num, bytes, + (int)RPC_PACKET_POS_ONEANDONLY); + memcpy(buf + RPC_PACKET_HEADER_LEN, mesg, bytes); + + struct sockaddr_un sa; + memset(&sa, 0, sizeof(sa)); + sa.sun_family = AF_UNIX; +#if defined(RPC_USE_UNIX_AUTOBIND) + if (direction != RPC_SEND_TO_CLIENT) { + RpcSetServerName(sa.sun_path, receiver); + } else { + RpcSetClientName(sa.sun_path, receiver); + } +#else /* !AUTOBIND */ + rpc_set_socket_name(sa.sun_path, rpc_get_port(receiver)); +#endif /* !AUTOBIND */ + return RpcSendUdpPacket(id, &sa, 0, buf, RPC_PACKET_HEADER_LEN + bytes); +} + +/** + */ +RPC_Result +RpcParsePacketHeader(const char *str, RPC_packet_type *command, + RPC_ID_p id, UINT32 *seq_num, UINT32 *size) { + // LCOV_EXCL_BR_START 6: double check + rpc_assert(str != NULL && command != NULL && id != NULL + && seq_num != NULL && size != NULL); + // LCOV_EXCL_BR_STOP + + if (sscanf(str, RPC_PACKET_HEADER_scanf, (int *)command, id, seq_num, size) != 4) { // LCOV_EXCL_BR_LINE 11: Unexpected branch // NOLINT(readability/nolint) + *command = RPC_PACKET_NONE; // LCOV_EXCL_START 5: fail safe for libc sscanf + AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert + BUG_ASSERT(0, "Parsing packet failed"); + // LCOV_EXCL_STOP + return RPC_ERR_Fatal; + } + return RPC_OK; +} + +#include <sys/time.h> + +RPC_Result +RpcClientWaitResponse(RpcIdInfo *idinfo, UINT32 seq_num, + UINT32 timeout_msec, UINT16 *response) { + unsigned char readbuf[RPC_UDP_PACKET_SIZE]; + fd_set fds; + int fd = idinfo->sock; + + struct timeval timeout; + timeout.tv_sec = (__time_t)(timeout_msec / 1000); + timeout.tv_usec = (__suseconds_t)((timeout_msec % 1000) * 1000); + + *response = RPC_RESPONSE_NONE; + + for(;;) { + FD_ZERO(&fds); + FD_SET(fd, &fds); + int sret = select(fd + 1, &fds, NULL, NULL, &timeout); + if (sret < 0 && errno == EINTR) { /* signal interrupt */ // LCOV_EXCL_START 5: fail safe for libc select + AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert + continue; + // LCOV_EXCL_STOP + } else if (sret == 0) { /* timeout */ + RPC_LOG_ERR("server response timeout"); + return RPC_ERR_No_Response; + } else if (sret > 0 && FD_ISSET(fd, &fds)) { // LCOV_EXCL_BR_LINE 5: fail safe for libc select + RPC_ID sender; + UINT32 seq; + UINT32 size; + RPC_packet_type command; + int readret = RpcReadUdpPacket(idinfo, readbuf); + if (readret <= 0) { // LCOV_EXCL_START 5: fail safe for libc recvfrom + AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert + /* debug code to see socket status */ + /* why recvfrom() returned 0 ? */ + struct pollfd pfd; + pfd.fd = fd; + pfd.events = POLLIN|POLLHUP|POLLERR; + sret = poll(&pfd, 1, 0/* timeout 0 */); + RPC_LOG_STATE("** poll revents=%x", pfd.revents); + return RPC_ERR_Fatal; + } // LCOV_EXCL_STOP + if (RpcParsePacketHeader((const char *)readbuf, &command, &sender, &seq, &size) != RPC_OK) { // LCOV_EXCL_BR_LINE 11: Unexpected branch // NOLINT(readability/nolint) + return RPC_ERR_Fatal; + } + unsigned char c; + if (seq == seq_num) { + switch(command) { + case RPC_RESPONSE_APICALL: + c = readbuf[RPC_PACKET_HEADER_LEN]; + switch(c) { + case 'O': + *response = RPC_RESPONSE_API_OK; + goto exit_loop_ok; + break; + case 'B': + *response = RPC_RESPONSE_API_BUSY; + goto exit_loop_ok; + break; + case 'E': + *response = RPC_RESPONSE_API_ERR; + goto exit_loop_ok; + break; + case 'D': + *response = RPC_RESPONSE_API_DEADLOCK; + goto exit_loop_ok; + break; + case 'C': + *response = RPC_RESPONSE_API_CERTIFY; + goto exit_loop_ok; + break; + default: + AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert + BUG_ASSERT(0, "illegal response\n"); // LCOV_EXCL_LINE 15: marco defined in rpc_internal.h + return RPC_ERR_Fatal; + break; + } + break; + + default: + RPC_LOG_STATE("Unknown packet command=%d", command); + return RPC_ERR_Fatal; + break; + } + } else { /* sequence number mismatch == response to other request */ + RPC_LOG_DEBUG("unwanted response received(delayed response?)"); + continue; + } + } else { /* poll error */ + RPC_LOG_PERROR("select in wait response"); + return RPC_ERR_Fatal; + } + } + exit_loop_ok: + return RPC_OK; +} + +RPC_Result +RpcClientWaitResult(RpcIdInfo *idinfo, RPC_ID srvr_id) { + unsigned char readbuf[RPC_UDP_PACKET_SIZE]; + fd_set fds; + int fd = idinfo->sock; + int inotify_fd = RPC_clnt_inotify_fd(idinfo); + int maxfd; + RPC_Result result = RPC_OK; + + for(;;) { + FD_ZERO(&fds); + FD_SET(fd, &fds); + FD_SET(inotify_fd, &fds); + + /* Determine the maximum value of fd to wait */ + if (fd > inotify_fd) { + maxfd = fd; + } else { + maxfd = inotify_fd; + } + + int sret = select(maxfd + 1, &fds, NULL, NULL, NULL); + if (sret < 0 && errno == EINTR) { /* signal interrupt */ // LCOV_EXCL_START 5: fail safe for libc select + AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert + continue; + // LCOV_EXCL_STOP + } else if (sret > 0 && FD_ISSET(fd, &fds)) { /* success */ // LCOV_EXCL_BR_LINE 5: fail safe for libc select + RPC_ID sender; + UINT32 seq; + UINT32 size; + RPC_packet_type command; + int readret = RpcReadUdpPacket(idinfo, readbuf); + if (readret <= 0) { // LCOV_EXCL_START 5: fail safe for libc recvfrom + AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert + rpc_assert(readret >= 0); + result = RPC_ERR_Fatal; + goto exit_loop_ok; + } // LCOV_EXCL_STOP + if (RpcParsePacketHeader((const char *)readbuf, &command, &sender, &seq, &size) != RPC_OK) { // LCOV_EXCL_BR_LINE 11: Unexpected branch // NOLINT(readability/nolint) + // LCOV_EXCL_START 5: fail safe for libc sscanf + AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert + result = RPC_ERR_Fatal; + goto exit_loop_ok; + } // LCOV_EXCL_STOP + switch(command) { + case RPC_PACKET_APIRETURN: + result = RpcSetAPIcallReturn(idinfo, + (const char *)(readbuf + RPC_PACKET_HEADER_LEN), + size); + if(result == RPC_OK) { // LCOV_EXCL_BR_LINE 5: fail safe for libc malloc + if (sscanf((const char *)(readbuf + RPC_PACKET_HEADER_LEN), "%08x ", (unsigned int *)&result) != 1) { // LCOV_EXCL_BR_LINE 5: fail safe for libc sscanf + AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert + BUG_ASSERT(0, "Parsing packet failed"); // LCOV_EXCL_LINE 15: marco defined in rpc_internal.h + return RPC_ERR_Fatal; + } + } + goto exit_loop_ok; + default: + RPC_LOG_STATE("unwanted packet received while waiting for API return"); + continue; + break; + } + } else if (sret > 0 && FD_ISSET(inotify_fd, &fds)) { /* server process is terminate. */ + UINT32 read_len = 0; + int length = 0; + char *buffer; + buffer = (char *)rpc_malloc(BUF_LEN); + if (NULL == buffer) { // LCOV_EXCL_START 5: fail safe for libc malloc + AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert + RPC_LOG_ERR("rpc_malloc() ERROR."); + } // LCOV_EXCL_STOP + + if((length = (int)read( inotify_fd, buffer, BUF_LEN )) < 0 ) { // LCOV_EXCL_START 5: fail safe for libc read + AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert + RPC_LOG_PERROR("inotify read() ERROR."); + } // LCOV_EXCL_STOP + while (read_len < length) { + struct inotify_event *event = ( struct inotify_event * )&buffer[read_len]; + + if (event->mask & IN_DELETE_SELF) { /* Terminating a Server Process */ + if (RPC_ERR_Server_Finish == RpcDeleteSrvrPid(idinfo, srvr_id, event->wd)) { + RPC_LOG_PERROR("server process is terminate. : srvr_ID = %x", srvr_id); + result = RPC_ERR_Fatal; + } + } + read_len += (UINT32)(EVENT_SIZE + event->len); /* Size of the inotify_event structure */ + } + rpc_free(buffer); + if (RPC_OK != result) { + goto exit_loop_ok; + } + + } else { /* poll error */ // LCOV_EXCL_START 5: fail safe for libc select + AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert + RPC_LOG_PERROR("select in wait result"); + result = RPC_ERR_Fatal; + goto exit_loop_ok; + // LCOV_EXCL_STOP + } + } +exit_loop_ok: + + return result; +} + +/** @} */ |