/* UDP responder: listen on a UDP port, and respond to each received UDP packet back to the sender. */
/*
 * (C) 2024 by sysmocom - s.f.m.c. GmbH
 * All Rights Reserved.
 *
 * Author: Neels Janosch Hofmeyr
 * Author: Pau Espin Pedrol
 *
 * SPDX-License-Identifier: GPL-2.0+
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 */

#include "config.h"

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

/* NOTE(review): the header names of the original #include lines were lost; the lists below are
 * reconstructed from the identifiers this file uses. Confirm them against the build. */
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>

#if HAVE_URING

#include <stdbool.h>
#include <stdint.h>
#include <inttypes.h>
#include <limits.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <getopt.h>
#include <pthread.h>
#include <sched.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <liburing.h>

#include <osmocom/core/utils.h>
#include <osmocom/core/talloc.h>
#include <osmocom/core/linuxlist.h>
#include <osmocom/core/logging.h>
#include <osmocom/core/application.h>
#include <osmocom/core/socket.h>
#include <osmocom/core/sockaddr_str.h>
#include <osmocom/core/timer_compat.h>

/* TODO(review): restore the project header(s) that declare struct gtp_flood_payload_info and
 * struct gtp1u_hdr; their names could not be recovered. */

/* Set by the first worker that sees a receive completion; main() only prints stats once this is set. */
static volatile bool started;
/* CLOCK_MONOTONIC timestamp of that first receive completion. */
static struct timespec ts_start;

/* One command line option or positional argument, plus the value parsed for it.
 * An entry with only arg_name set is a positional argument. An all-NULL entry terminates an array. */
struct cmdline_cmd {
	const char *short_option;
	const char *long_option;
	/* Name of the option's argument as shown in the help; NULL means the option takes no argument. */
	const char *arg_name;
	const char *doc;
	/* Filled in by cmdline_read(); stays NULL when the option was not given. */
	const char *value;
};

/* Iterate all entries of a cmdline_cmd array up to (excluding) the all-NULL terminator. */
#define cmdline_foreach(ITER, CMDS) \
	for (const struct cmdline_cmd *ITER = (CMDS); \
	     ITER->short_option || ITER->long_option || ITER->arg_name; \
	     ITER++)

/* Compose the "Options:" help text for cmds into buf, with the doc strings aligned in one column.
 * \param[out] buf  Buffer to write to.
 * \param[in] buflen  Size of buf.
 * \param[in] cmds  Option definitions, terminated by an all-NULL entry.
 * \return the osmo_strbuf's chars_needed count after composing the text.
 */
int cmdline_doc_str_buf(char *buf, size_t buflen, const struct cmdline_cmd *cmds)
{
	struct osmo_strbuf sb = { .buf = buf, .len = buflen };
	int w = 0;

	/* First find the longest options part */
	cmdline_foreach(cmd, cmds) {
		int cmd_w = 0;
		if (cmd->short_option)
			cmd_w += 2 + strlen(cmd->short_option);
		if (cmd->long_option)
			cmd_w += 3 + strlen(cmd->long_option);
		if (cmd->arg_name)
			cmd_w += 1 + strlen(cmd->arg_name);
		w = OSMO_MAX(w, cmd_w);
	}
	/* horizontal gap between the options column and the doc column */
	w += 2;

	OSMO_STRBUF_PRINTF(sb, "Options:\n");
	cmdline_foreach(cmd, cmds) {
		char *line_start = sb.pos;
		if (cmd->short_option)
			OSMO_STRBUF_PRINTF(sb, " -%s", cmd->short_option);
		if (cmd->long_option)
			OSMO_STRBUF_PRINTF(sb, " --%s", cmd->long_option);
		if (cmd->arg_name)
			OSMO_STRBUF_PRINTF(sb, " %s", cmd->arg_name);
		if (cmd->doc) {
			/* pad up to the doc column, with at least one space */
			int have = sb.pos - line_start;
			int spaces = OSMO_MAX(1, w - have);
			OSMO_STRBUF_PRINTF(sb, "%*s", spaces, "");
			OSMO_STRBUF_PRINTF(sb, "%s", cmd->doc);
		}
		OSMO_STRBUF_PRINTF(sb, "\n");
	}
	return sb.chars_needed;
}

/* Print the help text for cmds to stdout. */
void cmdline_print_help(const struct cmdline_cmd *cmds)
{
	char buf[8192];
	cmdline_doc_str_buf(buf, sizeof(buf), cmds);
	printf("%s", buf);
}

/* Record that cmd was seen on the command line: store optarg if the option takes an argument,
 * otherwise store the option's own name as a non-NULL "is set" marker. */
void cmdline_cmd_store_optarg(struct cmdline_cmd *cmd)
{
	if (cmd->arg_name)
		cmd->value = optarg;
	else
		cmd->value = (cmd->short_option ? : cmd->long_option);
}

/* Parse argv via getopt_long() and store the values found in cmds[].value.
 * Remaining non-option arguments are assigned, in order, to the positional entries of cmds.
 * \return 0 on success, -1 on unknown options or surplus positional arguments.
 */
int cmdline_read(struct cmdline_cmd *cmds, int argc, char **argv)
{
	char short_options[256] = {};
	struct option long_options[128] = {};
	size_t long_options_i = 0;
	int long_option_val = 0;
	struct osmo_strbuf short_sb = { .buf = short_options, .len = sizeof(short_options) };

	cmdline_foreach(cmd, cmds) {
		if (cmd->short_option) {
			OSMO_STRBUF_PRINTF(short_sb, "%s", cmd->short_option);
			if (cmd->arg_name)
				OSMO_STRBUF_PRINTF(short_sb, ":");
		}
		if (cmd->long_option) {
			/* the last array entry must stay zeroed, getopt_long() needs it as terminator */
			if (long_options_i >= ARRAY_SIZE(long_options) - 1) {
				fprintf(stderr, "%s: too many long options defined\n", argv[0]);
				return -1;
			}
			long_options[long_options_i] = (struct option){
				.name = cmd->long_option,
				.has_arg = cmd->arg_name ? required_argument : no_argument,
				.flag = &long_option_val,
				/* index into cmds[] (not into long_options[]): cmds may contain entries
				 * without a long option, which would shift the two indexes apart. */
				.val = (int)(cmd - cmds),
			};
			long_options_i++;
		}
	}

	while (1) {
		int option_index = 0;
		bool found = false;
		/* must be int: getopt_long() returns -1 at the end, which a plain char cannot hold where
		 * char is unsigned */
		int c = getopt_long(argc, argv, short_options, long_options, &option_index);
		if (c == -1)
			break;

		if (c == 0) {
			/* a long option: getopt_long() has stored its cmds[] index in long_option_val */
			cmdline_cmd_store_optarg(&cmds[long_option_val]);
			continue;
		}

		cmdline_foreach(cc, cmds) {
			/* entries without a short option cannot match (and must not be passed to strchr()) */
			if (!cc->short_option || !strchr(cc->short_option, c))
				continue;
			cmdline_cmd_store_optarg(&cmds[cc - cmds]);
			found = true;
			break;
		}

		if (!found) {
			fprintf(stderr, "%s: Error in command line options. Exiting.\n", argv[0]);
			return -1;
		}
	}

	/* positional args */
	cmdline_foreach(cmd, cmds) {
		if (optind >= argc)
			break;
		if (cmd->short_option || cmd->long_option)
			continue;
		if (!cmd->arg_name)
			continue;
		cmds[cmd - cmds].value = argv[optind];
		optind++;
	}

	if (optind < argc) {
		cmdline_print_help(cmds);
		fprintf(stderr, "%s: Unsupported positional argument on command line\n", argv[optind]);
		return -1;
	}
	return 0;
}

/* Look up the value stored for an option.
 * \param[in] cmds  Option definitions as filled in by cmdline_read().
 * \param[in] option_name  Long option name, short option name or argument name to look for.
 * \param[in] default_val  Returned when the option is unknown or was not given on the command line.
 * \return the stored value of the first matching entry, or default_val.
 */
const char *cmdline_get(const struct cmdline_cmd *cmds, const char *option_name, const char *default_val)
{
	cmdline_foreach(cmd, cmds) {
		bool match = (cmd->long_option && !strcmp(cmd->long_option, option_name))
			  || (cmd->short_option && !strcmp(cmd->short_option, option_name))
			  || (cmd->arg_name && !strcmp(cmd->arg_name, option_name));
		if (!match)
			continue;
		/* a known option that was not given has value == NULL: fall back to the default */
		return cmd->value ? cmd->value : default_val;
	}
	return default_val;
}

/* Look up an option and parse it as decimal integer within [minval, maxval].
 * \param[out] dst  Parsed number, or default_val when the option was not given.
 * \return true on success; false after printing help and an error message to stdout.
 */
bool cmdline_get_int(int *dst, int minval, int maxval, int default_val,
		     const struct cmdline_cmd *cmds, const char *option_name)
{
	const char *str = cmdline_get(cmds, option_name, NULL);
	if (!str) {
		*dst = default_val;
		return true;
	}
	if (osmo_str_to_int(dst, str, 10, minval, maxval)) {
		cmdline_print_help(cmds);
		printf("ERROR: invalid integer number: %s\n", str);
		return false;
	}
	if (*dst < minval || *dst > maxval) {
		cmdline_print_help(cmds);
		printf("ERROR: number out of range: %d <= %d <= %d\n", minval, *dst, maxval);
		return false;
	}
	return true;
}

/* returns number of configured CPUs in the system, or negative otherwise */
static int get_num_cpus(void)
{
	static unsigned int num_cpus = 0;
	long ln;

	if (num_cpus)
		return num_cpus;

	/* This is expensive (goes across /sys, so let's do it only once. It is
	 * guaranteed it won't change during process span anyway). */
	ln = sysconf(_SC_NPROCESSORS_CONF);
	if (ln < 1) {
		/* also reject 0: callers use the result as a divisor */
		fprintf(stderr, "sysconf(_SC_NPROCESSORS_CONF) failed: %s\n", strerror(errno));
		return -1;
	}
	num_cpus = (unsigned int) ln;
	return num_cpus;
}

struct udp_port {
	struct llist_head entry;

	/* IP address and UDP port from user input */
	struct osmo_sockaddr osa;

	/* locally bound socket */
	int fd;
};

/* Which operation a data_io slot currently has submitted to io_uring. */
enum data_io_type {
	IO_UNUSED = 0,
	IO_RECV,
	IO_SEND,
};

/* One I/O slot: a message buffer that alternates between receiving a packet and sending it back. */
struct data_io {
	enum data_io_type type;
	/* peer address: written by recvmsg, used as destination by sendmsg */
	struct osmo_sockaddr osa;
	struct iovec iov;
	struct msghdr msgh;
	uint8_t *data;
	size_t data_size;
	uint8_t *control;
	size_t control_size;
	/* number of responses sent so far for the packet currently held in this slot */
	int n;
};

struct io_queue {
	size_t d_size;
	struct data_io d[];
};

struct counter {
	uint64_t count;
	uint64_t last;
};

struct traffic_counter {
	struct counter packets;
	struct counter bytes;
};

/* Return how much c->count has advanced since the previous call, and remember the current count. */
uint64_t counter_get(struct counter *c)
{
	uint64_t cnt = c->count;
	uint64_t val = cnt - c->last;
	c->last = cnt;
	return val;
}

struct traffic_counter g_rx = {};
struct traffic_counter g_tx = {};

struct worker {
	int id;
	struct io_queue *q;
	struct io_uring ring;
	pthread_t worker;
	/* written by the worker thread, read by main() for the periodic stats */
	struct {
		unsigned long long rx_packets;
		unsigned long long rx_bytes;
		unsigned long long tx_packets;
		unsigned long long tx_bytes;
	} ctr;
};

struct {
	int port_nr;
	const char *local_addr;
	int queue_size;
	int buf_size;
	int control_size;
	int response_size;
	int response_n;
	int workers_n;
	struct udp_port port;
	bool cpu_affinity;
} cfg = {};

/* Allocate payload and control buffers for slot d (as talloc children of w->q) and set up its msghdr. */
static void data_io_prepare(struct worker *w, struct data_io *d)
{
	void *data_buf = talloc_size(w->q, cfg.buf_size);
	void *control_buf = talloc_size(w->q, cfg.control_size);
	OSMO_ASSERT(data_buf);
	OSMO_ASSERT(control_buf);

	*d = (struct data_io){
		.type = IO_RECV,
		.iov = {
			.iov_base = data_buf,
			.iov_len = cfg.buf_size,
		},
		.msgh = {
			.msg_name = &d->osa,
			.msg_namelen = sizeof(d->osa),
			.msg_iov = &d->iov,
			.msg_iovlen = 1,
			.msg_control = control_buf,
			.msg_controllen = cfg.control_size,
		},
		.data = data_buf,
		.data_size = cfg.buf_size,
		.control = control_buf,
		.control_size = cfg.control_size,
	};
}

/* Queue a recvmsg on slot d. The request is handed to the kernel by the next io_uring_submit(). */
static void data_io_prep_recv(struct io_uring *ring, struct udp_port *port, struct data_io *d)
{
	struct io_uring_sqe *sqe;

	d->type = IO_RECV;
	/* a new packet starts a new response cycle; without this reset, --response-repeat would only
	 * take effect the first time a slot is used */
	d->n = 0;
	/* restore the full buffer sizes, the previous receive/send may have shrunk them */
	d->iov.iov_len = d->data_size;
	d->msgh.msg_namelen = sizeof(d->osa);
	d->msgh.msg_controllen = d->control_size;

	sqe = io_uring_get_sqe(ring);
	OSMO_ASSERT(sqe);
	io_uring_prep_recvmsg(sqe, port->fd, &d->msgh, 0);
	io_uring_sqe_set_data(sqe, d);
}

/* Queue a sendmsg of slot d's current msghdr, i.e. towards the address stored in d->osa. */
static void data_io_prep_send(struct io_uring *ring, struct udp_port *port, struct data_io *d)
{
	struct io_uring_sqe *sqe;

	d->type = IO_SEND;

	sqe = io_uring_get_sqe(ring);
	OSMO_ASSERT(sqe);
	io_uring_prep_sendmsg(sqe, port->fd, &d->msgh, 0);
	io_uring_sqe_set_data(sqe, d);
}

/* Extract the info block found at the end of the packet held in d, if any.
 * The last octet of the packet is read as the size of a block that directly precedes it, and that
 * block must start with the marker "info".
 * \param[out] dst  Zeroed, then filled with at most sizeof(*dst) octets of the block.
 * \return true when an info block was found and copied.
 */
static bool get_payload_info(struct gtp_flood_payload_info *dst, struct data_io *d)
{
	const uint8_t *base = d->iov.iov_base;
	const size_t total = d->iov.iov_len;
	const uint8_t *len;
	const uint8_t *pi;
	size_t copy_len;

	if (total < 1)
		return false;
	len = base + total - 1;
	/* The block must be able to hold the 4 octet marker, and must lie entirely within the packet in
	 * front of the length octet. */
	if (*len < 4 || *len > total - 1)
		return false;
	pi = len - (*len);
	if (memcmp(pi, "info", 4))
		return false;

	copy_len = OSMO_MIN(sizeof(*dst), *len);
	*dst = (struct gtp_flood_payload_info){};
	memcpy((void *)dst, pi, copy_len);
	return true;
}

/* Handle one io_uring completion: account it and queue the slot's next operation.
 * A completed receive is followed by a send back to the sender (unless response_n < 1), a completed
 * send by further sends until response_n is reached, and then by a new receive. On error the slot is
 * returned to receiving.
 * \param[in] response_size  If > 0, length to use for the response instead of the received length.
 * \param[in] response_n  Number of times to send each received packet back.
 */
static void data_io_handle_completion(struct worker *w, struct udp_port *port, struct io_uring_cqe *cqe,
				      int response_size, int response_n)
{
	struct io_uring *ring = &w->ring;
	struct data_io *d = io_uring_cqe_get_data(cqe);
	struct osmo_sockaddr *osa = &d->osa;
	struct gtp_flood_payload_info pi;
	/* Copy out what is needed and consume the CQE on every path: a CQE that is never marked as seen
	 * would be returned again by each following wait, forever. */
	const int rc = cqe->res;
	const unsigned int flags = cqe->flags;

	io_uring_cqe_seen(ring, cqe);

	switch (d->type) {
	case IO_RECV:
		/* done reading */
		if (OSMO_UNLIKELY(!started)) {
			/* store the timestamp before raising the flag that tells main() to read it */
			int clk_rc = clock_gettime(CLOCK_MONOTONIC, &ts_start);
			OSMO_ASSERT(clk_rc == 0);
			started = true;
		}
		if (rc <= 0) {
			LOGP(DLGLOBAL, LOGL_ERROR, "%s -> rx error rc=%d flags=0x%x\n",
			     osmo_sockaddr_to_str(osa), rc, flags);
			/* keep the slot in use */
			data_io_prep_recv(ring, port, d);
			break;
		}
#if 0
		LOGP(DLGLOBAL, LOGL_DEBUG, "%s -> rx rc=%d flags=0x%x: %s\n",
		     osmo_sockaddr_to_str(osa), rc, flags,
		     osmo_quote_str(d->iov.iov_base, rc));
#endif
		w->ctr.rx_packets++;
		w->ctr.rx_bytes += rc;

		if (response_n < 1) {
			data_io_prep_recv(ring, port, d);
			break;
		}

		d->iov.iov_len = rc;
		if (get_payload_info(&pi, d)) {
			/* set the return TEID */
			struct gtp1u_hdr *gtp_hdr = (void *)d->iov.iov_base;
			gtp_hdr->tei = pi.return_teid;
		}

		/* resubmit back to sender */
		/* adjust size? */
		if (response_size > 0)
			d->iov.iov_len = response_size;
		data_io_prep_send(ring, port, d);
		break;

	case IO_SEND:
		/* done writing. */
		if (rc <= 0) {
			LOGP(DLGLOBAL, LOGL_ERROR, "%s -> tx error rc=%d flags=0x%x\n",
			     osmo_sockaddr_to_str(osa), rc, flags);
			/* give up on this response and return the slot to receiving */
			data_io_prep_recv(ring, port, d);
			break;
		}
#if 0
		LOGP(DLGLOBAL, LOGL_DEBUG, "%s <- tx rc=%d flags=0x%x: %s\n",
		     osmo_sockaddr_to_str(osa), rc, flags,
		     osmo_quote_str(d->iov.iov_base, rc));
#endif
		w->ctr.tx_packets++;
		w->ctr.tx_bytes += rc;
		d->n++;
		/* Send again? If not, re-submit open slot for reading. */
		if (d->n < response_n)
			data_io_prep_send(ring, port, d);
		else
			data_io_prep_recv(ring, port, d);
		break;

	default:
		OSMO_ASSERT(0);
	}
}

struct cmdline_cmd cmds[] = {
	{
		.short_option = "h",
		.long_option = "help",
		.doc = "Show this help",
	},
	{
		.short_option = "l",
		.long_option = "local-addr",
		.arg_name = "IP-ADDR",
		.doc = "Listen on local IP address (default is 0.0.0.0).",
	},
	{
		.short_option = "p",
		.long_option = "port",
		.arg_name = "UDP-PORT",
		.doc = "Listen on local UDP port.",
	},
	/*
	{
		.short_option = "P",
		.long_option = "port-range-to",
		.arg_name = "UDP-PORT-TO",
		.doc = "Listen on a range of ports, from --port to --port-range-to, inclusive.",
	},
	*/
	{
		.short_option = "s",
		.long_option = "response-size",
		.arg_name = "BYTES",
		.doc = "When responding, enlarge or shorten the payload to this size.",
	},
	{
		.short_option = "n",
		.long_option = "response-repeat",
		.arg_name = "N",
		.doc = "Respond N times, i.e. multiply the returned traffic.",
	},
	{
		.short_option = "a",
		.long_option = "cpu-affinity",
		.doc = "Pin each Nth worker to a Nth cpu.",
	},
	{
		.long_option = "io-uring-queue",
		.arg_name = "SIZE",
		.doc = "I/O tuning: queue size to use for io_uring, default is 4000.",
	},
	{
		.long_option = "io-uring-buf",
		.arg_name = "SIZE",
		.doc = "I/O tuning: maximum payload size, default is 1452.",
	},
	{
		.long_option = "workers",
		.arg_name = "N",
		.doc = "Number of rx threads to run",
	},
	{}
};

static const struct log_info_cat categories[] = {
};

const struct log_info udp_responder_log_info = {
	.cat = categories,
	.num_cat = ARRAY_SIZE(categories),
};

static void start_rx_worker(struct worker *w);

/* Sum up the counters of all cfg.workers_n workers and print the rates since the previous printout
 * (DIFF) and since the first received packet (TOTAL). Prints at most once per clock second, and only
 * when packets were counted since the previous printout. */
static void print_stats(const struct worker *workers)
{
	static struct timespec last_info_log = {.tv_sec = 0, .tv_nsec = 0};
	struct timespec ts_now;
	struct timespec ts_elapsed, ts_diff;
	unsigned long long elapsed_usec, diff_usec;
	uint64_t diff_rx_packets, diff_rx_bytes, diff_tx_packets, diff_tx_bytes;
	uint64_t elapsed_rx_packets = 0, elapsed_rx_bytes = 0, elapsed_tx_packets = 0, elapsed_tx_bytes = 0;

	clock_gettime(CLOCK_MONOTONIC, &ts_now);

	/* First call: measure the first interval from the first received packet on, not from the start of
	 * the monotonic clock. */
	if (last_info_log.tv_sec == 0 && last_info_log.tv_nsec == 0)
		last_info_log = ts_start;

	/* the resolution is in seconds, output stats once per second. */
	if (OSMO_LIKELY(ts_now.tv_sec == last_info_log.tv_sec))
		return;

	timespecsub(&ts_now, &ts_start, &ts_elapsed);
	timespecsub(&ts_now, &last_info_log, &ts_diff);
	elapsed_usec = ((unsigned long long)ts_elapsed.tv_sec * 1000 * 1000) + (ts_elapsed.tv_nsec / 1000);
	diff_usec = ((unsigned long long)ts_diff.tv_sec * 1000 * 1000) + (ts_diff.tv_nsec / 1000);
	/* both are used as divisors below */
	if (elapsed_usec == 0)
		elapsed_usec = 1;
	if (diff_usec == 0)
		diff_usec = 1;
	last_info_log = ts_now;

	for (int i = 0; i < cfg.workers_n; i++) {
		elapsed_rx_packets += workers[i].ctr.rx_packets;
		elapsed_rx_bytes += workers[i].ctr.rx_bytes;
		elapsed_tx_packets += workers[i].ctr.tx_packets;
		elapsed_tx_bytes += workers[i].ctr.tx_bytes;
	}
	g_rx.packets.count = elapsed_rx_packets;
	g_rx.bytes.count = elapsed_rx_bytes;
	g_tx.packets.count = elapsed_tx_packets;
	g_tx.bytes.count = elapsed_tx_bytes;

	diff_rx_packets = counter_get(&g_rx.packets);
	diff_rx_bytes = counter_get(&g_rx.bytes);
	diff_tx_packets = counter_get(&g_tx.packets);
	diff_tx_bytes = counter_get(&g_tx.bytes);

	if (!diff_rx_packets && !diff_tx_packets)
		return;

	/* packets * 1000 / usec = kPPS; bytes * 8 / usec = Mbps */
	printf("DIFF: %12llu Rx: %8"PRIu64" packets (%6llu kPPS, %8llu Mbps) | Tx: %8"PRIu64" packets (%6llu kPPS, %8llu Mbps)\n",
	       diff_usec,
	       diff_rx_packets, diff_rx_packets * 1000 / diff_usec, diff_rx_bytes * 8 / diff_usec,
	       diff_tx_packets, diff_tx_packets * 1000 / diff_usec, diff_tx_bytes * 8 / diff_usec);
	printf("TOTAL: %12llu Rx: %8"PRIu64" packets (%6llu kPPS, %8llu Mbps) | Tx: %8"PRIu64" packets (%6llu kPPS, %8llu Mbps)\n",
	       elapsed_usec,
	       elapsed_rx_packets, elapsed_rx_packets * 1000 / elapsed_usec, elapsed_rx_bytes * 8 / elapsed_usec,
	       elapsed_tx_packets, elapsed_tx_packets * 1000 / elapsed_usec, elapsed_tx_bytes * 8 / elapsed_usec);
	fflush(stdout);
}

/* Parse the command line, bind the UDP socket, start the workers, then print stats forever. */
int main(int argc, char **argv)
{
	struct osmo_sockaddr_str addr = {};
	struct osmo_sockaddr osa = {};
	struct worker *workers;
	int val;
	int rc;

	osmo_init_logging2(OTC_GLOBAL, &udp_responder_log_info);
	log_set_log_level(osmo_stderr_target, LOGL_ERROR);

	if (cmdline_read(cmds, argc, argv)
	    || cmdline_get(cmds, "help", NULL)) {
		cmdline_print_help(cmds);
		return -1;
	}

	if (!cmdline_get_int(&cfg.port_nr, 1, 65535, 23000, cmds, "port"))
		return -1;

	cfg.local_addr = cmdline_get(cmds, "local-addr", "0.0.0.0");
	if (osmo_sockaddr_str_from_str(&addr, cfg.local_addr, cfg.port_nr)
	    || osmo_sockaddr_str_to_osa(&addr, &osa)) {
		printf("ERROR: invalid interface or port number: %s:%d\n", cfg.local_addr, cfg.port_nr);
		return -1;
	}

	if (!cmdline_get_int(&cfg.queue_size, 1, 65535, 4000, cmds, "io-uring-queue"))
		return -1;
	if (!cmdline_get_int(&cfg.buf_size, 1, 65535, 1452, cmds, "io-uring-buf"))
		return -1;
	if (!cmdline_get_int(&cfg.response_size, 0, cfg.buf_size, 0, cmds, "response-size"))
		return -1;
	if (!cmdline_get_int(&cfg.response_n, 0, INT_MAX, 1, cmds, "response-repeat"))
		return -1;
	if (!cmdline_get_int(&cfg.workers_n, 1, INT_MAX, 4, cmds, "workers"))
		return -1;
	if (cmdline_get(cmds, "cpu-affinity", NULL))
		cfg.cpu_affinity = true;

	cfg.port.osa = osa;
	cfg.control_size = 1024;

	/* create and bind socket */
	rc = osmo_sock_init_osa(SOCK_DGRAM, IPPROTO_UDP, &cfg.port.osa, NULL, OSMO_SOCK_F_BIND);
	/* (logging of errors already happens in osmo_sock_init_osa() */
	if (rc < 0)
		return -1;
	cfg.port.fd = rc;
	LOGP(DLGLOBAL, LOGL_NOTICE, "bound UDP %s fd=%d\n", osmo_sock_get_name2(cfg.port.fd), cfg.port.fd);

	/* Set Don't Fragment (DF) bit on IP packets transmitted by socket: */
	val = IP_PMTUDISC_DO;
	rc = setsockopt(cfg.port.fd, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val));
	if (rc == -1) {
		fprintf(stderr, "ERROR: setsockopt(IPPROTO_IP, IP_MTU_DISCOVER) failed errno=%d\n", errno);
		return -1;
	}

	val = 1;
	rc = setsockopt(cfg.port.fd, IPPROTO_IP, IP_PKTINFO, &val, sizeof(val));
	/* TODO: IPv6: setsockopt(s, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one)); */
	if (rc == -1) {
		fprintf(stderr, "ERROR: setsockopt(IPPROTO_IP, IP_PKTINFO) failed errno=%d\n", errno);
		return -1;
	}

	workers = talloc_zero_array(OTC_GLOBAL, struct worker, cfg.workers_n);
	OSMO_ASSERT(workers);
	for (int i = 0; i < cfg.workers_n; i++) {
		printf("Starting worker %d\n", i);
		workers[i].id = i;
		start_rx_worker(&workers[i]);
	}

	/* periodically log rx stats */
	while (1) {
		if (OSMO_LIKELY(started))
			print_stats(workers);

		/* sleep in every iteration, so that waiting for the first packet does not spin */
		if (usleep(500 * 1000) == -1)
			fprintf(stderr, "ERROR: usleep() failed errno=%d\n", errno);
	}
}

static void *rx_worker_func(void *_worker);

/* Allocate and prepare the I/O slots of worker w, then launch its thread. */
static void start_rx_worker(struct worker *w)
{
	int rc;

	w->q = talloc_zero_size(OTC_GLOBAL, sizeof(struct io_queue) + cfg.queue_size * sizeof(struct data_io));
	OSMO_ASSERT(w->q);
	*w->q = (struct io_queue){
		.d_size = cfg.queue_size,
	};

	for (size_t i = 0; i < w->q->d_size; i++)
		data_io_prepare(w, &w->q->d[i]);

	/* pthread_create() returns 0 on success and a positive error number on failure */
	rc = pthread_create(&w->worker, NULL, rx_worker_func, w);
	OSMO_ASSERT(rc == 0);
}

/* Worker thread: set up an io_uring, keep all slots of the worker's queue in flight, and handle
 * completions forever. */
static void *rx_worker_func(void *_worker)
{
	struct worker *w = _worker;
	struct io_queue *q = w->q;
	char thread_name[32];
	unsigned int vals[2] = {1, 1};
	struct __kernel_timespec ts_zero = {};
	struct __kernel_timespec ts_1s = { .tv_sec = 1 };
	int rc;

	if (cfg.cpu_affinity) {
		cpu_set_t *cpuset;
		size_t cpuset_size;
		int core_id;
		int num_cpus = get_num_cpus();

		if (num_cpus < 1) {
			fprintf(stderr, "ERROR: Pinning worker %d: cannot determine the number of CPUs\n", w->id);
			exit(1);
		}
		core_id = w->id % num_cpus;

		cpuset = CPU_ALLOC(num_cpus);
		if (!cpuset) {
			fprintf(stderr, "ERROR: Pinning worker %d to CPU %d: out of memory\n", w->id, core_id);
			exit(1);
		}
		cpuset_size = CPU_ALLOC_SIZE(num_cpus);
		CPU_ZERO_S(cpuset_size, cpuset);
		CPU_SET_S(core_id, cpuset_size, cpuset);
		fprintf(stderr, "Pinning worker %d to CPU %d\n", w->id, core_id);
		/* returns 0 on success and an error number on failure; keep the number for the message */
		rc = pthread_setaffinity_np(pthread_self(), cpuset_size, cpuset);
		CPU_FREE(cpuset);
		if (rc != 0) {
			fprintf(stderr, "ERROR: Pinning worker %d to CPU %d: error=%d\n", w->id, core_id, rc);
			exit(1);
		}
	}

	snprintf(thread_name, sizeof(thread_name), "UdpRspWrk%d", w->id);
	/* pthread_setname_np() reports failure via its return value, it does not set errno */
	rc = pthread_setname_np(pthread_self(), thread_name);
	if (rc != 0) {
		char buf[256];
		char *err_str = strerror_r(rc, buf, sizeof(buf));
		fprintf(stderr, "worker %d: failed setting thread name: %s\n", w->id, err_str);
	}

	rc = io_uring_queue_init(q->d_size, &w->ring, 0);
	OSMO_ASSERT(rc >= 0);

	rc = io_uring_register_iowq_max_workers(&w->ring, &vals[0]);
	OSMO_ASSERT(rc == 0);

	for (size_t i = 0; i < q->d_size; i++) {
		struct data_io *d = &q->d[i];
		/* fill once with random printable data */
		for (size_t j = 0; j < d->data_size; j++)
			d->data[j] = 32 + random() % (126 - 32 + 1);
	}

	/* fill the queue to start receiving */
	for (size_t i = 0; i < q->d_size; i++)
		data_io_prep_recv(&w->ring, &cfg.port, &q->d[i]);

	while (1) {
		struct io_uring_cqe *cqe;
		unsigned int completed = 0;
		/* submit any requests from previous loop */
		int submitted = io_uring_submit(&w->ring);

		if (submitted < 0) {
			/* negative error code; count as "nothing submitted" so that a persistent failure ends
			 * up in the 1s wait below instead of spinning */
			fprintf(stderr, "worker %d: io_uring_submit() failed: rc=%d\n", w->id, submitted);
			submitted = 0;
		}

		/* process all pending completions */
		while (io_uring_wait_cqe_timeout(&w->ring, &cqe, &ts_zero) == 0) {
			data_io_handle_completion(w, &cfg.port, cqe, cfg.response_size, cfg.response_n);
			completed++;
		}

		/* Wait a bit longer */
		if (!submitted && !completed) {
			if (io_uring_wait_cqe_timeout(&w->ring, &cqe, &ts_1s) == 0)
				data_io_handle_completion(w, &cfg.port, cqe, cfg.response_size, cfg.response_n);
		}
	}

	/* not reached: the loop above never exits */
	talloc_free(q);
	return NULL;
}

#endif /* HAVE_URING */