#include #include #include #include #include #include #define RECV_SLOTS 32000 #define BUF_SIZE 1460 static struct io_uring ring = {}; static unsigned long long num_packets_received; static unsigned long long last_num_packets_received; static unsigned long long num_bytes_received; static bool started; static struct timespec ts_start; static int rx_fd; struct recv_slot { struct iovec iov; uint8_t buf[BUF_SIZE]; struct msghdr msgh; }; static struct recv_slot recv_slots[RECV_SLOTS]; static void prepare_rx(struct recv_slot *s) { s->iov.iov_base = s->buf; s->iov.iov_len = sizeof(s->buf); s->msgh = (struct msghdr){ .msg_iov = &s->iov, .msg_iovlen = 1, }; } static void submit_rx(struct recv_slot *s) { struct io_uring_sqe *sqe; sqe = io_uring_get_sqe(&ring); OSMO_ASSERT(sqe); io_uring_prep_recvmsg(sqe, rx_fd, &s->msgh, 0); io_uring_sqe_set_data(sqe, s); } void handle_completion(struct io_uring_cqe *cqe) { struct recv_slot *s; s = io_uring_cqe_get_data(cqe); if (OSMO_UNLIKELY(!started)) { started = true; OSMO_ASSERT(clock_gettime(CLOCK_MONOTONIC, &ts_start) == 0); } if (cqe->res <= 0) { printf("rc = %d\n", cqe->res); return; } num_bytes_received += cqe->res; num_packets_received++; io_uring_cqe_seen(&ring, cqe); /* submit more */ submit_rx(s); } static void log_rx_force(struct timespec *ts_now) { struct timespec ts_elapsed; timespecsub(ts_now, &ts_start, &ts_elapsed); unsigned long long elapsed_usec = (ts_elapsed.tv_sec * 1000 * 1000) + (ts_elapsed.tv_nsec / 1000); if (elapsed_usec == 0) elapsed_usec = 1; unsigned long long kpkts_per_sec = num_packets_received * 1000 / elapsed_usec; unsigned long long mbps = num_bytes_received * 8 / elapsed_usec; printf("%16llu RX: %8llu packets %16llu bytes %16llu kPPS %16llu Mbps\n", elapsed_usec, num_packets_received, num_bytes_received, kpkts_per_sec, mbps); fflush(stdout); } static void log_rx(void) { static struct timespec last_info_log = {.tv_sec = 0, .tv_nsec = 0}; struct timespec ts_now; clock_gettime(CLOCK_MONOTONIC, &ts_now); if (OSMO_UNLIKELY(ts_now.tv_sec != last_info_log.tv_sec)) { last_info_log = ts_now; log_rx_force(&ts_now); } } int main(int argc, const char **argv) { int i; int rc; const char *local_addr_str = "0.0.0.0"; uint16_t local_port = 23000; struct osmo_sockaddr_str local_addr = {}; struct osmo_sockaddr local_osa = {}; struct __kernel_timespec ts_zero = {}; struct __kernel_timespec ts_1s = { .tv_sec = 1 }; if (argc >= 2) local_addr_str = argv[1]; if (argc >= 3) local_port = atoi(argv[2]); if (osmo_sockaddr_str_from_str(&local_addr, local_addr_str, local_port) || osmo_sockaddr_str_to_osa(&local_addr, &local_osa)) { printf("ERROR: invalid address or port number: %s:%d\n", local_addr_str, local_port); return -1; } /* create and bind socket */ rc = osmo_sock_init_osa(SOCK_DGRAM, IPPROTO_UDP, &local_osa, NULL, OSMO_SOCK_F_BIND); if (rc < 0) return -1; rx_fd = rc; printf("bound UDP %s fd=%d\n", osmo_sock_get_name2(rx_fd), rx_fd); rc = io_uring_queue_init(ARRAY_SIZE(recv_slots), &ring, 0); /* Prepare */ for (i = 0; i < ARRAY_SIZE(recv_slots); i++) prepare_rx(&recv_slots[i]); /* fill up tx queue */ for (i = 0; i < ARRAY_SIZE(recv_slots); i++) submit_rx(&recv_slots[i]); while (1) { uint32_t new_submissions; uint32_t new_completions = 0; struct io_uring_cqe *cqe; /* submit any requests from previous loop */ new_submissions = io_uring_submit(&ring); /* process all pending completions */ while (io_uring_wait_cqe_timeout(&ring, &cqe, &ts_zero) == 0) { handle_completion(cqe); new_completions++; } /* Nothing happened in this loop iteration, so wait a bit longer */ if (!new_submissions && !new_completions) { if (io_uring_wait_cqe_timeout(&ring, &cqe, &ts_1s) == 0) { handle_completion(cqe); new_completions++; } } if (OSMO_UNLIKELY(!started)) continue; if (OSMO_UNLIKELY(last_num_packets_received == num_packets_received)) { #if 0 printf("FINISHED!\n"); struct timespec ts_now; clock_gettime(CLOCK_MONOTONIC, &ts_now); log_rx_force(&ts_now); started = false; last_num_packets_received = 0; #endif continue; } last_num_packets_received = num_packets_received; log_rx(); } printf("done\n"); return 0; }