/* GTP-U traffic/load generator. Generates a configurable amount of UDP/IP flows using io_uring. * * Based on gtp-load-gen.c from https://gitea.osmocom.org/cellular-infrastructure/gtp-load-gen * which is marked (C) 2021 by Harald Welte */ /* * (C) 2024 by sysmocom - s.f.m.c. GmbH * All Rights Reserved. * * Author: Neels Janosch Hofmeyr * * SPDX-License-Identifier: GPL-2.0+ * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . * */ #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif #include "config.h" #include #include #define HAVE_URING 1 #if HAVE_URING #include #include #include #include #include #include #include #include #include #include #include #include #define PKT_BUF_SIZE 1452 struct gtp_flood_worker; static volatile bool rx_started; static struct timespec ts_start; struct gtp_flood { struct gtp_flood_cfg cfg; /* list of struct gtp_flood_worker */ struct llist_head tx_workers; /* state for round robin */ void *next_tx_worker; int tx_workers_started; int tx_workers_running; /* list of struct gtp_flood_worker */ struct llist_head rx_workers; void *next_rx_worker; int rx_workers_started; int rx_workers_running; }; struct counter { uint64_t count; uint64_t last; }; struct traffic_counter { struct counter packets; struct counter bytes; }; uint64_t counter_get(struct counter *c) { uint64_t val = c->count - c->last; c->last = c->count; return val; } struct gtp_flood_worker { struct llist_head entry; /* backpointer */ struct gtp_flood *gtp_flood; unsigned int id; /* list of struct gtp_flood_flow */ struct llist_head flows; struct io_uring ring; unsigned last_ring_cq_koverflow; unsigned last_ring_sq_kdropped; pthread_t pthread_id; struct counter submitted_packets; struct counter submitted_packets2; struct traffic_counter tx; struct traffic_counter rx; struct { unsigned long long rx_packets; unsigned long long rx_bytes; unsigned long long tx_packets; unsigned long long tx_bytes; } ctr; }; struct gtp_flood_flow { struct llist_head entry; /* backpointer */ struct gtp_flood_worker *worker; struct gtp_flood_flow_cfg cfg; /* for logging and included in generated payloads */ unsigned int id; /* must live until completion */ struct iovec iov[1]; struct msghdr msgh; /* flow-private packet buffer */ uint8_t *pkt_buf; unsigned int submitted_gtp_packets; unsigned int sent_gtp_packets; unsigned int received_udp_packets; unsigned int received_udp_bytes; bool stop; }; static void gtp_flood_worker_init(struct gtp_flood *gtp_flood, struct gtp_flood_worker *worker, unsigned int idx) { worker->id = idx; worker->gtp_flood = gtp_flood; INIT_LLIST_HEAD(&worker->flows); } struct gtp_flood *gtp_flood_alloc(void *ctx, const struct gtp_flood_cfg *cfg) { struct gtp_flood *gtp_flood; struct gtp_flood_cfg *c; int i; gtp_flood = talloc_zero(ctx, struct gtp_flood); *gtp_flood = (struct gtp_flood){ .cfg = *cfg, }; INIT_LLIST_HEAD(>p_flood->tx_workers); INIT_LLIST_HEAD(>p_flood->rx_workers); c = >p_flood->cfg; for (i = 0; i < c->num_tx_workers; i++) { struct gtp_flood_worker *w = talloc_zero(gtp_flood, struct gtp_flood_worker); gtp_flood_worker_init(gtp_flood, w, i); llist_add(&w->entry, >p_flood->tx_workers); } LOGP(DLGLOBAL, LOGL_NOTICE, "tx workers: %u\n", llist_count(>p_flood->tx_workers)); for (i = 0; i < c->num_rx_workers; i++) { struct gtp_flood_worker *w = talloc_zero(gtp_flood, struct gtp_flood_worker); gtp_flood_worker_init(gtp_flood, w, i); llist_add(&w->entry, >p_flood->rx_workers); } LOGP(DLGLOBAL, LOGL_NOTICE, "rx workers: %u\n", llist_count(>p_flood->rx_workers)); return gtp_flood; } static void gtp_flood_flow_init_payload(struct gtp_flood_flow *flow) { struct gtp1u_hdr *gtp_hdr = (void *)flow->pkt_buf; unsigned int udp_payload_len; *gtp_hdr = (struct gtp1u_hdr) { .pn = 0, .s = 0, .e = 0, .spare = 0, .pt = 1, .version = 1, .type = 0xff, /* G-PDU */ .length = 0, /* filled in later */ .tei = htonl(flow->cfg.gtp_remote_teid), }; uint8_t *cur = flow->pkt_buf + sizeof(*gtp_hdr); struct iphdr *iph; struct ip6_hdr *ip6h; struct udphdr *uh; struct osmo_sockaddr *src = &flow->cfg.payload_src; struct osmo_sockaddr *dst = &flow->cfg.payload_dst; if (src->u.sa.sa_family == AF_INET) { udp_payload_len = PKT_BUF_SIZE - (sizeof(struct gtp1u_hdr) + sizeof(struct udphdr) + sizeof(struct iphdr)); iph = (struct iphdr *) cur; cur += sizeof(*iph); iph->ihl = 5; iph->version = 4; iph->tos = 0; iph->tot_len = htons(udp_payload_len + sizeof(struct udphdr) + sizeof(*iph)); iph->id = 0; iph->frag_off = 0; iph->ttl = 32; iph->protocol = IPPROTO_UDP; iph->saddr = src->u.sin.sin_addr.s_addr; iph->daddr = dst->u.sin.sin_addr.s_addr; iph->check = ip_fast_csum(iph, iph->ihl); } else { udp_payload_len = PKT_BUF_SIZE - (sizeof(struct gtp1u_hdr) + sizeof(struct udphdr) + sizeof(struct ip6_hdr)); ip6h = (struct ip6_hdr *) cur; cur += sizeof(*ip6h); ip6h->ip6_flow = htonl((6 << 28)); ip6h->ip6_plen = htons(udp_payload_len + sizeof(struct udphdr) + sizeof(*ip6h)); ip6h->ip6_nxt = IPPROTO_UDP; ip6h->ip6_hlim = 32; ip6h->ip6_src = src->u.sin6.sin6_addr; ip6h->ip6_dst = dst->u.sin6.sin6_addr; } uh = (struct udphdr *) cur; cur += sizeof(*uh); uh->source = htons(osmo_sockaddr_port(&src->u.sa)); uh->dest = htons(osmo_sockaddr_port(&dst->u.sa)); uh->len = htons(udp_payload_len); uh->check = 0; // TODO gtp_hdr->length = htons(udp_payload_len + (cur - flow->pkt_buf) - sizeof(*gtp_hdr)); /* initialize this once, so we have it ready for each transmit */ flow->msgh.msg_name = &flow->cfg.gtp_remote.u.sa; flow->msgh.msg_namelen = sizeof(flow->cfg.gtp_remote.u.sa); flow->msgh.msg_iov = flow->iov; flow->msgh.msg_iovlen = ARRAY_SIZE(flow->iov); flow->msgh.msg_control = NULL; flow->msgh.msg_controllen = 0; flow->msgh.msg_flags = 0; struct iovec *iov = &flow->iov[0]; iov->iov_base = flow->pkt_buf; iov->iov_len = udp_payload_len + (cur - flow->pkt_buf); OSMO_ASSERT(iov->iov_len <= PKT_BUF_SIZE); /* write some payload */ struct osmo_strbuf sb = { .buf = (void *)cur, .len = udp_payload_len }; OSMO_STRBUF_PRINTF(sb, "osmo-upf-load-gen gtp flood, emitted from %s to %s teid 0x%08x flow %u\n", osmo_sockaddr_to_str_c(OTC_SELECT, &flow->cfg.gtp_local->osa), osmo_sockaddr_to_str_c(OTC_SELECT, &flow->cfg.gtp_remote), flow->cfg.gtp_remote_teid, flow->id); if (flow->cfg.append_payload_info) { if (iov->iov_len < (sizeof(flow->cfg.payload_info) + 1)) { OSMO_ASSERT(PKT_BUF_SIZE > sizeof(flow->cfg.payload_info)); iov->iov_len = sizeof(flow->cfg.payload_info) + 1; } uint8_t *len = ((uint8_t *)iov->iov_base) + iov->iov_len - 1; struct gtp_flood_payload_info *info = (void *)(len - sizeof(flow->cfg.payload_info)); *len = sizeof(flow->cfg.payload_info); *info = flow->cfg.payload_info; memcpy(info->mark, "info", 4); } } static void gtp_flood_flow_init_rxbuf(struct gtp_flood_flow *flow) { flow->msgh.msg_iov = flow->iov; flow->msgh.msg_iovlen = ARRAY_SIZE(flow->iov); flow->msgh.msg_control = NULL; flow->msgh.msg_controllen = 0; flow->msgh.msg_flags = 0; flow->iov[0].iov_base = flow->pkt_buf; flow->iov[0].iov_len = PKT_BUF_SIZE; } struct gtp_flood_worker *gtp_flood_next_tx_worker(struct gtp_flood *gtp_flood) { return llist_round_robin(>p_flood->tx_workers, >p_flood->next_tx_worker, struct gtp_flood_worker, entry); } struct gtp_flood_worker *gtp_flood_next_rx_worker(struct gtp_flood *gtp_flood) { return llist_round_robin(>p_flood->rx_workers, >p_flood->next_rx_worker, struct gtp_flood_worker, entry); } void gtp_flood_worker_add_flow(struct gtp_flood *gtp_flood, struct gtp_flood_worker *worker, const struct gtp_flood_flow_cfg *flow_cfg) { static unsigned int next_flow_id = 0; struct gtp_flood_flow *flow = talloc_zero(gtp_flood, struct gtp_flood_flow); flow->cfg = *flow_cfg; flow->id = next_flow_id++; flow->pkt_buf = talloc_zero_size(flow, PKT_BUF_SIZE); OSMO_ASSERT(flow->pkt_buf); flow->worker = worker; if (flow->cfg.rx == false) gtp_flood_flow_init_payload(flow); else gtp_flood_flow_init_rxbuf(flow); llist_add_tail(&flow->entry, &worker->flows); } void gtp_flood_add_flow(struct gtp_flood *gtp_flood, const struct gtp_flood_flow_cfg *flow_cfg) { struct gtp_flood_worker *w; if (flow_cfg->rx == false) w = gtp_flood_next_tx_worker(gtp_flood); else w = gtp_flood_next_rx_worker(gtp_flood); gtp_flood_worker_add_flow(gtp_flood, w, flow_cfg); } /* transmit one packet for a given flow */ static bool gtp_flow_tx_one(struct gtp_flood_flow *flow) { struct gtp_flood_worker *worker = flow->worker; struct io_uring_sqe *sqe; sqe = io_uring_get_sqe(&worker->ring); if (!sqe) return false; io_uring_prep_sendmsg(sqe, flow->cfg.gtp_local->ofd.fd, &flow->msgh, 0); io_uring_sqe_set_data(sqe, flow); return true; } static void tx_completion(struct gtp_flood_worker *worker, struct io_uring_cqe *cqe, int *tx_flows_ended) { struct gtp_flood_flow *flow; flow = io_uring_cqe_get_data(cqe); if (OSMO_LIKELY(cqe->res >= 0)) { flow->sent_gtp_packets++; worker->ctr.tx_packets++; worker->ctr.tx_bytes += cqe->res; if (flow->cfg.num_packets && flow->sent_gtp_packets >= flow->cfg.num_packets) { flow->stop = true; (*tx_flows_ended)++; } } else { flow->submitted_gtp_packets--; } io_uring_cqe_seen(&worker->ring, cqe); } static void apply_thread_name_cpu_affinity(const struct gtp_flood_worker *worker, const char *thread_name) { if (pthread_setname_np(worker->pthread_id, thread_name) != 0) { char buf[256]; int err = errno; char *err_str = strerror_r(err, buf, sizeof(buf)); LOGP(DLGLOBAL, LOGL_ERROR, "rx worker %u: failed setting thread name: %s\n", worker->id, err_str); } OSMO_ASSERT(osmo_cpu_sched_vty_apply_localthread() == 0); } static void *gtp_flood_tx_worker_thread(void *_worker) { struct gtp_flood_worker *worker = (struct gtp_flood_worker *)_worker; struct gtp_flood *gtp_flood = worker->gtp_flood; char thread_name[128]; void *next_flow = NULL; osmo_ctx_init(__func__); gtp_flood->tx_workers_started++; gtp_flood->tx_workers_running++; LOGP(DLGLOBAL, LOGL_INFO, "gtp flood tx worker %d starting (%u started, %u running)\n", worker->id, gtp_flood->tx_workers_started, gtp_flood->tx_workers_running); int tx_flows_count = llist_count(&worker->flows); int tx_flows_ended = 0; int num_submitted_total = 0; int num_submitted2_total = 0; struct __kernel_timespec ts_zero = {}; struct __kernel_timespec ts_timeout = { .tv_nsec = 0.5e9 }; snprintf(thread_name, sizeof(thread_name), "GtpFloodTx%u", worker->id); apply_thread_name_cpu_affinity(worker, thread_name); while (tx_flows_ended < tx_flows_count) { uint32_t num_submitted = 0; int num_submitted2; if (gtp_flood->cfg.slew_us) usleep(gtp_flood->cfg.slew_us); /* fill up sqe with transmit submissions */ bool keep_submitting = true; while (keep_submitting) { int submitted_was = num_submitted; struct gtp_flood_flow *flow; flow = llist_round_robin(&worker->flows, &next_flow, struct gtp_flood_flow, entry); if (flow->stop) continue; if (flow->cfg.num_packets && flow->submitted_gtp_packets >= flow->cfg.num_packets) continue; if (gtp_flow_tx_one(flow)) { flow->submitted_gtp_packets++; num_submitted++; worker->submitted_packets.count++; } else { /* out of sqe. */ keep_submitting = false; break; } /* No change in number of submitted PDUs, all flows are done submitting for this round. */ if (submitted_was == num_submitted) keep_submitting = false; } /* actually submit */ num_submitted2 = io_uring_submit(&worker->ring); worker->submitted_packets2.count += num_submitted2; /* process all pending completions */ int completed = 0; struct io_uring_cqe *cqe; while (io_uring_wait_cqe_timeout(&worker->ring, &cqe, &ts_zero) == 0) { tx_completion(worker, cqe, &tx_flows_ended); completed++; } #define LOG_TX 1 #if LOG_TX /* periodically log tx stats */ static struct timespec tx_last_info_log = {.tv_sec = 0, .tv_nsec = 0}; struct timespec ts_now; clock_gettime(CLOCK_MONOTONIC, &ts_now); /* the resolution is in seconds, output stats once per second. */ if (OSMO_UNLIKELY(ts_now.tv_sec - tx_last_info_log.tv_sec >= 10)) { struct timespec ts_elapsed, ts_diff; unsigned long long elapsed_usec, diff_usec; uint64_t diff_rx_packets, diff_rx_bytes, diff_tx_packets, diff_tx_bytes; uint64_t elapsed_rx_packets = 0; uint64_t elapsed_rx_bytes = 0; uint64_t elapsed_tx_packets = 0; uint64_t elapsed_tx_bytes = 0; timespecsub(&ts_now, &ts_start, &ts_elapsed); timespecsub(&ts_now, &tx_last_info_log, &ts_diff); tx_last_info_log = ts_now; struct gtp_flood_worker *w_it; llist_for_each_entry(w_it, >p_flood->tx_workers, entry) { elapsed_tx_packets += w_it->ctr.tx_packets; elapsed_tx_bytes += w_it->ctr.tx_bytes; } llist_for_each_entry(w_it, >p_flood->rx_workers, entry) { elapsed_rx_packets += w_it->ctr.rx_packets; elapsed_rx_bytes += w_it->ctr.rx_bytes; } worker->tx.packets.count = elapsed_tx_packets; worker->tx.bytes.count = elapsed_tx_bytes; worker->rx.packets.count = elapsed_rx_packets; worker->rx.bytes.count = elapsed_rx_bytes; diff_tx_packets = counter_get(&worker->tx.packets); diff_tx_bytes = counter_get(&worker->tx.bytes); diff_rx_packets = counter_get(&worker->rx.packets); diff_rx_bytes = counter_get(&worker->rx.bytes); if (diff_tx_packets || diff_tx_bytes) { elapsed_usec = (ts_elapsed.tv_sec * 1000 * 1000) + (ts_elapsed.tv_nsec / 1000); diff_usec = (ts_diff.tv_sec * 1000 * 1000) + (ts_diff.tv_nsec / 1000); if (elapsed_usec == 0) elapsed_usec = 1; if (diff_usec == 0) diff_usec = 1; printf("%u: DIFF: %12llu Tx: %10"PRIu64" packets (%6llu kPPS, %8llu Mbps) | Rx: %10"PRIu64" packets (%6llu kPPS, %8llu Mbps)\n", worker->id, diff_usec, diff_tx_packets, diff_tx_packets * 1000 / diff_usec, diff_tx_bytes * 8 / diff_usec, diff_rx_packets, diff_rx_packets * 1000 / diff_usec, diff_rx_bytes * 8 / diff_usec); printf("%u: TOTAL: %12llu Tx: %10"PRIu64" packets (%6llu kPPS, %8llu Mbps) | Rx: %10"PRIu64" packets (%6llu kPPS, %8llu Mbps)\n", worker->id, elapsed_usec, elapsed_tx_packets, elapsed_tx_packets * 1000 / elapsed_usec, elapsed_tx_bytes * 8 / elapsed_usec, elapsed_rx_packets, elapsed_rx_packets * 1000 / elapsed_usec, elapsed_rx_bytes * 8 / elapsed_usec); if (io_uring_cq_has_overflow(&worker->ring)) printf("%u: CQ IN OVERFLOW STATE!\n", worker->id); unsigned val = *worker->ring.cq.koverflow; if (val > worker->last_ring_cq_koverflow) { printf("%u: CQ OVERFLOW! %u -> %u\n", worker->id, worker->last_ring_cq_koverflow, val); worker->last_ring_cq_koverflow = val; } //fflush(stdout); /* mark that something happened to not enter the wait below */ completed++; } } #endif if (!num_submitted2 && !completed) { /* There are currently no slots available for submitting more packets, wait until the next slot * becomes available. After a timeout, re-check whether the worker should exit. */ if (io_uring_wait_cqe_timeout(&worker->ring, &cqe, &ts_timeout) == 0) { tx_completion(worker, cqe, &tx_flows_ended); completed++; } } num_submitted_total += num_submitted; if (num_submitted2 > 0) num_submitted2_total += num_submitted2; } gtp_flood->tx_workers_running--; LOGP(DLGLOBAL, LOGL_INFO, "gtp flood tx worker %d done (%u started, %u running) (%"PRIu64" packets not submitted)\n", worker->id, gtp_flood->tx_workers_started, gtp_flood->tx_workers_running, worker->submitted_packets.count - worker->tx.packets.count); return NULL; } /* receive one packet for a given flow */ static bool gtp_flow_rx_one(struct gtp_flood_flow *flow) { struct gtp_flood_worker *worker = flow->worker; struct io_uring_sqe *sqe; sqe = io_uring_get_sqe(&worker->ring); if (!sqe) return false; io_uring_prep_recvmsg(sqe, flow->cfg.gtp_local->ofd.fd, &flow->msgh, 0); io_uring_sqe_set_data(sqe, flow); return true; } void rx_completion(struct gtp_flood_worker *worker, struct io_uring_cqe *cqe) { int len = cqe->res; struct gtp_flood_flow *flow; flow = io_uring_cqe_get_data(cqe); /* done reading */ if (OSMO_UNLIKELY(!rx_started)) { rx_started = true; OSMO_ASSERT(clock_gettime(CLOCK_MONOTONIC, &ts_start) == 0); } if (OSMO_LIKELY(len > 0)) { flow->received_udp_packets++; flow->received_udp_bytes += len; worker->ctr.rx_packets++; worker->ctr.rx_bytes += len; } else if (OSMO_UNLIKELY(len < 0)) { LOGP(DLGLOBAL, LOGL_ERROR, "%u: rx error rc=%d flags=0x%x\n", worker->id, len, cqe->flags); } io_uring_cqe_seen(&worker->ring, cqe); /* reschedule */ gtp_flow_rx_one(flow); } static void *gtp_flood_rx_worker_thread(void *_worker) { struct gtp_flood_worker *worker = (struct gtp_flood_worker *)_worker; struct gtp_flood *gtp_flood = worker->gtp_flood; char thread_name[128]; osmo_ctx_init(__func__); gtp_flood->rx_workers_started++; gtp_flood->rx_workers_running++; LOGP(DLGLOBAL, LOGL_INFO, "gtp rx worker %u starting (%u started, %u running)\n", worker->id, gtp_flood->rx_workers_started, gtp_flood->rx_workers_running); struct gtp_flood_flow *flow; void *next_flow = NULL; struct __kernel_timespec ts_zero = {}; struct __kernel_timespec ts_1s = { .tv_sec = 1 }; snprintf(thread_name, sizeof(thread_name), "GtpFloodRx%u", worker->id); apply_thread_name_cpu_affinity(worker, thread_name); /* submit all rx flows N times until the queue is full */ do { flow = llist_round_robin(&worker->flows, &next_flow, struct gtp_flood_flow, entry); } while (gtp_flow_rx_one(flow)); /* service completions and resubmit sqe */ while (1) { struct io_uring_cqe *cqe; int submitted; int completed = 0; /* submit batch of pending reads */ submitted = io_uring_submit(&worker->ring); /* process all pending completions */ while (io_uring_wait_cqe_timeout(&worker->ring, &cqe, &ts_zero) == 0) { rx_completion(worker, cqe); completed++; } #define LOG_RX 0 #if LOG_RX /* periodically log rx stats */ static struct timespec last_info_log = {.tv_sec = 0, .tv_nsec = 0}; struct timespec ts_now; clock_gettime(CLOCK_MONOTONIC, &ts_now); /* the resolution is in seconds, output stats once per second. */ if (OSMO_UNLIKELY(ts_now.tv_sec - tx_last_info_log.tv_sec >= 10)) { struct timespec ts_elapsed, ts_diff; unsigned long long elapsed_usec, diff_usec; uint64_t diff_rx_packets, diff_rx_bytes, diff_tx_packets, diff_tx_bytes; uint64_t elapsed_rx_packets = 0; uint64_t elapsed_rx_bytes = 0; uint64_t elapsed_tx_packets = 0; uint64_t elapsed_tx_bytes = 0; timespecsub(&ts_now, &ts_start, &ts_elapsed); timespecsub(&ts_now, &last_info_log, &ts_diff); last_info_log = ts_now; struct gtp_flood_worker *w_it; llist_for_each_entry(w_it, >p_flood->tx_workers, entry) { elapsed_tx_packets += w_it->ctr.tx_packets; elapsed_tx_bytes += w_it->ctr.tx_bytes; } llist_for_each_entry(w_it, >p_flood->rx_workers, entry) { elapsed_rx_packets += w_it->ctr.rx_packets; elapsed_rx_bytes += w_it->ctr.rx_bytes; } worker->tx.packets.count = elapsed_tx_packets; worker->tx.bytes.count = elapsed_tx_bytes; worker->rx.packets.count = elapsed_rx_packets; worker->rx.bytes.count = elapsed_rx_bytes; diff_tx_packets = counter_get(&worker->tx.packets); diff_tx_bytes = counter_get(&worker->tx.bytes); diff_rx_packets = counter_get(&worker->rx.packets); diff_rx_bytes = counter_get(&worker->rx.bytes); if (diff_rx_packets || diff_tx_packets) { elapsed_usec = (ts_elapsed.tv_sec * 1000 * 1000) + (ts_elapsed.tv_nsec / 1000); diff_usec = (ts_diff.tv_sec * 1000 * 1000) + (ts_diff.tv_nsec / 1000); if (elapsed_usec == 0) elapsed_usec = 1; if (diff_usec == 0) diff_usec = 1; printf("%u: DIFF: %12llu Tx: %10"PRIu64" packets (%6llu kPPS, %8llu Mbps) | Rx: %10"PRIu64" packets (%6llu kPPS, %8llu Mbps)\n", worker->id, diff_usec, diff_tx_packets, diff_tx_packets * 1000 / diff_usec, diff_tx_bytes * 8 / diff_usec, diff_rx_packets, diff_rx_packets * 1000 / diff_usec, diff_rx_bytes * 8 / diff_usec); printf("%u: TOTAL: %12llu Tx: %10"PRIu64" packets (%6llu kPPS, %8llu Mbps) | Rx: %10"PRIu64" packets (%6llu kPPS, %8llu Mbps)\n", worker->id, elapsed_usec, elapsed_tx_packets, elapsed_tx_packets * 1000 / elapsed_usec, elapsed_tx_bytes * 8 / elapsed_usec, elapsed_rx_packets, elapsed_rx_packets * 1000 / elapsed_usec, elapsed_rx_bytes * 8 / elapsed_usec); if (io_uring_cq_has_overflow(&worker->ring)) printf("%u: CQ IN OVERFLOW STATE!\n", worker->id); unsigned val = *worker->ring.cq.koverflow; if (val > worker->last_ring_cq_koverflow) { printf("%u: CQ OVERFLOW! %u -> %u\n", worker->id, worker->last_ring_cq_koverflow, val); worker->last_ring_cq_koverflow = val; } //fflush(stdout); /* mark that something happened to not enter the wait below */ completed++; } } #endif /* Nothing happened in this loop, wait for the next event */ if (!submitted && !completed) { if (io_uring_wait_cqe_timeout(&worker->ring, &cqe, &ts_1s) == 0) { rx_completion(worker, cqe); completed++; } } } gtp_flood->rx_workers_running--; return NULL; } static void gtp_flood_worker_start(struct gtp_flood_worker *worker, void *(*worker_func)(void *)) { int rc; rc = io_uring_queue_init(worker->gtp_flood->cfg.queue_size, &worker->ring, 0); OSMO_ASSERT(rc >= 0); unsigned int vals[2] = {1, 1}; rc = io_uring_register_iowq_max_workers(&worker->ring, &vals[0]); OSMO_ASSERT(rc == 0); rc = pthread_create(&worker->pthread_id, NULL, worker_func, worker); OSMO_ASSERT(rc >= 0); } void gtp_flood_start(struct gtp_flood *gtp_flood) { struct gtp_flood_worker *w; llist_for_each_entry(w, >p_flood->tx_workers, entry) gtp_flood_worker_start(w, gtp_flood_tx_worker_thread); llist_for_each_entry(w, >p_flood->rx_workers, entry) gtp_flood_worker_start(w, gtp_flood_rx_worker_thread); } bool gtp_flood_is_busy(struct gtp_flood *gtp_flood) { if (!gtp_flood) return false; return gtp_flood->tx_workers_started && gtp_flood->tx_workers_running; } #else /* HAVE_URING */ struct gtp_flood *gtp_flood_alloc(void *ctx, unsigned int workers) { LOGP(DLGLOBAL, LOGL_ERROR, "Cannot start GTP flood: built without liburing support\n"); return NULL; } void gtp_flood_add_flow(struct gtp_flood *gtp_flood, const struct gtp_flood_flow_cfg *flow_cfg) { } void gtp_flood_start(struct gtp_flood *gtp_flood) { } bool gtp_flood_is_busy(struct gtp_flood *gtp_flood) { return false; } #endif /* HAVE_URING */