From d18f9ca75a59c93431d7ed5b4e5e2eaafa291a42 Mon Sep 17 00:00:00 2001 From: Miroslav Lichvar Date: Mon, 1 Aug 2016 17:25:43 +0200 Subject: [PATCH] ntp: rework receiving messages Allocate buffers for received messages on heap instead of stack and prepare the code for receiving multiple messages at the same time. --- ntp_io.c | 207 +++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 132 insertions(+), 75 deletions(-) diff --git a/ntp_io.c b/ntp_io.c index 2492272..d263e29 100644 --- a/ntp_io.c +++ b/ntp_io.c @@ -30,6 +30,7 @@ #include "sysincl.h" +#include "array.h" #include "ntp_io.h" #include "ntp_core.h" #include "ntp_sources.h" @@ -51,6 +52,25 @@ union sockaddr_in46 { struct sockaddr u; }; +struct Message { + union sockaddr_in46 name; + struct iovec iov; + NTP_Receive_Buffer buf; + /* Aligned buffer for control messages */ + struct cmsghdr cmsgbuf[CMSGBUF_SIZE / sizeof (struct cmsghdr)]; +}; + +struct MessageHeader { + struct msghdr msg_hdr; + unsigned int msg_len; +}; + +#define MAX_RECV_MESSAGES 1 + +/* Arrays of Message and MessageHeader */ +static ARR_Instance recv_messages; +static ARR_Instance recv_headers; + /* The server/peer and client sockets for IPv4 and IPv6 */ static int server_sock_fd4; static int client_sock_fd4; @@ -288,6 +308,33 @@ close_socket(int sock_fd) } /* ================================================== */ + +static void +prepare_buffers(unsigned int n) +{ + struct MessageHeader *hdr; + struct Message *msg; + unsigned int i; + + for (i = 0; i < n; i++) { + msg = ARR_GetElement(recv_messages, i); + hdr = ARR_GetElement(recv_headers, i); + + msg->iov.iov_base = &msg->buf; + msg->iov.iov_len = sizeof (msg->buf); + hdr->msg_hdr.msg_name = &msg->name; + hdr->msg_hdr.msg_namelen = sizeof (msg->name); + hdr->msg_hdr.msg_iov = &msg->iov; + hdr->msg_hdr.msg_iovlen = 1; + hdr->msg_hdr.msg_control = &msg->cmsgbuf; + hdr->msg_hdr.msg_controllen = sizeof (msg->cmsgbuf); + hdr->msg_hdr.msg_flags = 0; + hdr->msg_len = 0; + } +} + +/* ================================================== */ + void NIO_Initialise(int family) { @@ -296,6 +343,12 @@ NIO_Initialise(int family) assert(!initialised); initialised = 1; + recv_messages = ARR_CreateInstance(sizeof (struct Message)); + ARR_SetSize(recv_messages, MAX_RECV_MESSAGES); + recv_headers = ARR_CreateInstance(sizeof (struct MessageHeader)); + ARR_SetSize(recv_headers, MAX_RECV_MESSAGES); + prepare_buffers(MAX_RECV_MESSAGES); + server_port = CNF_GetNTPPort(); client_port = CNF_GetAcquisitionPort(); @@ -368,6 +421,8 @@ NIO_Finalise(void) close_socket(server_sock_fd6); server_sock_fd6 = client_sock_fd6 = INVALID_SOCK_FD; #endif + ARR_DestroyInstance(recv_headers); + ARR_DestroyInstance(recv_messages); initialised = 0; } @@ -483,103 +538,105 @@ NIO_IsServerSocket(int sock_fd) /* ================================================== */ static void -read_from_socket(int sock_fd, int event, void *anything) +process_receive(struct msghdr *hdr, int length, int sock_fd) { - /* This should only be called when there is something - to read, otherwise it will block. */ - - int status; - NTP_Receive_Buffer message; - union sockaddr_in46 where_from; - unsigned int flags = 0; - struct timeval now; - double now_err; NTP_Remote_Address remote_addr; NTP_Local_Address local_addr; - struct cmsghdr cmsgbuf[CMSGBUF_SIZE / sizeof (struct cmsghdr)]; - struct msghdr msg; - struct iovec iov; struct cmsghdr *cmsg; - - assert(initialised); + struct timeval now; + double now_err; SCH_GetLastEventTime(&now, &now_err, NULL); - iov.iov_base = &message.ntp_pkt; - iov.iov_len = sizeof(message); - msg.msg_name = &where_from; - msg.msg_namelen = sizeof(where_from); - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - msg.msg_control = (void *) cmsgbuf; - msg.msg_controllen = sizeof(cmsgbuf); - msg.msg_flags = 0; + if (hdr->msg_namelen > sizeof (union sockaddr_in46)) { + DEBUG_LOG(LOGF_NtpIO, "Truncated source address"); + return; + } - status = recvmsg(sock_fd, &msg, flags); + UTI_SockaddrToIPAndPort((struct sockaddr *)hdr->msg_name, + &remote_addr.ip_addr, &remote_addr.port); - /* Don't bother checking if read failed or why if it did. More - likely than not, it will be connection refused, resulting from a - previous sendto() directing a datagram at a port that is not - listening (which appears to generate an ICMP response, and on - some architectures e.g. Linux this is translated into an error - reponse on a subsequent recvfrom). */ + local_addr.ip_addr.family = IPADDR_UNSPEC; + local_addr.sock_fd = sock_fd; - if (status > 0) { - if (msg.msg_namelen > sizeof (where_from)) - LOG_FATAL(LOGF_NtpIO, "Truncated source address"); - - UTI_SockaddrToIPAndPort(&where_from.u, &remote_addr.ip_addr, &remote_addr.port); - - local_addr.ip_addr.family = IPADDR_UNSPEC; - local_addr.sock_fd = sock_fd; - - for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + for (cmsg = CMSG_FIRSTHDR(hdr); cmsg; cmsg = CMSG_NXTHDR(hdr, cmsg)) { #ifdef HAVE_IN_PKTINFO - if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) { - struct in_pktinfo ipi; + if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) { + struct in_pktinfo ipi; - memcpy(&ipi, CMSG_DATA(cmsg), sizeof(ipi)); - local_addr.ip_addr.addr.in4 = ntohl(ipi.ipi_spec_dst.s_addr); - local_addr.ip_addr.family = IPADDR_INET4; - } + memcpy(&ipi, CMSG_DATA(cmsg), sizeof(ipi)); + local_addr.ip_addr.addr.in4 = ntohl(ipi.ipi_spec_dst.s_addr); + local_addr.ip_addr.family = IPADDR_INET4; + } #endif #ifdef HAVE_IN6_PKTINFO - if (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_PKTINFO) { - struct in6_pktinfo ipi; + if (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_PKTINFO) { + struct in6_pktinfo ipi; - memcpy(&ipi, CMSG_DATA(cmsg), sizeof(ipi)); - memcpy(&local_addr.ip_addr.addr.in6, &ipi.ipi6_addr.s6_addr, - sizeof (local_addr.ip_addr.addr.in6)); - local_addr.ip_addr.family = IPADDR_INET6; - } + memcpy(&ipi, CMSG_DATA(cmsg), sizeof(ipi)); + memcpy(&local_addr.ip_addr.addr.in6, &ipi.ipi6_addr.s6_addr, + sizeof (local_addr.ip_addr.addr.in6)); + local_addr.ip_addr.family = IPADDR_INET6; + } #endif #ifdef SO_TIMESTAMP - if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SO_TIMESTAMP) { - struct timeval tv; + if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SO_TIMESTAMP) { + struct timeval tv; - memcpy(&tv, CMSG_DATA(cmsg), sizeof(tv)); - LCL_CookTime(&tv, &now, &now_err); - } + memcpy(&tv, CMSG_DATA(cmsg), sizeof(tv)); + LCL_CookTime(&tv, &now, &now_err); + } #endif - } - - DEBUG_LOG(LOGF_NtpIO, "Received %d bytes from %s:%d to %s fd %d", - status, UTI_IPToString(&remote_addr.ip_addr), remote_addr.port, - UTI_IPToString(&local_addr.ip_addr), local_addr.sock_fd); - - if (status >= NTP_NORMAL_PACKET_LENGTH) { - - NSR_ProcessReceive((NTP_Packet *) &message.ntp_pkt, &now, now_err, - &remote_addr, &local_addr, status); - - } else { - - /* Just ignore the packet if it's not of a recognized length */ - - } } + + DEBUG_LOG(LOGF_NtpIO, "Received %d bytes from %s:%d to %s fd %d", + length, UTI_IPToString(&remote_addr.ip_addr), remote_addr.port, + UTI_IPToString(&local_addr.ip_addr), local_addr.sock_fd); + + /* Just ignore the packet if it's not of a recognized length */ + if (length < NTP_NORMAL_PACKET_LENGTH || length > sizeof (NTP_Receive_Buffer)) + return; + + NSR_ProcessReceive((NTP_Packet *)hdr->msg_iov[0].iov_base, &now, now_err, + &remote_addr, &local_addr, length); +} + +/* ================================================== */ + +static void +read_from_socket(int sock_fd, int event, void *anything) +{ + /* This should only be called when there is something + to read, otherwise it will block */ + + struct MessageHeader *hdr; + unsigned int i, n; + int status; + + hdr = ARR_GetElements(recv_headers); + n = ARR_GetSize(recv_headers); + assert(n >= 1); + + n = 1; + status = recvmsg(sock_fd, &hdr[0].msg_hdr, 0); + if (status >= 0) + hdr[0].msg_len = status; + + if (status < 0) { + DEBUG_LOG(LOGF_NtpIO, "Could not receive from fd %d : %s", sock_fd, + strerror(errno)); + return; + } + + for (i = 0; i < n; i++) { + hdr = ARR_GetElement(recv_headers, i); + process_receive(&hdr->msg_hdr, hdr->msg_len, sock_fd); + } + + /* Restore the buffers to their original state */ + prepare_buffers(n); } /* ================================================== */