/* (C) 2011 by Pablo Neira Ayuso * (C) 2015-2016 by Harald Welte * (C) 2023 by sysmocom - s.f.m.c. GmbH * All Rights Reserved. * * SPDX-License-Identifier: GPL-2.0+ * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "config.h" #include /*! \file stream_cli.c */ #define LOGSCLI(cli, level, fmt, args...) \ LOGP(DLINP, level, "CLICONN(%s,%s){%s} " fmt, \ cli->name ? : "", \ cli->sockname, \ get_value_string(stream_cli_state_names, (cli)->state), \ ## args) /* * Client side. */ enum osmo_stream_cli_state { STREAM_CLI_STATE_CLOSED, /* No fd associated, no timer active */ STREAM_CLI_STATE_WAIT_RECONNECT, /* No fd associated, has timer active to try to connect again */ STREAM_CLI_STATE_CONNECTING, /* Fd associated, but connection not yet confirmed by peer or lower layers */ STREAM_CLI_STATE_CONNECTED, /* Fd associated and connection is established */ STREAM_CLI_STATE_MAX }; static const struct value_string stream_cli_state_names[] = { { STREAM_CLI_STATE_CLOSED, "CLOSED" }, { STREAM_CLI_STATE_WAIT_RECONNECT, "WAIT_RECONNECT" }, { STREAM_CLI_STATE_CONNECTING, "CONNECTING" }, { STREAM_CLI_STATE_CONNECTED, "CONNECTED" }, { 0, NULL } }; #define OSMO_STREAM_CLI_F_RECONF (1 << 0) #define OSMO_STREAM_CLI_F_NODELAY (1 << 1) /* Mark whether the object is currently in a user callback. */ #define IN_CB_MASK_CONNECT_CB (1 << 0) #define IN_CB_MASK_DISCONNECT_CB (1 << 1) #define IN_CB_MASK_READ_CB (1 << 2) struct osmo_stream_cli { char *name; char sockname[OSMO_SOCK_NAME_MAXLEN]; enum osmo_stream_mode mode; union { struct osmo_fd ofd; struct osmo_io_fd *iofd; }; struct llist_head tx_queue; /* osmo_ofd mode (only): Queue of msgbs */ unsigned int tx_queue_count; /* osmo_ofd mode (only): Current amount of msgbs queued */ unsigned int tx_queue_max_length; /* Max amount of msgbs which can be enqueued */ struct osmo_timer_list timer; enum osmo_stream_cli_state state; char *addr[OSMO_STREAM_MAX_ADDRS]; uint8_t addrcnt; uint16_t port; char *local_addr[OSMO_STREAM_MAX_ADDRS]; uint8_t local_addrcnt; uint16_t local_port; int sk_domain; int sk_type; int sk_prio; /* socket priority, SO_PRIORITY, default=0=unset */ uint16_t proto; uint8_t ip_dscp; /* IP Differentiated services, 0..63, default=0=unset */ osmo_stream_cli_connect_cb_t connect_cb; osmo_stream_cli_disconnect_cb_t disconnect_cb; osmo_stream_cli_read_cb_t read_cb; osmo_stream_cli_read_cb2_t iofd_read_cb; osmo_stream_cli_segmentation_cb_t segmentation_cb; osmo_stream_cli_segmentation_cb2_t segmentation_cb2; void *data; int flags; int reconnect_timeout; struct osmo_sock_init2_multiaddr_pars ma_pars; uint8_t in_cb_mask; /* IN_CB_MASK_* */ bool delay_free; }; /*! \addtogroup stream_cli * @{ */ /* return true if freed */ static inline bool free_delayed_if_needed(struct osmo_stream_cli *cli) { /* Nobody requested delayed free, skip */ if (!cli->delay_free) return false; /* Check for other callbacks active in case we were e.g. in: * read_cb() -> [user] -> osmo_steam_client_close() -> disconnect_cb() --> [user] --> osmo_stream_client_destroy() * or: * connect_cb() -> [user] -> osmo_steam_client_close() -> disconnect_cb() --> [user] --> osmo_stream_client_destroy() */ if (cli->in_cb_mask != 0) return false; LOGSCLI(cli, LOGL_DEBUG, "free(delayed)\n"); talloc_free(cli); return true; } static void stream_cli_close_iofd(struct osmo_stream_cli *cli) { if (!cli->iofd) return; osmo_iofd_free(cli->iofd); cli->iofd = NULL; } static void stream_cli_close_ofd(struct osmo_stream_cli *cli) { if (cli->ofd.fd == -1) return; osmo_fd_unregister(&cli->ofd); close(cli->ofd.fd); cli->ofd.fd = -1; } /*! Close an Osmocom Stream Client. * \param[in] cli Osmocom Stream Client to be closed * \return true if stream was freed due to disconnect_cb, false otherwise * We unregister the socket fd from the osmocom select() loop * abstraction and close the socket */ static bool stream_cli_close(struct osmo_stream_cli *cli) { int old_state = cli->state; LOGSCLI(cli, LOGL_DEBUG, "close()\n"); /* This guards against reentrant close through disconnect_cb(): */ if (cli->state == STREAM_CLI_STATE_CLOSED) return false; if (cli->state == STREAM_CLI_STATE_WAIT_RECONNECT) { osmo_timer_del(&cli->timer); cli->state = STREAM_CLI_STATE_CLOSED; return false; } switch (cli->mode) { case OSMO_STREAM_MODE_OSMO_FD: stream_cli_close_ofd(cli); break; case OSMO_STREAM_MODE_OSMO_IO: stream_cli_close_iofd(cli); break; default: OSMO_ASSERT(false); } cli->state = STREAM_CLI_STATE_CLOSED; /* If conn was established, notify the disconnection to the user: * Also, if reconnect is disabled by user, notify the user that connect() failed: */ if (old_state == STREAM_CLI_STATE_CONNECTED || (old_state == STREAM_CLI_STATE_CONNECTING && cli->reconnect_timeout < 0)) { OSMO_ASSERT(!(cli->in_cb_mask & IN_CB_MASK_DISCONNECT_CB)); cli->in_cb_mask |= IN_CB_MASK_DISCONNECT_CB; if (cli->disconnect_cb) cli->disconnect_cb(cli); cli->in_cb_mask &= ~IN_CB_MASK_DISCONNECT_CB; return free_delayed_if_needed(cli); } return false; } /*! Close an Osmocom Stream Client. * \param[in] cli Osmocom Stream Client to be closed * We unregister the socket fd from the osmocom select() loop * abstraction and close the socket */ void osmo_stream_cli_close(struct osmo_stream_cli *cli) { (void)stream_cli_close(cli); } /*! Re-connect an Osmocom Stream Client. * If re-connection is enabled for this client * (which is the case unless negative timeout was explicitly set via osmo_stream_cli_set_reconnect_timeout() call), * we close any existing connection (if any) and schedule a re-connect timer */ static bool stream_cli_reconnect(struct osmo_stream_cli *cli) { bool freed = stream_cli_close(cli); if (freed) return true; if (cli->reconnect_timeout < 0) { LOGSCLI(cli, LOGL_INFO, "not reconnecting, disabled\n"); return false; } cli->state = STREAM_CLI_STATE_WAIT_RECONNECT; LOGSCLI(cli, LOGL_INFO, "retrying reconnect in %d seconds...\n", cli->reconnect_timeout); osmo_timer_schedule(&cli->timer, cli->reconnect_timeout, 0); return false; } /*! Re-connect an Osmocom Stream Client. * If re-connection is enabled for this client * (which is the case unless negative timeout was explicitly set via osmo_stream_cli_set_reconnect_timeout() call), * we close any existing connection (if any) and schedule a re-connect timer */ void osmo_stream_cli_reconnect(struct osmo_stream_cli *cli) { (void)stream_cli_reconnect(cli); } /*! Check if Osmocom Stream Client is in connected state. * \param[in] cli Osmocom Stream Client * \return true if connected, false otherwise */ bool osmo_stream_cli_is_connected(struct osmo_stream_cli *cli) { return cli->state == STREAM_CLI_STATE_CONNECTED; } /*! Check if Osmocom Stream Client is opened (has an FD available) according to * its current state. * \param[in] cli Osmocom Stream Client * \return true if fd is available (osmo_stream_cli_get_fd()), false otherwise */ static bool stream_cli_is_opened(const struct osmo_stream_cli *cli) { return cli->state == STREAM_CLI_STATE_CONNECTING || cli->state == STREAM_CLI_STATE_CONNECTED; } /*! Retrieve file descriptor of the stream client socket. * \param[in] cli Stream Client of which we want to obtain the file descriptor * \returns File descriptor or negative in case of error */ int osmo_stream_cli_get_fd(const struct osmo_stream_cli *cli) { switch (cli->mode) { case OSMO_STREAM_MODE_OSMO_FD: return cli->ofd.fd; case OSMO_STREAM_MODE_OSMO_IO: if (cli->iofd) return osmo_iofd_get_fd(cli->iofd); default: break; } return -EINVAL; } /*! Retrieve osmo_io descriptor of the stream client socket. * This function must not be called on a stream client in legacy osmo_fd mode! * The iofd is only valid once/after osmo_stream_cli_open() has successfully returned. * \param[in] cli Stream Client of which we want to obtain the file descriptor * \returns osmo_io_fd of stream client, or NULL if stream not yet opened. */ struct osmo_io_fd * osmo_stream_cli_get_iofd(const struct osmo_stream_cli *cli) { OSMO_ASSERT(cli->mode == OSMO_STREAM_MODE_OSMO_IO); return cli->iofd; } /* Return true if read_cb caused a delayed_free, hence cli not available anymore. */ static bool stream_cli_read(struct osmo_stream_cli *cli) { LOGSCLI(cli, LOGL_DEBUG, "message received\n"); cli->in_cb_mask |= IN_CB_MASK_READ_CB; if (cli->read_cb) cli->read_cb(cli); cli->in_cb_mask &= ~IN_CB_MASK_READ_CB; return free_delayed_if_needed(cli); } static int stream_cli_write(struct osmo_stream_cli *cli) { #ifdef HAVE_LIBSCTP struct sctp_sndrcvinfo sinfo; #endif struct msgb *msg; int ret; msg = msgb_dequeue_count(&cli->tx_queue, &cli->tx_queue_count); if (!msg) { /* done, tx_queue empty */ osmo_fd_write_disable(&cli->ofd); return 0; } if (!osmo_stream_cli_is_connected(cli)) { LOGSCLI(cli, LOGL_ERROR, "send: not connected, dropping data!\n"); return 0; } LOGSCLI(cli, LOGL_DEBUG, "sending %u bytes of data\n", msgb_length(msg)); switch (cli->sk_domain) { case AF_UNIX: ret = send(cli->ofd.fd, msgb_data(msg), msgb_length(msg), 0); break; case AF_UNSPEC: case AF_INET: case AF_INET6: switch (cli->proto) { #ifdef HAVE_LIBSCTP case IPPROTO_SCTP: memset(&sinfo, 0, sizeof(sinfo)); sinfo.sinfo_ppid = htonl(msgb_sctp_ppid(msg)); sinfo.sinfo_stream = msgb_sctp_stream(msg); ret = sctp_send(cli->ofd.fd, msgb_data(msg), msgb_length(msg), &sinfo, MSG_NOSIGNAL); break; #endif case IPPROTO_TCP: default: ret = send(cli->ofd.fd, msgb_data(msg), msgb_length(msg), 0); break; } break; default: ret = -ENOTSUP; } if (ret >= 0 && ret < msgb_length(msg)) { LOGSCLI(cli, LOGL_ERROR, "short send: %d < exp %u\n", ret, msgb_length(msg)); /* Update msgb and re-add it at the start of the queue: */ msgb_pull(msg, ret); llist_add(&msg->list, &cli->tx_queue); cli->tx_queue_count++; return 0; } if (ret < 0) { int err = errno; LOGSCLI(cli, LOGL_ERROR, "send(len=%u) error: %s\n", msgb_length(msg), strerror(err)); if (err == EAGAIN) { /* Re-add at the start of the queue to re-attempt: */ llist_add(&msg->list, &cli->tx_queue); cli->tx_queue_count++; return 0; } msgb_free(msg); (void)stream_cli_reconnect(cli); return 0; } msgb_free(msg); if (llist_empty(&cli->tx_queue)) osmo_fd_write_disable(&cli->ofd); return 0; } static int _setsockopt_nosigpipe(struct osmo_stream_cli *cli) { #ifdef SO_NOSIGPIPE int ret; int val = 1; ret = setsockopt(osmo_stream_cli_get_fd(cli), SOL_SOCKET, SO_NOSIGPIPE, (void *)&val, sizeof(val)); if (ret < 0) LOGSCLI(cli, LOGL_ERROR, "Failed setting SO_NOSIGPIPE: %s\n", strerror(errno)); return ret; #else return 0; #endif } /* returns true if cli is freed */ static bool stream_cli_handle_connecting(struct osmo_stream_cli *cli, int res) { int error, ret = res; socklen_t len = sizeof(error); int fd = osmo_stream_cli_get_fd(cli); OSMO_ASSERT(fd >= 0); if (ret < 0) { LOGSCLI(cli, LOGL_ERROR, "connect failed (%d)\n", res); return stream_cli_reconnect(cli); } ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len); if (ret >= 0 && error > 0) { LOGSCLI(cli, LOGL_ERROR, "connect so_error (%d)\n", error); return stream_cli_reconnect(cli); } /* If messages got enqueued while 'connecting', keep WRITE flag up to dispatch them upon next main loop step */ if (cli->mode == OSMO_STREAM_MODE_OSMO_FD && llist_empty(&cli->tx_queue)) osmo_fd_write_disable(&cli->ofd); /* Update sockname based on socket info: */ osmo_sock_get_name_buf(cli->sockname, sizeof(cli->sockname), osmo_stream_cli_get_fd(cli)); LOGSCLI(cli, LOGL_INFO, "connection established\n"); cli->state = STREAM_CLI_STATE_CONNECTED; switch (cli->sk_domain) { case AF_UNIX: _setsockopt_nosigpipe(cli); break; case AF_UNSPEC: case AF_INET: case AF_INET6: if (cli->proto == IPPROTO_SCTP) { _setsockopt_nosigpipe(cli); stream_sctp_sock_activate_events(fd); } break; default: break; } cli->in_cb_mask |= IN_CB_MASK_CONNECT_CB; if (cli->connect_cb) cli->connect_cb(cli); cli->in_cb_mask &= ~IN_CB_MASK_CONNECT_CB; return free_delayed_if_needed(cli); } static int osmo_stream_cli_fd_cb(struct osmo_fd *ofd, unsigned int what) { struct osmo_stream_cli *cli = ofd->data; switch (cli->state) { case STREAM_CLI_STATE_CONNECTING: (void)stream_cli_handle_connecting(cli, 0); break; case STREAM_CLI_STATE_CONNECTED: if (what & OSMO_FD_READ) { LOGSCLI(cli, LOGL_DEBUG, "connected read\n"); if (stream_cli_read(cli) == true) break; /* cli (and hence ofd) freed, done. */ } if (what & OSMO_FD_WRITE) { LOGSCLI(cli, LOGL_DEBUG, "connected write\n"); stream_cli_write(cli); } break; default: /* Only CONNECTING and CONNECTED states are expected, since they are the only states * where FD exists: */ osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state); } return 0; } static void cli_timer_cb(void *data); /*! Create an Osmocom stream client. * \param[in] ctx talloc context from which to allocate memory * This function allocates a new \ref osmo_stream_cli and initializes * it with default values (5s reconnect timer, TCP protocol) * \return allocated stream client, or NULL in case of error */ struct osmo_stream_cli *osmo_stream_cli_create(void *ctx) { struct osmo_stream_cli *cli; cli = talloc_zero(ctx, struct osmo_stream_cli); if (!cli) return NULL; cli->mode = OSMO_STREAM_MODE_UNKNOWN; cli->sk_domain = AF_UNSPEC; cli->sk_type = SOCK_STREAM; cli->proto = IPPROTO_TCP; cli->state = STREAM_CLI_STATE_CLOSED; osmo_timer_setup(&cli->timer, cli_timer_cb, cli); cli->reconnect_timeout = 5; /* default is 5 seconds. */ INIT_LLIST_HEAD(&cli->tx_queue); cli->tx_queue_max_length = 1024; /* Default tx queue size, msgbs. */ cli->ma_pars.sctp.version = 0; return cli; } static void stream_cli_iofd_read_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) { struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd); bool freed; switch (cli->state) { case STREAM_CLI_STATE_CONNECTING: freed = stream_cli_handle_connecting(cli, res); if (freed) return; /* msg was also freed as part of the talloc tree. */ if (cli->state != STREAM_CLI_STATE_CONNECTED) { msgb_free(msg); return; } /* Follow below common path submitting read_cb(msg) to user. */ break; case STREAM_CLI_STATE_CONNECTED: switch (res) { case -EPIPE: case -ECONNRESET: LOGSCLI(cli, LOGL_ERROR, "lost connection with srv (%d)\n", res); freed = stream_cli_reconnect(cli); break; case 0: LOGSCLI(cli, LOGL_NOTICE, "connection closed with srv\n"); freed = stream_cli_reconnect(cli); break; default: LOGSCLI(cli, LOGL_DEBUG, "received %d bytes from srv\n", res); freed = false; break; } if (freed) return; /* msg was also freed as part of the talloc tree. */ /* Follow below common path submitting read_cb(msg) to user. */ break; default: osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state); } /* Notify user of new data or error: */ if (!cli->iofd_read_cb) { msgb_free(msg); return; } cli->in_cb_mask |= IN_CB_MASK_READ_CB; cli->iofd_read_cb(cli, res, msg); cli->in_cb_mask &= ~IN_CB_MASK_READ_CB; OSMO_ASSERT(cli->in_cb_mask == 0); (void)free_delayed_if_needed(cli); } static void stream_cli_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg) { struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd); /* msgb is not owned by us here, no need to free it. */ switch (cli->state) { case STREAM_CLI_STATE_CONNECTING: (void)stream_cli_handle_connecting(cli, res); break; case STREAM_CLI_STATE_CONNECTED: if (msg && res <= 0) { LOGSCLI(cli, LOGL_ERROR, "received error %d in response to send\n", res); (void)stream_cli_reconnect(cli); } /* res=0 && msgb=NULL: "connected notify", but we already received before a read_cb * which moved us to CONNECTED state. Do nothing. */ break; default: osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state); } } static const struct osmo_io_ops osmo_stream_cli_ioops = { .read_cb = stream_cli_iofd_read_cb, .write_cb = stream_cli_iofd_write_cb, }; #ifdef HAVE_LIBSCTP static void stream_cli_iofd_recvmsg_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct msghdr *msgh) { struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd); bool freed; res = stream_iofd_sctp_recvmsg_trailer(iofd, msg, res, msgh); switch (cli->state) { case STREAM_CLI_STATE_CONNECTING: freed = stream_cli_handle_connecting(cli, res); if (freed) return; /* msg was also freed as part of the talloc tree. */ if (cli->state != STREAM_CLI_STATE_CONNECTED) { msgb_free(msg); return; } /* Follow below common path submitting read_cb(msg) to user. */ break; case STREAM_CLI_STATE_CONNECTED: switch (res) { case -EPIPE: case -ECONNRESET: LOGSCLI(cli, LOGL_ERROR, "lost connection with srv (%d)\n", res); freed = stream_cli_reconnect(cli); break; case 0: LOGSCLI(cli, LOGL_NOTICE, "connection closed with srv\n"); freed = stream_cli_reconnect(cli); break; default: freed = false; break; } if (freed) return; /* msg was also freed as part of the talloc tree. */ /* Follow below common path submitting read_cb(msg) to user. */ break; default: osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state); } /* Notify user of new data or error: */ if (!cli->iofd_read_cb) { msgb_free(msg); return; } cli->in_cb_mask |= IN_CB_MASK_READ_CB; cli->iofd_read_cb(cli, res, msg); cli->in_cb_mask &= ~IN_CB_MASK_READ_CB; OSMO_ASSERT(cli->in_cb_mask == 0); (void)free_delayed_if_needed(cli); } static const struct osmo_io_ops osmo_stream_cli_ioops_sctp = { .recvmsg_cb = stream_cli_iofd_recvmsg_cb, .sendmsg_cb = stream_cli_iofd_write_cb, }; #endif /*! Set a name on the cli object (used during logging). * \param[in] cli stream_cli whose name is to be set * \param[in] name the name to be set on cli */ void osmo_stream_cli_set_name(struct osmo_stream_cli *cli, const char *name) { osmo_talloc_replace_string(cli, &cli->name, name); if (cli->mode == OSMO_STREAM_MODE_OSMO_IO && cli->iofd) osmo_iofd_set_name(cli->iofd, name); } /*! Retrieve name previously set on the cli object (see osmo_stream_cli_set_name()). * \param[in] cli stream_cli whose name is to be retrieved * \returns The name to be set on cli; NULL if never set */ const char *osmo_stream_cli_get_name(const struct osmo_stream_cli *cli) { return cli->name; } /*! Set the remote address to which we connect. * Any changes to this setting will only become active upon next (re)connect. * \param[in] cli Stream Client to modify * \param[in] addr Remote IP address */ void osmo_stream_cli_set_addr(struct osmo_stream_cli *cli, const char *addr) { osmo_stream_cli_set_addrs(cli, &addr, 1); } /*! Set the remote address set to which we connect. * Useful for protocols allowing connecting to more than one address (such as SCTP) * Any changes to this setting will only become active upon next (re)connect. * \param[in] cli Stream Client to modify * \param[in] addr Remote IP address set * \return negative on error, 0 on success */ int osmo_stream_cli_set_addrs(struct osmo_stream_cli *cli, const char **addr, size_t addrcnt) { int i = 0; if (addrcnt > OSMO_STREAM_MAX_ADDRS) return -EINVAL; for (; i < addrcnt; i++) osmo_talloc_replace_string(cli, &cli->addr[i], addr[i]); for (; i < cli->addrcnt; i++) { talloc_free(cli->addr[i]); cli->addr[i] = NULL; } cli->addrcnt = addrcnt; cli->flags |= OSMO_STREAM_CLI_F_RECONF; return 0; } /*! Set the remote port number to which we connect. * Any changes to this setting will only become active upon next (re)connect. * \param[in] cli Stream Client to modify * \param[in] port Remote port number */ void osmo_stream_cli_set_port(struct osmo_stream_cli *cli, uint16_t port) { cli->port = port; cli->flags |= OSMO_STREAM_CLI_F_RECONF; } /*! Set the local port number for the socket (to be bound to). * Any changes to this setting will only become active upon next (re)connect. * \param[in] cli Stream Client to modify * \param[in] port Local port number */ void osmo_stream_cli_set_local_port(struct osmo_stream_cli *cli, uint16_t port) { cli->local_port = port; cli->flags |= OSMO_STREAM_CLI_F_RECONF; } /*! Set the local address for the socket (to be bound to). * Any changes to this setting will only become active upon next (re)connect. * \param[in] cli Stream Client to modify * \param[in] port Local host name */ void osmo_stream_cli_set_local_addr(struct osmo_stream_cli *cli, const char *addr) { osmo_stream_cli_set_local_addrs(cli, &addr, 1); } /*! Set the local address set to which we bind. * Useful for protocols allowing bind to more than one address (such as SCTP) * Any changes to this setting will only become active upon next (re)connect. * \param[in] cli Stream Client to modify * \param[in] addr Local IP address set * \return negative on error, 0 on success */ int osmo_stream_cli_set_local_addrs(struct osmo_stream_cli *cli, const char **addr, size_t addrcnt) { int i = 0; if (addrcnt > OSMO_STREAM_MAX_ADDRS) return -EINVAL; for (; i < addrcnt; i++) osmo_talloc_replace_string(cli, &cli->local_addr[i], addr[i]); for (; i < cli->local_addrcnt; i++) { talloc_free(cli->local_addr[i]); cli->local_addr[i] = NULL; } cli->local_addrcnt = addrcnt; cli->flags |= OSMO_STREAM_CLI_F_RECONF; return 0; } /*! Set the protocol for the stream client socket. * Any changes to this setting will only become active upon next (re)connect. * \param[in] cli Stream Client to modify * \param[in] proto Protocol (like IPPROTO_TCP (default), IPPROTO_SCTP, ...) */ void osmo_stream_cli_set_proto(struct osmo_stream_cli *cli, uint16_t proto) { cli->proto = proto; cli->flags |= OSMO_STREAM_CLI_F_RECONF; } /* Callback from iofd, forward to stream_cli user: */ static int stream_cli_iofd_segmentation_cb2(struct osmo_io_fd *iofd, struct msgb *msg) { struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd); if (cli->segmentation_cb2) return cli->segmentation_cb2(cli, msg); if (cli->segmentation_cb) return cli->segmentation_cb(msg); OSMO_ASSERT(0); return 0; } /* Configure client side segmentation for the iofd */ static void configure_cli_segmentation_cb(struct osmo_stream_cli *cli) { /* Copy default settings */ struct osmo_io_ops client_ops; osmo_iofd_get_ioops(cli->iofd, &client_ops); /* Set segmentation cb for this client */ if (cli->segmentation_cb || cli->segmentation_cb2) client_ops.segmentation_cb2 = stream_cli_iofd_segmentation_cb2; else client_ops.segmentation_cb2 = NULL; osmo_iofd_set_ioops(cli->iofd, &client_ops); } /*! Set the segmentation callback for the client. * \param[in,out] cli Stream Client to modify * \param[in] segmentation_cb Target segmentation callback * * A segmentation call-back can optionally be used when a packet based protocol * (like TCP) is used within a STREAM style socket that does not preserve * message boundaries within the stream. If a segmentation call-back is given, * the osmo_stream_srv library code will makes sure that the read_cb called * only for complete single messages, and not arbitrary segments of the stream. */ void osmo_stream_cli_set_segmentation_cb(struct osmo_stream_cli *cli, osmo_stream_cli_segmentation_cb_t segmentation_cb) { cli->segmentation_cb = segmentation_cb; cli->segmentation_cb2 = NULL; if (cli->iofd) /* Otherwise, this will be done in osmo_stream_cli_open() */ configure_cli_segmentation_cb(cli); } /*! Set the segmentation callback for the client. * \param[in,out] cli Stream Client to modify * \param[in] segmentation_cb2 Target segmentation callback * * Same as osmo_stream_cli_set_segmentation_cb(), but a * osmo_stream_cli_segmentation_cb2_t is called instead which allows access to * the related cli object. */ void osmo_stream_cli_set_segmentation_cb2(struct osmo_stream_cli *cli, osmo_stream_cli_segmentation_cb2_t segmentation_cb2) { cli->segmentation_cb = NULL; cli->segmentation_cb2 = segmentation_cb2; if (cli->iofd) /* Otherwise, this will be done in osmo_stream_cli_open() */ configure_cli_segmentation_cb(cli); } /*! Set the socket type for the stream server link. * Any changes to this setting will only become active upon next (re)connect. * \param[in] cli Stream Client to modify * \param[in] type Socket Type (like SOCK_STREAM (default), SOCK_SEQPACKET, ...) * \returns zero on success, negative -errno on error. */ int osmo_stream_cli_set_type(struct osmo_stream_cli *cli, int type) { switch (type) { case SOCK_STREAM: case SOCK_SEQPACKET: break; default: return -ENOTSUP; } cli->sk_type = type; cli->flags |= OSMO_STREAM_CLI_F_RECONF; return 0; } /*! Set the socket type for the stream server link. * Any changes to this setting will only become active upon next (re)connect. * \param[in] cli Stream Client to modify * \param[in] type Socket Domain (like AF_UNSPEC (default for IP), AF_UNIX, AF_INET, ...) * \returns zero on success, negative -errno on error. */ int osmo_stream_cli_set_domain(struct osmo_stream_cli *cli, int domain) { switch (domain) { case AF_UNSPEC: case AF_INET: case AF_INET6: case AF_UNIX: break; default: return -ENOTSUP; } cli->sk_domain = domain; cli->flags |= OSMO_STREAM_CLI_F_RECONF; return 0; } /*! Set the reconnect time of the stream client socket. * \param[in] cli Stream Client to modify * \param[in] timeout Re-connect timeout in seconds or negative value to disable auto-reconnection */ void osmo_stream_cli_set_reconnect_timeout(struct osmo_stream_cli *cli, int timeout) { cli->reconnect_timeout = timeout; } /*! Set application private data of the stream client socket. * \param[in] cli Stream Client to modify * \param[in] data User-specific data (available in call-back functions) */ void osmo_stream_cli_set_data(struct osmo_stream_cli *cli, void *data) { cli->data = data; } /*! Retrieve application private data of the stream client socket. * \param[in] cli Stream Client to modify * \returns Application private data, as set by \ref osmo_stream_cli_set_data() */ void *osmo_stream_cli_get_data(struct osmo_stream_cli *cli) { return cli->data; } /*! Set the maximum length queue of the stream client. * \param[in] cli Stream Client to modify * \param[in] size maximum amount of msgbs which can be queued in the internal tx queue. * \returns 0 on success, negative on error. * * The maximum length queue default value is 1024 msgbs. */ int osmo_stream_cli_set_tx_queue_max_length(struct osmo_stream_cli *cli, unsigned int size) { cli->tx_queue_max_length = size; if (cli->iofd) /* Otherwise, this will be done in osmo_stream_cli_open() */ osmo_iofd_set_txqueue_max_length(cli->iofd, cli->tx_queue_max_length); /* XXX: Here, in OSMO_STREAM_MODE_OSMO_FD mode we could check current * size of cli->tx_queue and shrink it from the front or back... */ return 0; } /*! Retrieve the stream client socket description. * Calling this function will build a string that describes the socket in terms of its local/remote * address/port. The returned name is stored in a static buffer; it is hence not re-entrant or thread-safe. * \param[in] cli Stream Client to examine * \returns Socket description or NULL in case of error */ char *osmo_stream_cli_get_sockname(const struct osmo_stream_cli *cli) { static char buf[OSMO_STREAM_MAX_ADDRS * OSMO_SOCK_NAME_MAXLEN]; osmo_sock_multiaddr_get_name_buf(buf, sizeof(buf), osmo_stream_cli_get_fd(cli), cli->proto); return buf; } /*! Retrieve Osmocom File Descriptor of the stream client socket. * This function only works in case you operate osmo_stream_cli in osmo_fd mode! * \param[in] cli Stream Client to modify * \returns Pointer to \ref osmo_fd */ struct osmo_fd * osmo_stream_cli_get_ofd(struct osmo_stream_cli *cli) { OSMO_ASSERT(cli->mode == OSMO_STREAM_MODE_OSMO_FD); return &cli->ofd; } /*! Set the call-back function called on connect of the stream client socket. * The call-back function registered via this function will be called upon completion of the non-blocking * outbound connect operation. * \param[in] cli Stream Client to modify * \param[in] connect_cb Call-back function to be called upon connect */ void osmo_stream_cli_set_connect_cb(struct osmo_stream_cli *cli, osmo_stream_cli_connect_cb_t connect_cb) { cli->connect_cb = connect_cb; } /*! Set the call-back function called on disconnect of the stream client socket. * \param[in] cli Stream Client to modify * \param[in] disconnect_cb Call-back function to be called upon disconnect */ void osmo_stream_cli_set_disconnect_cb(struct osmo_stream_cli *cli, osmo_stream_cli_disconnect_cb_t disconnect_cb) { cli->disconnect_cb = disconnect_cb; } /*! Set the call-back function called to read from the stream client socket. * This function will implicitly configure osmo_stream_cli to use legacy osmo_ofd mode. * \param[in] cli Stream Client to modify * \param[in] read_cb Call-back function to be called when we want to read */ void osmo_stream_cli_set_read_cb(struct osmo_stream_cli *cli, osmo_stream_cli_read_cb_t read_cb) { OSMO_ASSERT(cli->mode != OSMO_STREAM_MODE_OSMO_IO); cli->mode = OSMO_STREAM_MODE_OSMO_FD; cli->read_cb = read_cb; } /*! Set the call-back function called to read from the stream client socket. * This function will implicitly configure osmo_stream_cli to use osmo_iofd mode. * \param[in] cli Stream Client to modify * \param[in] read_cb Call-back function to be called when data was read from the socket */ void osmo_stream_cli_set_read_cb2(struct osmo_stream_cli *cli, osmo_stream_cli_read_cb2_t read_cb) { OSMO_ASSERT(cli->mode != OSMO_STREAM_MODE_OSMO_FD); cli->mode = OSMO_STREAM_MODE_OSMO_IO; cli->iofd_read_cb = read_cb; } /*! Destroy a Osmocom stream client (includes close). * \param[in] cli Stream Client to destroy */ void osmo_stream_cli_destroy(struct osmo_stream_cli *cli) { if (!cli) return; LOGSCLI(cli, LOGL_DEBUG, "destroy()\n"); OSMO_ASSERT(stream_cli_close(cli) == false); osmo_timer_del(&cli->timer); msgb_queue_free(&cli->tx_queue); cli->tx_queue_count = 0; /* if we are in a user callback, delay freeing. */ if (cli->in_cb_mask != 0) { LOGSCLI(cli, LOGL_DEBUG, "delay free() in_cb_mask=0x%02x\n", cli->in_cb_mask); cli->delay_free = true; /* Move ptr to avoid double free if parent ctx of cli is freed * meanwhile (eg. during user callback after calling * osmo_stream_client_destroy() and before returning from user * callback. */ talloc_steal(OTC_GLOBAL, cli); } else { LOGSCLI(cli, LOGL_DEBUG, "free(destroy)\n"); talloc_free(cli); } } /*! DEPRECATED: use osmo_stream_cli_set_reconnect_timeout() or osmo_stream_cli_reconnect() instead! * Open connection of an Osmocom stream client * \param[in] cli Stream Client to connect * \param[in] reconect 1 if we should not automatically reconnect * \return negative on error, 0 on success */ int osmo_stream_cli_open2(struct osmo_stream_cli *cli, int reconnect) { int ret; /* we are reconfiguring this socket, close existing first. */ if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && cli->ofd.fd >= 0) { if (stream_cli_close(cli) == true) return -ENAVAIL; /* freed */ } cli->flags &= ~OSMO_STREAM_CLI_F_RECONF; switch (cli->proto) { #ifdef HAVE_LIBSCTP case IPPROTO_SCTP: ret = osmo_sock_init2_multiaddr2(AF_UNSPEC, SOCK_STREAM, cli->proto, (const char **)cli->local_addr, cli->local_addrcnt, cli->local_port, (const char **)cli->addr, cli->addrcnt, cli->port, OSMO_SOCK_F_CONNECT | OSMO_SOCK_F_BIND | OSMO_SOCK_F_NONBLOCK | OSMO_SOCK_F_DSCP(cli->ip_dscp) | OSMO_SOCK_F_PRIO(cli->sk_prio), &cli->ma_pars); break; #endif default: ret = osmo_sock_init2(AF_UNSPEC, SOCK_STREAM, cli->proto, cli->local_addr[0], cli->local_port, cli->addr[0], cli->port, OSMO_SOCK_F_CONNECT | OSMO_SOCK_F_BIND | OSMO_SOCK_F_NONBLOCK | OSMO_SOCK_F_DSCP(cli->ip_dscp) | OSMO_SOCK_F_PRIO(cli->sk_prio)); } if (ret < 0) { LOGSCLI(cli, LOGL_ERROR, "connect: socket creation error (%d)\n", ret); if (reconnect) (void)stream_cli_reconnect(cli); return ret; } osmo_fd_setup(&cli->ofd, ret, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_cli_fd_cb, cli, 0); if (cli->flags & OSMO_STREAM_CLI_F_NODELAY) { ret = stream_setsockopt_nodelay(cli->ofd.fd, cli->proto, 1); if (ret < 0) goto error_close_socket; } if (osmo_fd_register(&cli->ofd) < 0) goto error_close_socket; cli->state = STREAM_CLI_STATE_CONNECTING; return 0; error_close_socket: close(cli->ofd.fd); cli->ofd.fd = -1; return -EIO; } /*! Set the NODELAY socket option to avoid Nagle-like behavior. * Setting this to nodelay=true will automatically set the NODELAY * socket option on any socket established via \ref osmo_stream_cli_open * or any re-connect. This can be set either before or after opening the * socket. * \param[in] cli Stream client whose sockets are to be configured * \param[in] nodelay whether to set (true) NODELAY before connect() */ void osmo_stream_cli_set_nodelay(struct osmo_stream_cli *cli, bool nodelay) { int fd; if (nodelay) cli->flags |= OSMO_STREAM_CLI_F_NODELAY; else cli->flags &= ~OSMO_STREAM_CLI_F_NODELAY; if (!stream_cli_is_opened(cli)) return; /* Config will be applied upon open() time */ if ((fd = osmo_stream_cli_get_fd(cli)) < 0) { LOGSCLI(cli, LOGL_ERROR, "set_nodelay(%u): failed obtaining socket\n", nodelay); return; } if (stream_setsockopt_nodelay(fd, cli->proto, nodelay ? 1 : 0) < 0) LOGSCLI(cli, LOGL_ERROR, "set_nodelay(%u): failed setsockopt err=%d\n", nodelay, errno); } /*! Set the priority value of the stream socket. * Setting this will automatically set the socket priority * option on any socket established via \ref osmo_stream_cli_open * or any re-connect. This can be set either before or after opening the * socket. * \param[in] cli Stream client whose sockets are to be configured * \param[in] sk_prio priority value. Values outside 0..6 require CAP_NET_ADMIN. * \return negative on error, 0 on success */ int osmo_stream_cli_set_priority(struct osmo_stream_cli *cli, int sk_prio) { int rc; int fd; if (cli->sk_prio == sk_prio) return 0; /* No change needed */ cli->sk_prio = sk_prio; if (!stream_cli_is_opened(cli)) return 0; /* Config will be applied upon open() time */ if ((fd = osmo_stream_cli_get_fd(cli)) < 0) { /* Shouldn't happen... */ LOGSCLI(cli, LOGL_ERROR, "set_priority(%d): failed obtaining socket\n", cli->sk_prio); return -EBADFD; } if ((rc = osmo_sock_set_priority(fd, cli->sk_prio)) < 0) LOGSCLI(cli, LOGL_ERROR, "set_priority(%d): failed setsockopt err=%d\n", cli->sk_prio, errno); return rc; } /*! Set the DSCP (differentiated services code point) of the stream socket. * Setting this will automatically set the IP DSCP option on any socket established * via \ref osmo_stream_cli_open or any re-connect. This can be set either before or * after opening the socket. * \param[in] cli Stream client whose sockets are to be configured * \param[in] ip_dscp DSCP value. Value range 0..63. * \return negative on error, 0 on success */ int osmo_stream_cli_set_ip_dscp(struct osmo_stream_cli *cli, uint8_t ip_dscp) { int rc; int fd; if (cli->ip_dscp == ip_dscp) return 0; /* No change needed */ cli->ip_dscp = ip_dscp; if (!stream_cli_is_opened(cli)) return 0; /* Config will be applied upon open() time */ if ((fd = osmo_stream_cli_get_fd(cli)) < 0) { /* Shouldn't happen... */ LOGSCLI(cli, LOGL_ERROR, "set_ip_dscp(%u): failed obtaining socket\n", cli->ip_dscp); return -EBADFD; } if ((rc = osmo_sock_set_dscp(fd, cli->ip_dscp)) < 0) LOGSCLI(cli, LOGL_ERROR, "set_ip_dscp(%u): failed setsockopt err=%d\n", cli->ip_dscp, errno); return rc; } /*! Open connection of an Osmocom stream client. * This will initiate an non-blocking outbound connect to the configured destination (server) address. * By default the client will automatically attempt to reconnect after default timeout. * To disable this, use osmo_stream_cli_set_reconnect_timeout() before calling this function. * \param[in] cli Stream Client to connect * \return negative on error, 0 on success */ int osmo_stream_cli_open(struct osmo_stream_cli *cli) { int ret, flags; int fd = -1; unsigned int local_addrcnt; /* we are reconfiguring this socket, close existing first. */ if ((cli->flags & OSMO_STREAM_CLI_F_RECONF) && osmo_stream_cli_get_fd(cli) >= 0) { if (stream_cli_close(cli) == true) return -ENAVAIL; /* freed */ } cli->flags &= ~OSMO_STREAM_CLI_F_RECONF; switch (cli->sk_domain) { case AF_UNIX: ret = osmo_sock_unix_init(cli->sk_type, 0, cli->addr[0], OSMO_SOCK_F_CONNECT|OSMO_SOCK_F_BIND|OSMO_SOCK_F_NONBLOCK); break; case AF_INET: case AF_INET6: case AF_UNSPEC: switch (cli->proto) { #ifdef HAVE_LIBSCTP case IPPROTO_SCTP: local_addrcnt = cli->local_addrcnt; flags = OSMO_SOCK_F_CONNECT | OSMO_SOCK_F_NONBLOCK | OSMO_SOCK_F_DSCP(cli->ip_dscp) | OSMO_SOCK_F_PRIO(cli->sk_prio); if (cli->local_addrcnt > 0 || cli->local_port > 0) { /* explicit bind required? */ flags |= OSMO_SOCK_F_BIND; /* If no local addr configured, use local_addr[0]=NULL by default when creating the socket. */ if (cli->local_addrcnt == 0) local_addrcnt = 1; } ret = osmo_sock_init2_multiaddr2(cli->sk_domain, cli->sk_type, cli->proto, (const char **)cli->local_addr, local_addrcnt, cli->local_port, (const char **)cli->addr, cli->addrcnt, cli->port, flags, &cli->ma_pars); break; #endif default: ret = osmo_sock_init2(cli->sk_domain, cli->sk_type, cli->proto, cli->local_addr[0], cli->local_port, cli->addr[0], cli->port, OSMO_SOCK_F_CONNECT | OSMO_SOCK_F_BIND | OSMO_SOCK_F_NONBLOCK | OSMO_SOCK_F_DSCP(cli->ip_dscp) | OSMO_SOCK_F_PRIO(cli->sk_prio)); } break; default: return -ENOTSUP; } if (ret < 0) { LOGSCLI(cli, LOGL_ERROR, "connect: socket creation error (%d)\n", ret); (void)stream_cli_reconnect(cli); return ret; } fd = ret; if (cli->flags & OSMO_STREAM_CLI_F_NODELAY) { ret = stream_setsockopt_nodelay(fd, cli->proto, 1); if (ret < 0) goto error_close_socket; } switch (cli->mode) { case OSMO_STREAM_MODE_OSMO_FD: osmo_fd_setup(&cli->ofd, fd, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_cli_fd_cb, cli, 0); if (osmo_fd_register(&cli->ofd) < 0) goto error_close_socket; break; case OSMO_STREAM_MODE_OSMO_IO: /* Be sure that previous osmo_io instance is freed before creating a new one. */ stream_cli_close_iofd(cli); #ifdef HAVE_LIBSCTP if (cli->proto == IPPROTO_SCTP) { cli->iofd = osmo_iofd_setup(cli, fd, cli->name, OSMO_IO_FD_MODE_RECVMSG_SENDMSG, &osmo_stream_cli_ioops_sctp, cli); if (cli->iofd) osmo_iofd_set_cmsg_size(cli->iofd, CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))); } else { #else if (true) { #endif cli->iofd = osmo_iofd_setup(cli, fd, cli->name, OSMO_IO_FD_MODE_READ_WRITE, &osmo_stream_cli_ioops, cli); } if (!cli->iofd) goto error_close_socket; osmo_iofd_set_txqueue_max_length(cli->iofd, cli->tx_queue_max_length); osmo_iofd_notify_connected(cli->iofd); configure_cli_segmentation_cb(cli); if (osmo_iofd_register(cli->iofd, fd) < 0) goto error_close_socket; break; default: OSMO_ASSERT(false); } cli->state = STREAM_CLI_STATE_CONNECTING; return 0; error_close_socket: cli->state = STREAM_CLI_STATE_CLOSED; close(fd); if (cli->mode == OSMO_STREAM_MODE_OSMO_FD) cli->ofd.fd = -1; return -EIO; } static void cli_timer_cb(void *data) { struct osmo_stream_cli *cli = data; LOGSCLI(cli, LOGL_DEBUG, "reconnecting\n"); osmo_stream_cli_open(cli); } /*! Enqueue data to be sent via an Osmocom stream client.. * This is the function you use for writing/sending/transmitting data via the osmo_stream_cli. * \param[in] cli Stream Client through which we want to send * \param[in] msg Message buffer to enqueue in transmit queue */ void osmo_stream_cli_send(struct osmo_stream_cli *cli, struct msgb *msg) { int rc; OSMO_ASSERT(cli); OSMO_ASSERT(msg); if (!osmo_stream_cli_is_connected(cli)) { LOGSCLI(cli, LOGL_ERROR, "send: not connected, dropping data!\n"); msgb_free(msg); return; } switch (cli->mode) { case OSMO_STREAM_MODE_OSMO_FD: if (cli->tx_queue_count >= cli->tx_queue_max_length) { LOGSCLI(cli, LOGL_ERROR, "send: tx queue full, dropping msg!\n"); msgb_free(msg); return; } msgb_enqueue_count(&cli->tx_queue, msg, &cli->tx_queue_count); osmo_fd_write_enable(&cli->ofd); break; case OSMO_STREAM_MODE_OSMO_IO: /* whenever osmo_stream_cli_is_connected() [see above check], we should have an iofd */ OSMO_ASSERT(cli->iofd); if (cli->proto == IPPROTO_SCTP) rc = stream_iofd_sctp_send_msgb(cli->iofd, msg, MSG_NOSIGNAL); else rc = osmo_iofd_write_msgb(cli->iofd, msg); if (rc < 0) msgb_free(msg); break; default: OSMO_ASSERT(false); } } /*! Receive data via an Osmocom stream client in osmo_fd mode. * \param[in] cli Stream Client through which we want to send * \param msg pre-allocate message buffer to which received data is appended * \returns number of bytes read; <=0 in case of error * * Application programs using the legacy osmo_fd mode of osmo_stream_cli will use * this function to read/receive from a stream client socket after they have been notified that * it is readable (via select/poll). * * If conn is an SCTP connection, additional specific considerations shall be taken: * - msg->cb is always filled with SCTP ppid, and SCTP stream values, see msgb_sctp_*() APIs. * - If an SCTP notification was received when reading from the SCTP socket, * msgb_sctp_msg_flags(msg) will contain bit flag * OSMO_STREAM_SCTP_MSG_FLAGS_NOTIFICATION set, and the msgb will * contain a "union sctp_notification" instead of user data. In this case the * return code will be either 0 (if conn is considered dead after the * notification) or -EAGAIN (if conn is considered still alive after the * notification) resembling the standard recv() API. */ int osmo_stream_cli_recv(struct osmo_stream_cli *cli, struct msgb *msg) { int ret; OSMO_ASSERT(cli); OSMO_ASSERT(msg); OSMO_ASSERT(cli->mode == OSMO_STREAM_MODE_OSMO_FD); switch (cli->sk_domain) { case AF_UNIX: ret = recv(cli->ofd.fd, msg->tail, msgb_tailroom(msg), 0); break; case AF_INET: case AF_INET6: case AF_UNSPEC: switch (cli->proto) { #ifdef HAVE_LIBSCTP case IPPROTO_SCTP: { char log_pfx[128]; snprintf(log_pfx, sizeof(log_pfx), "CLICONN(%s,%s)", cli->name ? : "", cli->sockname); ret = stream_sctp_recvmsg_wrapper(cli->ofd.fd, msg, log_pfx); break; } #endif case IPPROTO_TCP: default: ret = recv(cli->ofd.fd, msg->tail, msgb_tailroom(msg), 0); break; } break; default: ret = -ENOTSUP; } if (ret < 0) { if (ret == -EAGAIN) /* Received MSG_NOTIFICATION from stream_sctp_recvmsg_wrapper() */ return ret; if (errno == EPIPE || errno == ECONNRESET) LOGSCLI(cli, LOGL_ERROR, "lost connection with srv (%d)\n", errno); else LOGSCLI(cli, LOGL_ERROR, "recv failed (%d)\n", errno); (void)stream_cli_reconnect(cli); return ret; } else if (ret == 0) { LOGSCLI(cli, LOGL_ERROR, "connection closed with srv\n"); (void)stream_cli_reconnect(cli); return ret; } msgb_put(msg, ret); LOGSCLI(cli, LOGL_DEBUG, "received %d bytes from srv\n", ret); return ret; } /*! Clear the transmit queue of the stream client. * Calling this function wil clear (delete) any pending, not-yet transmitted data from the transmit queue. */ void osmo_stream_cli_clear_tx_queue(struct osmo_stream_cli *cli) { switch (cli->mode) { case OSMO_STREAM_MODE_OSMO_FD: msgb_queue_free(&cli->tx_queue); cli->tx_queue_count = 0; /* If in state 'connecting', keep WRITE flag up to receive * socket connection signal and then transition to STATE_CONNECTED: */ if (cli->state == STREAM_CLI_STATE_CONNECTED) osmo_fd_write_disable(&cli->ofd); break; case OSMO_STREAM_MODE_OSMO_IO: osmo_iofd_txqueue_clear(cli->iofd); break; default: OSMO_ASSERT(false); } } /*! Set given parameter of stream client to given value. * \param[in] cli stream client on which to set parameter. * \param[in] par identifier of the parameter to be set. * \param[in] val value of the parameter to be set. * \param[in] val_len length of the parameter value. * \returns 0 in success; negative -errno on error. */ int osmo_stream_cli_set_param(struct osmo_stream_cli *cli, enum osmo_stream_cli_param par, void *val, size_t val_len) { OSMO_ASSERT(cli); uint8_t val8; switch (par) { case OSMO_STREAM_CLI_PAR_SCTP_SOCKOPT_AUTH_SUPPORTED: if (!val || val_len != sizeof(uint8_t)) return -EINVAL; val8 = *(uint8_t *)val; cli->ma_pars.sctp.sockopt_auth_supported.set = true; cli->ma_pars.sctp.sockopt_auth_supported.abort_on_failure = val8 > 1; cli->ma_pars.sctp.sockopt_auth_supported.value = (val8 == 1 || val8 == 3) ? 1 : 0; break; case OSMO_STREAM_CLI_PAR_SCTP_SOCKOPT_ASCONF_SUPPORTED: if (!val || val_len != sizeof(uint8_t)) return -EINVAL; val8 = *(uint8_t *)val; cli->ma_pars.sctp.sockopt_asconf_supported.set = true; cli->ma_pars.sctp.sockopt_asconf_supported.abort_on_failure = val8 > 1; cli->ma_pars.sctp.sockopt_asconf_supported.value = (val8 == 1 || val8 == 3) ? 1 : 0; break; case OSMO_STREAM_CLI_PAR_SCTP_INIT_NUM_OSTREAMS: if (!val || val_len != sizeof(uint16_t)) return -EINVAL; cli->ma_pars.sctp.sockopt_initmsg.set = true; cli->ma_pars.sctp.sockopt_initmsg.num_ostreams_present = true; cli->ma_pars.sctp.sockopt_initmsg.num_ostreams_value = *(uint16_t *)val; break; case OSMO_STREAM_CLI_PAR_SCTP_INIT_MAX_INSTREAMS: if (!val || val_len != sizeof(uint16_t)) return -EINVAL; cli->ma_pars.sctp.sockopt_initmsg.set = true; cli->ma_pars.sctp.sockopt_initmsg.max_instreams_present = true; cli->ma_pars.sctp.sockopt_initmsg.max_instreams_value = *(uint16_t *)val; break; case OSMO_STREAM_CLI_PAR_SCTP_INIT_MAX_ATTEMPTS: if (!val || val_len != sizeof(uint16_t)) return -EINVAL; cli->ma_pars.sctp.sockopt_initmsg.set = true; cli->ma_pars.sctp.sockopt_initmsg.max_attempts_present = true; cli->ma_pars.sctp.sockopt_initmsg.max_attempts_value = *(uint16_t *)val; break; case OSMO_STREAM_CLI_PAR_SCTP_INIT_TIMEOUT: if (!val || val_len != sizeof(uint16_t)) return -EINVAL; cli->ma_pars.sctp.sockopt_initmsg.set = true; cli->ma_pars.sctp.sockopt_initmsg.max_init_timeo_present = true; cli->ma_pars.sctp.sockopt_initmsg.max_init_timeo_value = *(uint16_t *)val; break; default: return -ENOENT; }; return 0; } /*! @} */