// SPDX-License-Identifier: GPL-2.0-only
/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_transmit().
 *  -	xprt_transmit sends the message and installs the caller on the
 *	transport's wait list. At the same time, if a reply is expected,
 *	it installs a timer that is run after the packet's timeout has
 *	expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that transport. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <[email protected]>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <[email protected]>
 */

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>
#include <linux/ktime.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>
#include <linux/sunrpc/bc_xprt.h>
#include <linux/rcupdate.h>
#include <linux/sched/mm.h>

#include <trace/events/sunrpc.h>

#include "sunrpc.h"
#include "sysfs.h"
#include "fail.h"

/*
 * Local variables
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
#define RPCDBG_FACILITY	…
#endif

/*
 * Local functions
 */
static void	xprt_init(struct rpc_xprt *xprt, struct net *net);
static __be32	xprt_alloc_xid(struct rpc_xprt *xprt);
static void	xprt_destroy(struct rpc_xprt *xprt);
static void	xprt_request_init(struct rpc_task *task);
static int	xprt_request_prepare(struct rpc_rqst *req, struct xdr_buf *buf);

static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);

static unsigned long xprt_request_timeout(const struct rpc_rqst *req)
{
	…
}
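
/*
 * Illustrative sketch (not part of this file): a minimal model of the
 * request lifecycle described in the header comment above. The enum and
 * function names are hypothetical and exist only to make the
 * reserve -> transmit -> wait -> complete/timeout flow concrete.
 */
#if 0	/* example only, never compiled */
enum example_rq_state {
	EXAMPLE_RQ_RESERVED,	/* slot allocated (cf. xprt_reserve) */
	EXAMPLE_RQ_SENT,	/* handed to the transport (cf. xprt_transmit) */
	EXAMPLE_RQ_REPLIED,	/* matching XID found by the data_ready handler */
	EXAMPLE_RQ_TIMED_OUT,	/* timer fired before a reply arrived */
};

static enum example_rq_state example_advance(enum example_rq_state s,
					     bool got_reply, bool timer_fired)
{
	if (s == EXAMPLE_RQ_SENT && got_reply)
		return EXAMPLE_RQ_REPLIED;	/* wake the caller, stop the timer */
	if (s == EXAMPLE_RQ_SENT && timer_fired)
		return EXAMPLE_RQ_TIMED_OUT;	/* xprt_timer(): bump the timeout
						 * or fail with -ETIMEDOUT */
	return s;
}
#endif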

/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
 *
 * Returns:
 * 0:		transport successfully registered
 * -EEXIST:	transport already registered
 * -EINVAL:	transport module being unloaded
 */
int xprt_register_transport(struct xprt_class *transport)
{
	…
}
EXPORT_SYMBOL_GPL(…);

/**
 * xprt_unregister_transport - unregister a transport implementation
 * @transport: transport to unregister
 *
 * Returns:
 * 0:		transport successfully unregistered
 * -ENOENT:	transport never registered
 */
int xprt_unregister_transport(struct xprt_class *transport)
{
	…
}
EXPORT_SYMBOL_GPL(…);

static void xprt_class_release(const struct xprt_class *t)
{
	…
}

static const struct xprt_class *xprt_class_find_by_ident_locked(int ident)
{
	…
}

static const struct xprt_class *xprt_class_find_by_ident(int ident)
{
	…
}

static const struct xprt_class *xprt_class_find_by_netid_locked(const char *netid)
{
	…
}

static const struct xprt_class *xprt_class_find_by_netid(const char *netid)
{
	…
}

/**
 * xprt_find_transport_ident - convert a netid into a transport identifier
 * @netid: transport to load
 *
 * Returns:
 * > 0:		transport identifier
 * -ENOENT:	transport module not available
 */
int xprt_find_transport_ident(const char *netid)
{
	…
}
EXPORT_SYMBOL_GPL(…);

static void xprt_clear_locked(struct rpc_xprt *xprt)
{
	…
}

/**
 * xprt_reserve_xprt - serialize write access to transports
 * @task: task that is requesting access to the transport
 * @xprt: pointer to the target transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes. No congestion control
 * is provided.
 */
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	…
}
EXPORT_SYMBOL_GPL(…);

static bool xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
{
	…
}

static void xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
{
	…
}

static void xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
	…
}

/*
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 * Note that the lock is only granted if we know there are free slots.
 */
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	…
}
EXPORT_SYMBOL_GPL(…);

static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	…
}

static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
{
	…
}

static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
	…
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
	…
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL. No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	…
}
EXPORT_SYMBOL_GPL(…);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL. Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	…
}
EXPORT_SYMBOL_GPL(…);

void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	…
}
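
/*
 * Illustrative sketch (not part of this file): the reserve/release pair
 * above serializes writers with a single "locked" bit plus a wait queue.
 * A hypothetical model of that idea, ignoring congestion control and
 * queue priorities; the simplified waiting policy here is invented, but
 * XPRT_LOCKED, xprt->state, and xprt->sending are the real names.
 */
#if 0	/* example only, never compiled */
static int example_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (!test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return 1;	/* task now owns write access */
	rpc_sleep_on(&xprt->sending, task, NULL);
	return 0;		/* woken later by the release path */
}
#endif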

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	…
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	…
}

/**
 * xprt_request_get_cong - Request congestion control credits
 * @xprt: pointer to transport
 * @req: pointer to RPC request
 *
 * Useful for transports that require congestion control.
 */
bool xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	…
}
EXPORT_SYMBOL_GPL(…);

/**
 * xprt_release_rqst_cong - housekeeping when request is complete
 * @task: RPC request that recently completed
 *
 * Useful for transports that require congestion control.
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
	…
}
EXPORT_SYMBOL_GPL(…);

static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt)
{
	…
}

/*
 * Clear the congestion window wait flag and wake up the next
 * entry on xprt->sending
 */
static void xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
	…
}

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @xprt: pointer to xprt
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * The transport code maintains an estimate on the maximum number of
 * outstanding RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *	- a reply is received and
 *	- a full number of requests are outstanding and
 *	- the congestion window hasn't been updated recently.
 */
void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
{
	…
}
EXPORT_SYMBOL_GPL(…);

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
	…
}
EXPORT_SYMBOL_GPL(…);

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @xprt: transport
 *
 * Note that we only set the timer for the case of RPC_IS_SOFT(), since
 * we don't in general want to force a socket disconnection due to
 * an incomplete RPC call transmission.
 */
void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
{
	…
}
EXPORT_SYMBOL_GPL(…);

static bool xprt_clear_write_space_locked(struct rpc_xprt *xprt)
{
	…
}
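
/*
 * Illustrative sketch (not part of this file): a model of the Van
 * Jacobson-style window update documented above xprt_adjust_cwnd().
 * The constants mirror the fixed-point approach the RPC layer uses
 * (cf. RPC_CWNDSCALE) but their values here are hypothetical, and the
 * exact in-kernel rounding differs.
 */
#if 0	/* example only, never compiled */
#define EXAMPLE_CWNDSCALE	256UL	/* one "request" in fixed point */
#define EXAMPLE_MAXCWND		(EXAMPLE_CWNDSCALE * 16)

static unsigned long example_adjust_cwnd(unsigned long cwnd, bool timed_out)
{
	if (timed_out) {
		/* multiplicative decrease: halve the window, keep >= 1 */
		cwnd >>= 1;
		if (cwnd < EXAMPLE_CWNDSCALE)
			cwnd = EXAMPLE_CWNDSCALE;
	} else {
		/* additive increase: grow by roughly 1/cwnd per reply */
		cwnd += (EXAMPLE_CWNDSCALE * EXAMPLE_CWNDSCALE) / cwnd;
		if (cwnd > EXAMPLE_MAXCWND)
			cwnd = EXAMPLE_MAXCWND;
	}
	return cwnd;
}
#endif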

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
bool xprt_write_space(struct rpc_xprt *xprt)
{
	…
}
EXPORT_SYMBOL_GPL(…);

static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime)
{
	…
}

static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req,
					  const struct rpc_timeout *to)
{
	…
}

static void xprt_reset_majortimeo(struct rpc_rqst *req,
				  const struct rpc_timeout *to)
{
	…
}

static void xprt_reset_minortimeo(struct rpc_rqst *req)
{
	…
}

static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req,
				 const struct rpc_timeout *to)
{
	…
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
	…
}

static void xprt_autoclose(struct work_struct *work)
{
	…
}

/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
	…
}
EXPORT_SYMBOL_GPL(…);

/**
 * xprt_schedule_autoclose_locked - Try to schedule an autoclose RPC call
 * @xprt: transport to disconnect
 */
static void xprt_schedule_autoclose_locked(struct rpc_xprt *xprt)
{
	…
}

/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
	…
}
EXPORT_SYMBOL_GPL(…);

static unsigned int xprt_connect_cookie(struct rpc_xprt *xprt)
{
	…
}

static bool xprt_request_retransmit_after_disconnect(struct rpc_task *task)
{
	…
}
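
/*
 * Illustrative sketch (not part of this file): a model of the retransmit
 * backoff that xprt_adjust_timeout() applies using struct rpc_timeout.
 * The field names match the real structure; the clamping shown is a
 * simplified reading of the policy, not the elided implementation.
 */
#if 0	/* example only, never compiled */
static unsigned long example_next_timeout(const struct rpc_timeout *to,
					  unsigned long cur)
{
	/* exponential transports double the interval, linear ones add
	 * to_increment; both are clamped to to_maxval */
	unsigned long next = to->to_exponential ? cur << 1
						: cur + to->to_increment;

	if (next > to->to_maxval)
		next = to->to_maxval;
	if (next < to->to_initval)
		next = to->to_initval;
	return next;
}
#endif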

/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
 *
 */
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
{
	…
}

static bool xprt_has_timer(const struct rpc_xprt *xprt)
{
	…
}

static void xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
	__must_hold(&xprt->transport_lock)
{
	…
}

static void xprt_init_autodisconnect(struct timer_list *t)
{
	…
}

#if IS_ENABLED(CONFIG_FAIL_SUNRPC)
static void xprt_inject_disconnect(struct rpc_xprt *xprt)
{
	…
}
#else
static inline void xprt_inject_disconnect(struct rpc_xprt *xprt)
{
}
#endif

bool xprt_lock_connect(struct rpc_xprt *xprt, struct rpc_task *task,
		       void *cookie)
{
	…
}
EXPORT_SYMBOL_GPL(…);

void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
{
	…
}
EXPORT_SYMBOL_GPL(…);

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
	…
}

/**
 * xprt_reconnect_delay - compute the wait before scheduling a connect
 * @xprt: transport instance
 *
 */
unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
{
	…
}
EXPORT_SYMBOL_GPL(…);

/**
 * xprt_reconnect_backoff - compute the new re-establish timeout
 * @xprt: transport instance
 * @init_to: initial reestablish timeout
 *
 */
void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
{
	…
}
EXPORT_SYMBOL_GPL(…);

enum xprt_xid_rb_cmp {
	…
};

static enum xprt_xid_rb_cmp xprt_xid_cmp(__be32 xid1, __be32 xid2)
{
	…
}

static struct rpc_rqst *xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
{
	…
}

static void xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
{
	…
}

static void xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	…
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 * Caller holds xprt->queue_lock.
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
	…
}
EXPORT_SYMBOL_GPL(…);

static bool xprt_is_pinned_rqst(struct rpc_rqst *req)
{
	…
}

/**
 * xprt_pin_rqst - Pin a request on the transport receive list
 * @req: Request to pin
 *
 * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
 * so should be holding xprt->queue_lock.
 */
void xprt_pin_rqst(struct rpc_rqst *req)
{
	…
}
EXPORT_SYMBOL_GPL(…);

/**
 * xprt_unpin_rqst - Unpin a request on the transport receive list
 * @req: Request to unpin
 *
 * Caller should be holding xprt->queue_lock.
 */
void xprt_unpin_rqst(struct rpc_rqst *req)
{
	…
}
EXPORT_SYMBOL_GPL(…);

static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
{
	…
}

static bool xprt_request_data_received(struct rpc_task *task)
{
	…
}

static bool xprt_request_need_enqueue_receive(struct rpc_task *task,
					      struct rpc_rqst *req)
{
	…
}

/**
 * xprt_request_enqueue_receive - Add a request to the receive queue
 * @task: RPC task
 *
 */
int xprt_request_enqueue_receive(struct rpc_task *task)
{
	…
}

/**
 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
 * @task: RPC task
 *
 * Caller must hold xprt->queue_lock.
 */
static void xprt_request_dequeue_receive_locked(struct rpc_task *task)
{
	…
}

/**
 * xprt_update_rtt - Update RPC RTT statistics
 * @task: RPC request that recently completed
 *
 * Caller holds xprt->queue_lock.
 */
void xprt_update_rtt(struct rpc_task *task)
{
	…
}
EXPORT_SYMBOL_GPL(…);
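
/*
 * Illustrative sketch (not part of this file): XIDs are stored in network
 * byte order, so the rb-tree lookup above must compare them in one
 * consistent order. A hypothetical three-way comparison might look like
 * this (the real code returns an enum xprt_xid_rb_cmp value instead):
 */
#if 0	/* example only, never compiled */
static int example_xid_cmp(__be32 xid1, __be32 xid2)
{
	u32 a = be32_to_cpu(xid1), b = be32_to_cpu(xid2);

	if (a < b)
		return -1;	/* descend to the left child */
	if (a > b)
		return 1;	/* descend to the right child */
	return 0;		/* matching request found */
}
#endif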

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC request that recently completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds xprt->queue_lock.
 */
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
	…
}
EXPORT_SYMBOL_GPL(…);

static void xprt_timer(struct rpc_task *task)
{
	…
}

/**
 * xprt_wait_for_reply_request_def - wait for reply
 * @task: pointer to rpc_task
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters. Used by transports that don't adjust
 * the retransmit timeout based on round-trip time estimation,
 * and put the task to sleep on the pending queue.
 */
void xprt_wait_for_reply_request_def(struct rpc_task *task)
{
	…
}
EXPORT_SYMBOL_GPL(…);

/**
 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator
 * @task: pointer to rpc_task
 *
 * Set a request's retransmit timeout using the RTT estimator,
 * and put the task to sleep on the pending queue.
 */
void xprt_wait_for_reply_request_rtt(struct rpc_task *task)
{
	…
}
EXPORT_SYMBOL_GPL(…);

/**
 * xprt_request_wait_receive - wait for the reply to an RPC request
 * @task: RPC task about to send a request
 *
 */
void xprt_request_wait_receive(struct rpc_task *task)
{
	…
}

static bool xprt_request_need_enqueue_transmit(struct rpc_task *task,
					       struct rpc_rqst *req)
{
	…
}

/**
 * xprt_request_enqueue_transmit - queue a task for transmission
 * @task: pointer to rpc_task
 *
 * Add a task to the transmission queue.
 */
void xprt_request_enqueue_transmit(struct rpc_task *task)
{
	…
}

/**
 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmission queue.
 * Caller must hold xprt->queue_lock.
 */
static void xprt_request_dequeue_transmit_locked(struct rpc_task *task)
{
	…
}

/**
 * xprt_request_dequeue_transmit - remove a task from the transmission queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmission queue.
 */
static void xprt_request_dequeue_transmit(struct rpc_task *task)
{
	…
}

/**
 * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmit and receive queues, and ensure that
 * it is not pinned by the receive work item.
 */
void xprt_request_dequeue_xprt(struct rpc_task *task)
{
	…
}

/**
 * xprt_request_prepare - prepare an encoded request for transport
 * @req: pointer to rpc_rqst
 * @buf: pointer to send/rcv xdr_buf
 *
 * Calls into the transport layer to do whatever is needed to prepare
 * the request for transmission or receive.
 * Returns error, or zero.
 */
static int xprt_request_prepare(struct rpc_rqst *req, struct xdr_buf *buf)
{
	…
}

/**
 * xprt_request_need_retransmit - Test if a task needs retransmission
 * @task: pointer to rpc_task
 *
 * Test for whether a connection breakage requires the task to retransmit
 */
bool xprt_request_need_retransmit(struct rpc_task *task)
{
	…
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 */
bool xprt_prepare_transmit(struct rpc_task *task)
{
	…
}

void xprt_end_transmit(struct rpc_task *task)
{
	…
}

/**
 * xprt_request_transmit - send an RPC request on a transport
 * @req: pointer to request to transmit
 * @snd_task: RPC task that owns the transport lock
 *
 * This performs the transmission of a single request.
 * Note that if the request is not the same as snd_task, then it
 * does need to be pinned.
 * Returns '0' on success.
 */
static int xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
{
	…
}
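
/*
 * Illustrative sketch (not part of this file): xprt_transmit() below
 * drains the transmit queue by sending one request at a time. A
 * hypothetical outline of such a drain loop, ignoring the locking and
 * request pinning that the real code must perform:
 */
#if 0	/* example only, never compiled */
static void example_drain_transmit_queue(struct rpc_xprt *xprt,
					 struct rpc_task *task)
{
	struct rpc_rqst *req, *next;

	list_for_each_entry_safe(req, next, &xprt->xmit_queue, rq_xmit) {
		int status = xprt_request_transmit(req, task);

		if (status < 0)
			break;	/* let the caller handle the error */
	}
}
#endif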

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * Attempts to drain the transmit queue. On exit, either the transport
 * signalled an error that needs to be handled before transmission can
 * resume, or @task finished transmitting, and detected that it already
 * received a reply.
 */
void xprt_transmit(struct rpc_task *task)
{
	…
}

static void xprt_complete_request_init(struct rpc_task *task)
{
	…
}

void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
{
	…
}
EXPORT_SYMBOL_GPL(…);

static bool __xprt_set_rq(struct rpc_task *task, void *data)
{
	…
}

bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	…
}
EXPORT_SYMBOL_GPL(…);

static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
{
	…
}

static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt)
{
	…
}

static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	…
}

void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
	…
}
EXPORT_SYMBOL_GPL(…);

void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	…
}
EXPORT_SYMBOL_GPL(…);

static void xprt_free_all_slots(struct rpc_xprt *xprt)
{
	…
}

static DEFINE_IDA(rpc_xprt_ids);

void xprt_cleanup_ids(void)
{
	…
}

static int xprt_alloc_id(struct rpc_xprt *xprt)
{
	…
}

static void xprt_free_id(struct rpc_xprt *xprt)
{
	…
}

struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
			    unsigned int num_prealloc,
			    unsigned int max_alloc)
{
	…
}
EXPORT_SYMBOL_GPL(…);

void xprt_free(struct rpc_xprt *xprt)
{
	…
}
EXPORT_SYMBOL_GPL(…);

static void xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt)
{
	…
}

static __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
{
	…
}

static void xprt_init_xid(struct rpc_xprt *xprt)
{
	…
}

static void xprt_request_init(struct rpc_task *task)
{
	…
}

static void xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task)
{
	…
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If the transport is marked as being congested, or if no more
 * slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
	…
}

/**
 * xprt_retry_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 * Note that the only difference with xprt_reserve is that we now
 * ignore the value of the XPRT_CONGESTED flag.
 */
void xprt_retry_reserve(struct rpc_task *task)
{
	…
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
	…
}

#ifdef CONFIG_SUNRPC_BACKCHANNEL
void xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task,
			  const struct rpc_timeout *to)
{
	…
}
#endif

static void xprt_init(struct rpc_xprt *xprt, struct net *net)
{
	…
}

/**
 * xprt_create_transport - create an RPC transport
 * @args: rpc transport creation arguments
 *
 */
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
{
	…
}

static void xprt_destroy_cb(struct work_struct *work)
{
	…
}

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @xprt: transport to destroy
 *
 */
static void xprt_destroy(struct rpc_xprt *xprt)
{
	…
}

static void xprt_destroy_kref(struct kref *kref)
{
	…
}

/**
 * xprt_get - return a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
	…
}
EXPORT_SYMBOL_GPL(…);
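
/*
 * Illustrative sketch (not part of this file): XIDs start at a random
 * value per transport and increment monotonically, so a stale reply from
 * a previous incarnation is unlikely to match a live request. The state
 * structure and helpers below are hypothetical models of that idea.
 */
#if 0	/* example only, never compiled */
struct example_xid_state {
	u32 next;	/* host-order counter, seeded randomly at init */
};

static void example_xid_init(struct example_xid_state *s)
{
	s->next = get_random_u32();
}

static __be32 example_xid_alloc(struct example_xid_state *s)
{
	return cpu_to_be32(s->next++);
}
#endif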

/**
 * xprt_put - release a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
void xprt_put(struct rpc_xprt *xprt)
{
	…
}
EXPORT_SYMBOL_GPL(…);

void xprt_set_offline_locked(struct rpc_xprt *xprt, struct rpc_xprt_switch *xps)
{
	…
}

void xprt_set_online_locked(struct rpc_xprt *xprt, struct rpc_xprt_switch *xps)
{
	…
}

void xprt_delete_locked(struct rpc_xprt *xprt, struct rpc_xprt_switch *xps)
{
	…
}
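
/*
 * Illustrative sketch (not part of this file): transport lifetime follows
 * the usual kref pattern, with xprt_get()/xprt_put() above wrapping the
 * reference count and xprt_destroy() running once the count drops to zero.
 * The caller shown here is hypothetical.
 */
#if 0	/* example only, never compiled */
static void example_use_transport(struct rpc_xprt *xprt)
{
	xprt = xprt_get(xprt);	/* may return NULL if already dying */
	if (!xprt)
		return;
	/* ... safely dereference xprt here ... */
	xprt_put(xprt);		/* last put schedules destruction */
}
#endif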