/*
 * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/sched/clock.h>
#include <linux/slab.h>
#include <linux/pci.h>
#include <linux/dma-mapping.h>
#include <rdma/rdma_cm.h>

#include "rds_single_path.h"
#include "rds.h"
#include "ib.h"

static struct kmem_cache *rds_ib_incoming_slab;
static struct kmem_cache *rds_ib_frag_slab;
static atomic_t rds_ib_allocation = …;

void rds_ib_recv_init_ring(struct rds_ib_connection *ic)
{
	…
}

/*
 * The entire 'from' list, including the from element itself, is put on
 * to the tail of the 'to' list.
 */
static void list_splice_entire_tail(struct list_head *from, struct list_head *to)
{
	…
}
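/*
 * The splice above is elided here; as an illustrative sketch only (the helper
 * name below is made up for the example, not the elided body), the "take the
 * whole chain, including 'from' itself" semantics can be built from two stock
 * list helpers by treating the chain's last element as a temporary list head.
 */
static void __maybe_unused example_splice_entire_tail(struct list_head *from,
						      struct list_head *to)
{
	struct list_head *from_last = from->prev;

	/* Splice every element except 'from_last' onto the tail of 'to'... */
	list_splice_tail(from_last, to);
	/* ...then append 'from_last' itself so no element is left behind. */
	list_add_tail(from_last, to);
}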
static void rds_ib_cache_xfer_to_ready(struct rds_ib_refill_cache *cache)
{
	…
}

static int rds_ib_recv_alloc_cache(struct rds_ib_refill_cache *cache, gfp_t gfp)
{
	…
}

int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic, gfp_t gfp)
{
	…
}

static void rds_ib_cache_splice_all_lists(struct rds_ib_refill_cache *cache,
					  struct list_head *caller_list)
{
	…
}

void rds_ib_recv_free_caches(struct rds_ib_connection *ic)
{
	…
}

/* fwd decl */
static void rds_ib_recv_cache_put(struct list_head *new_item,
				  struct rds_ib_refill_cache *cache);
static struct list_head *rds_ib_recv_cache_get(struct rds_ib_refill_cache *cache);

/* Recycle frag and attached recv buffer f_sg */
static void rds_ib_frag_free(struct rds_ib_connection *ic,
			     struct rds_page_frag *frag)
{
	…
}

/* Recycle inc after freeing attached frags */
void rds_ib_inc_free(struct rds_incoming *inc)
{
	…
}

static void rds_ib_recv_clear_one(struct rds_ib_connection *ic,
				  struct rds_ib_recv_work *recv)
{
	…
}

void rds_ib_recv_clear_ring(struct rds_ib_connection *ic)
{
	…
}

static struct rds_ib_incoming *rds_ib_refill_one_inc(struct rds_ib_connection *ic,
						     gfp_t slab_mask)
{
	…
}

static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic,
						    gfp_t slab_mask, gfp_t page_mask)
{
	…
}

static int rds_ib_recv_refill_one(struct rds_connection *conn,
				  struct rds_ib_recv_work *recv, gfp_t gfp)
{
	…
}

static int acquire_refill(struct rds_connection *conn)
{
	…
}

static void release_refill(struct rds_connection *conn)
{
	…
}

/*
 * This tries to allocate and post unused work requests after making sure that
 * they have all the allocations they need to queue received fragments into
 * sockets.
 */
void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
{
	…
}

/*
 * We want to recycle several types of recv allocations, like incs and frags.
 * To use this, the *_free() function passes in the ptr to a list_head within
 * the recyclee, as well as the cache to put it on.
 *
 * First, we put the memory on a percpu list. When this reaches a certain size,
 * we move it to an intermediate non-percpu list in a lockless manner, with some
 * xchg/cmpxchg wizardry.
 *
 * N.B. Instead of a list_head as the anchor, we use a single pointer, which can
 * be NULL and xchg'd. The list is actually empty when the pointer is NULL, and
 * list_empty() will return true even when one element is actually present.
 */
static void rds_ib_recv_cache_put(struct list_head *new_item,
				  struct rds_ib_refill_cache *cache)
{
	…
}

static struct list_head *rds_ib_recv_cache_get(struct rds_ib_refill_cache *cache)
{
	…
}
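/*
 * The cache_put/cache_get bodies are elided above; as an illustrative sketch
 * only (the helper name and the bare 'anchor' parameter are made up for the
 * example; the real code keeps the pointer inside struct rds_ib_refill_cache),
 * this is one way the comment's "xchg/cmpxchg wizardry" can publish a locally
 * built chain through a single-pointer anchor without taking a lock.
 */
static void __maybe_unused example_publish_chain(struct list_head **anchor,
						 struct list_head *chain)
{
	struct list_head *old;

	do {
		/* Take ownership of anything already published there. */
		old = xchg(anchor, NULL);
		if (old)
			/* Fold the previously published chain into ours. */
			list_splice_entire_tail(old, chain);
		/* Publish the merged chain; retry if another CPU raced us. */
		old = cmpxchg(anchor, NULL, chain);
	} while (old);
}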
int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to)
{
	…
}

/* ic starts out kzalloc()ed */
void rds_ib_recv_init_ack(struct rds_ib_connection *ic)
{
	…
}

/*
 * You'd think that with reliable IB connections you wouldn't need to ack
 * messages that have been received. The problem is that IB hardware generates
 * an ack message before it has DMAed the message into memory. This creates a
 * potential message loss if the HCA is disabled for any reason between the
 * time it sends the ack and the time the message is DMAed and processed. This
 * is only a potential issue if another HCA is available for fail-over.
 *
 * When the remote host receives our ack, it frees the sent message from its
 * send queue. To decrease this latency, we always send an ack immediately
 * after we've received messages.
 *
 * For simplicity, we only have one ack in flight at a time. This puts
 * pressure on senders to have deep enough send queues to absorb the latency of
 * a single ack frame being in flight. This might not be good enough.
 *
 * This is implemented by having a long-lived send_wr and sge which point to a
 * statically allocated ack frame. This ack wr does not fall under the ring
 * accounting that the tx and rx wrs do. The QP attribute specifically makes
 * room for it beyond the ring size. Send completion notices its special
 * wr_id and avoids working with the ring in that case.
 */
#ifndef KERNEL_HAS_ATOMIC64
void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required)
{
	unsigned long flags;

	/* 64-bit stores aren't atomic here, so publish the sequence under the lock. */
	spin_lock_irqsave(&ic->i_ack_lock, flags);
	ic->i_ack_next = seq;
	if (ack_required)
		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
	spin_unlock_irqrestore(&ic->i_ack_lock, flags);
}

static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
{
	unsigned long flags;
	u64 seq;

	/* Clear the request before sampling; a racing set_ack() will re-set it. */
	clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);

	spin_lock_irqsave(&ic->i_ack_lock, flags);
	seq = ic->i_ack_next;
	spin_unlock_irqrestore(&ic->i_ack_lock, flags);

	return seq;
}
#else
void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required)
{
	…
}

static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
{
	…
}
#endif
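/*
 * The KERNEL_HAS_ATOMIC64 branch above is elided; as an illustrative sketch
 * only (hypothetical helpers that take the fields as parameters, not the
 * elided bodies), a working atomic64_t lets the sequence number be published
 * and sampled without the spinlock used in the fallback above.
 */
static void __maybe_unused example_set_ack_atomic64(atomic64_t *ack_next,
						    unsigned long *ack_flags,
						    u64 seq, int ack_required)
{
	/* A single 64-bit atomic store replaces the locked update. */
	atomic64_set(ack_next, seq);
	if (ack_required)
		set_bit(IB_ACK_REQUESTED, ack_flags);
}

static u64 __maybe_unused example_get_ack_atomic64(atomic64_t *ack_next,
						   unsigned long *ack_flags)
{
	/* Clear the request before sampling; a racing set will re-set it. */
	clear_bit(IB_ACK_REQUESTED, ack_flags);
	return atomic64_read(ack_next);
}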
static void rds_ib_send_ack(struct rds_ib_connection *ic, unsigned int adv_credits)
{
	…
}

/*
 * There are 3 ways of getting acknowledgements to the peer:
 *  1. We call rds_ib_attempt_ack from the recv completion handler
 *     to send an ACK-only frame.
 *     However, there can be only one such frame in the send queue
 *     at any time, so we may have to postpone it.
 *  2. When another (data) packet is transmitted while there's
 *     an ACK in the queue, we piggyback the ACK sequence number
 *     on the data packet (see the piggyback sketch at the end of this file).
 *  3. If the ACK WR is done sending, we get called from the
 *     send queue completion handler, and check whether there's
 *     another ACK pending (postponed because the WR was on the
 *     queue). If so, we transmit it.
 *
 * We maintain 2 variables:
 *  - i_ack_flags, which keeps track of whether the ACK WR
 *    is currently in the send queue or not (IB_ACK_IN_FLIGHT)
 *  - i_ack_next, which is the last sequence number we received
 *
 * Potentially, send queue and receive queue handlers can run concurrently.
 * It would be nice to not have to use a spinlock to synchronize things,
 * but the one problem that rules this out is that 64bit updates are
 * not atomic on all platforms. Things would be a lot simpler if
 * we had atomic64 or maybe cmpxchg64 everywhere.
 *
 * Reconnecting complicates this picture just slightly. When we
 * reconnect, we may be seeing duplicate packets. The peer
 * is retransmitting them, because it hasn't seen an ACK for
 * them. It is important that we ACK these.
 *
 * ACK mitigation adds a header flag "ACK_REQUIRED"; any packet with
 * this flag set *MUST* be acknowledged immediately.
 */

/*
 * When we get here, we're called from the recv queue handler.
 * Check whether we ought to transmit an ACK.
 */
void rds_ib_attempt_ack(struct rds_ib_connection *ic)
{
	…
}

/*
 * We get here from the send completion handler, when the
 * adapter tells us the ACK frame was sent.
 */
void rds_ib_ack_send_complete(struct rds_ib_connection *ic)
{
	…
}

/*
 * This is called by the regular xmit code when it wants to piggyback
 * an ACK on an outgoing frame.
 */
u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic)
{
	…
}

/*
 * It's kind of lame that we're copying from the posted receive pages into
 * long-lived bitmaps. We could have posted the bitmaps and RDMA written into
 * them. But receiving new congestion bitmaps should be a *rare* event, so
 * hopefully we won't need to invest that complexity in making it more
 * efficient. By copying we can share a simpler core with TCP, which has to
 * copy.
 */
static void rds_ib_cong_recv(struct rds_connection *conn,
			     struct rds_ib_incoming *ibinc)
{
	…
}

static void rds_ib_process_recv(struct rds_connection *conn,
				struct rds_ib_recv_work *recv, u32 data_len,
				struct rds_ib_ack_state *state)
{
	…
}

void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc,
			     struct rds_ib_ack_state *state)
{
	…
}

int rds_ib_recv_path(struct rds_conn_path *cp)
{
	…
}

int rds_ib_recv_init(void)
{
	…
}

void rds_ib_recv_exit(void)
{
	…
}
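/*
 * Illustrative sketch only, tied to way 2 in the "3 ways of getting
 * acknowledgements" comment above rds_ib_attempt_ack(): how a transmit path
 * could piggyback the pending ACK on an outgoing header. The helper name is
 * made up for the example, and it assumes the wire header carries the
 * cumulative ACK in a big-endian h_ack field.
 */
static void __maybe_unused example_piggyback_ack(struct rds_ib_connection *ic,
						 struct rds_header *hdr)
{
	/* Claim whatever ACK is pending and fold it into the data packet. */
	hdr->h_ack = cpu_to_be64(rds_ib_piggyb_ack(ic));
}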