linux/net/ceph/messenger.c

// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/crc32c.h>
#include <linux/ctype.h>
#include <linux/highmem.h>
#include <linux/inet.h>
#include <linux/kthread.h>
#include <linux/net.h>
#include <linux/nsproxy.h>
#include <linux/sched/mm.h>
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
#ifdef	CONFIG_BLOCK
#include <linux/bio.h>
#endif	/* CONFIG_BLOCK */
#include <linux/dns_resolver.h>
#include <net/tcp.h>
#include <trace/events/sock.h>

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/export.h>

/*
 * Ceph uses the messenger to exchange ceph_msg messages with other
 * hosts in the system.  The messenger provides ordered and reliable
 * delivery.  We tolerate TCP disconnects by reconnecting (with
 * exponential backoff) in the case of a fault (disconnection, bad
 * crc, protocol error).  Acks allow sent messages to be discarded by
 * the sender.
 */

/*
 * We track the state of the socket on a given connection using
 * values defined below.  The transition to a new socket state is
 * handled by a function which verifies we aren't coming from an
 * unexpected state.
 *
 *      --------
 *      | NEW* |  transient initial state
 *      --------
 *          | con_sock_state_init()
 *          v
 *      ----------
 *      | CLOSED |  initialized, but no socket (and no
 *      ----------  TCP connection)
 *       ^      \
 *       |       \ con_sock_state_connecting()
 *       |        ----------------------
 *       |                              \
 *       + con_sock_state_closed()       \
 *       |+---------------------------    \
 *       | \                          \    \
 *       |  -----------                \    \
 *       |  | CLOSING |  socket event;  \    \
 *       |  -----------  await close     \    \
 *       |       ^                        \   |
 *       |       |                         \  |
 *       |       + con_sock_state_closing() \ |
 *       |      / \                         | |
 *       |     /   ---------------          | |
 *       |    /                   \         v v
 *       |   /                    --------------
 *       |  /    -----------------| CONNECTING |  socket created, TCP
 *       |  |   /                 --------------  connect initiated
 *       |  |   | con_sock_state_connected()
 *       |  |   v
 *      -------------
 *      | CONNECTED |  TCP connection established
 *      -------------
 *
 * State values for ceph_connection->sock_state; NEW is assumed to be 0.
 */

#define CON_SOCK_STATE_NEW		0	/* -> CLOSED */
#define CON_SOCK_STATE_CLOSED		1	/* -> CONNECTING */
#define CON_SOCK_STATE_CONNECTING	2	/* -> CONNECTED or -> CLOSING */
#define CON_SOCK_STATE_CONNECTED	3	/* -> CLOSING or -> CLOSED */
#define CON_SOCK_STATE_CLOSING		4	/* -> CLOSED */
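
/*
 * Illustrative sketch (not the verbatim implementation): each state
 * transition helper below might atomically swap in the new state and
 * warn when the previous state is not the expected one.  Assumes
 * sock_state is an atomic_t member of struct ceph_connection.
 */
#if 0
static void con_sock_state_transition_sketch(struct ceph_connection *con,
					     int expected_old, int new_state)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, new_state);
	WARN(old_state != expected_old, "%s: unexpected old state %d\n",
	     __func__, old_state);
}
#endif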

static bool con_flag_valid(unsigned long con_flag)
{}

void ceph_con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
{}

void ceph_con_flag_set(struct ceph_connection *con, unsigned long con_flag)
{}

bool ceph_con_flag_test(struct ceph_connection *con, unsigned long con_flag)
{}

bool ceph_con_flag_test_and_clear(struct ceph_connection *con,
				  unsigned long con_flag)
{}

bool ceph_con_flag_test_and_set(struct ceph_connection *con,
				unsigned long con_flag)
{}

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*ceph_msg_cache;

#ifdef CONFIG_LOCKDEP
static struct lock_class_key socket_class;
#endif

static void queue_con(struct ceph_connection *con);
static void cancel_con(struct ceph_connection *con);
static void ceph_con_workfn(struct work_struct *);
static void con_fault(struct ceph_connection *con);

/*
 * Nicely render a sockaddr as a string.  An array of formatted
 * strings is used to approximate reentrancy.
 */
#define ADDR_STR_COUNT_LOG	5	/* log2(# address strings in array) */
#define ADDR_STR_COUNT		(1 << ADDR_STR_COUNT_LOG)
#define ADDR_STR_COUNT_MASK	(ADDR_STR_COUNT - 1)
#define MAX_ADDR_STR_LEN	64

static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
static atomic_t addr_str_seq = ATOMIC_INIT(0);

struct page *ceph_zero_page;		/* used in certain error cases */

const char *ceph_pr_addr(const struct ceph_entity_addr *addr)
{}
EXPORT_SYMBOL(ceph_pr_addr);
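
/*
 * Illustrative sketch of the formatted-string ring described above
 * (not the verbatim implementation): each call claims the next slot
 * in addr_str, so up to ADDR_STR_COUNT results remain valid at once.
 */
#if 0
static const char *pr_addr_sketch(const struct ceph_entity_addr *addr)
{
	int i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK;
	char *s = addr_str[i];
	struct sockaddr_in *in4 = (struct sockaddr_in *)&addr->in_addr;

	switch (addr->in_addr.ss_family) {
	case AF_INET:
		snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%hu", &in4->sin_addr,
			 ntohs(in4->sin_port));
		break;
	default:
		/* AF_INET6 would be handled similarly with %pI6 */
		snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)",
			 addr->in_addr.ss_family);
	}
	return s;
}
#endif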

void ceph_encode_my_addr(struct ceph_messenger *msgr)
{}

/*
 * work queue for all reading and writing to/from the socket.
 */
static struct workqueue_struct *ceph_msgr_wq;

static int ceph_msgr_slab_init(void)
{}

static void ceph_msgr_slab_exit(void)
{}

static void _ceph_msgr_exit(void)
{}

int __init ceph_msgr_init(void)
{}

void ceph_msgr_exit(void)
{}

void ceph_msgr_flush(void)
{}
EXPORT_SYMBOL(ceph_msgr_flush);

/* Connection socket state transition functions */

static void con_sock_state_init(struct ceph_connection *con)
{}

static void con_sock_state_connecting(struct ceph_connection *con)
{}

static void con_sock_state_connected(struct ceph_connection *con)
{}

static void con_sock_state_closing(struct ceph_connection *con)
{}

static void con_sock_state_closed(struct ceph_connection *con)
{}

/*
 * socket callback functions
 */

/* data available on socket, or listen socket received a connect */
static void ceph_sock_data_ready(struct sock *sk)
{}

/* socket has buffer space for writing */
static void ceph_sock_write_space(struct sock *sk)
{}

/* socket's state has changed */
static void ceph_sock_state_change(struct sock *sk)
{}

/*
 * set up socket callbacks
 */
static void set_sock_callbacks(struct socket *sock,
			       struct ceph_connection *con)
{}


/*
 * socket helpers
 */

/*
 * initiate connection to a remote socket.
 */
int ceph_tcp_connect(struct ceph_connection *con)
{}

/*
 * Shutdown/close the socket for the given connection.
 */
int ceph_con_close_socket(struct ceph_connection *con)
{}

static void ceph_con_reset_protocol(struct ceph_connection *con)
{}

/*
 * Reset a connection.  Discard all incoming and outgoing messages
 * and clear *_seq state.
 */
static void ceph_msg_remove(struct ceph_msg *msg)
{}

static void ceph_msg_remove_list(struct list_head *head)
{}

void ceph_con_reset_session(struct ceph_connection *con)
{}

/*
 * mark a peer down.  drop any open connections.
 */
void ceph_con_close(struct ceph_connection *con)
{}
EXPORT_SYMBOL(ceph_con_close);

/*
 * Reopen a closed connection, with a new peer address.
 */
void ceph_con_open(struct ceph_connection *con,
		   __u8 entity_type, __u64 entity_num,
		   struct ceph_entity_addr *addr)
{}
EXPORT_SYMBOL(ceph_con_open);

/*
 * return true if this connection was ever successfully opened
 */
bool ceph_con_opened(struct ceph_connection *con)
{}

/*
 * initialize a new connection.
 */
void ceph_con_init(struct ceph_connection *con, void *private,
	const struct ceph_connection_operations *ops,
	struct ceph_messenger *msgr)
{}
EXPORT_SYMBOL(ceph_con_init);

/*
 * We maintain a global counter to order connection attempts.  Get
 * a unique seq greater than @gt.
 */
u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt)
{}
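
/*
 * A minimal sketch of the global seq counter (assumes global_seq and
 * global_seq_lock members in struct ceph_messenger; illustrative only):
 */
#if 0
static u32 get_global_seq_sketch(struct ceph_messenger *msgr, u32 gt)
{
	u32 ret;

	spin_lock(&msgr->global_seq_lock);
	if (msgr->global_seq < gt)
		msgr->global_seq = gt;
	ret = ++msgr->global_seq;
	spin_unlock(&msgr->global_seq_lock);
	return ret;
}
#endif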

/*
 * Discard messages that have been acked by the server.
 */
void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq)
{}
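
/*
 * Sketch of the ack-based discard (assumes out_sent is kept in hdr.seq
 * order; illustrative only): messages at the head of out_sent with
 * seq <= ack_seq were received by the peer and can be dropped.
 */
#if 0
static void discard_sent_sketch(struct ceph_connection *con, u64 ack_seq)
{
	struct ceph_msg *msg;

	while (!list_empty(&con->out_sent)) {
		msg = list_first_entry(&con->out_sent, struct ceph_msg,
				       list_head);
		if (le64_to_cpu(msg->hdr.seq) > ack_seq)
			break;
		ceph_msg_remove(msg);
	}
}
#endif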

/*
 * Discard messages that have been requeued in con_fault(), up to
 * reconnect_seq.  This avoids gratuitously resending messages that
 * the server had received and handled prior to reconnect.
 */
void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
{}

#ifdef CONFIG_BLOCK

/*
 * For a bio data item, a piece is whatever remains of the next
 * entry in the current bio iovec, or the first entry in the next
 * bio in the list.
 */
static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
					size_t length)
{}

static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
						size_t *page_offset,
						size_t *length)
{}

static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
					size_t bytes)
{}
#endif /* CONFIG_BLOCK */

static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor,
					size_t length)
{}

static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor,
						size_t *page_offset,
						size_t *length)
{}

static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor,
					size_t bytes)
{}

/*
 * For a page array, a piece comes from the first page in the array
 * that has not already been fully consumed.
 */
static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
					size_t length)
{}

static struct page *
ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
					size_t *page_offset, size_t *length)
{}

static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
						size_t bytes)
{}
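
/*
 * Sketch of the page-array "next piece" logic (assumes cursor fields
 * page_index, page_offset and resid; illustrative only): the piece is
 * the rest of the current page, capped by what remains in the data item.
 */
#if 0
static struct page *pages_next_sketch(struct ceph_msg_data_cursor *cursor,
				      size_t *page_offset, size_t *length)
{
	struct ceph_msg_data *data = cursor->data;

	*page_offset = cursor->page_offset;
	*length = min_t(size_t, cursor->resid, PAGE_SIZE - *page_offset);
	return data->pages[cursor->page_index];
}
#endif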

/*
 * For a pagelist, a piece is whatever remains to be consumed in the
 * first page in the list, or the front of the next page.
 */
static void
ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
					size_t length)
{}

static struct page *
ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
				size_t *page_offset, size_t *length)
{}

static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
						size_t bytes)
{}

static void ceph_msg_data_iter_cursor_init(struct ceph_msg_data_cursor *cursor,
					   size_t length)
{}

static struct page *ceph_msg_data_iter_next(struct ceph_msg_data_cursor *cursor,
					    size_t *page_offset, size_t *length)
{}

static bool ceph_msg_data_iter_advance(struct ceph_msg_data_cursor *cursor,
				       size_t bytes)
{}

/*
 * Message data is handled (sent or received) in pieces, where each
 * piece resides on a single page.  The network layer might not
 * consume an entire piece at once.  A data item's cursor keeps
 * track of which piece is next to process and how much remains to
 * be processed in that piece.  It also tracks whether the current
 * piece is the last one in the data item.
 */
static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
{}

void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor,
			       struct ceph_msg *msg, size_t length)
{}

/*
 * Return the page containing the next piece to process for a given
 * data item, and supply the page offset and length of that piece.
 * Indicate whether this is the last piece in this data item.
 */
struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
				size_t *page_offset, size_t *length)
{}

/*
 * Advance the cursor by @bytes, moving on to the next piece of the
 * data item once the current piece has been fully consumed.
 */
void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes)
{}
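
/*
 * A caller's view of the cursor API above (hypothetical helper, not
 * part of this file; assumes the cursor's total_resid field tracks the
 * bytes still to process): walk a message's data one page piece at a time.
 */
#if 0
static void walk_msg_data_sketch(struct ceph_msg *msg, size_t length)
{
	struct ceph_msg_data_cursor cursor;
	struct page *page;
	size_t page_offset;
	size_t piece_len;

	ceph_msg_data_cursor_init(&cursor, msg, length);
	while (cursor.total_resid) {
		page = ceph_msg_data_next(&cursor, &page_offset, &piece_len);
		/* send or receive [page_offset, page_offset + piece_len) */
		ceph_msg_data_advance(&cursor, piece_len);
	}
}
#endif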

u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset,
		     unsigned int length)
{}
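
/*
 * Likely shape of the page CRC helper (kmap plus crc32c(); a sketch,
 * not necessarily the exact implementation):
 */
#if 0
static u32 crc32c_page_sketch(u32 crc, struct page *page,
			      unsigned int page_offset, unsigned int length)
{
	char *kaddr = kmap(page);

	crc = crc32c(crc, kaddr + page_offset, length);
	kunmap(page);
	return crc;
}
#endif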

bool ceph_addr_is_blank(const struct ceph_entity_addr *addr)
{}
EXPORT_SYMBOL(ceph_addr_is_blank);

int ceph_addr_port(const struct ceph_entity_addr *addr)
{}

void ceph_addr_set_port(struct ceph_entity_addr *addr, int p)
{}

/*
 * Unlike other *_pton function semantics, zero indicates success.
 */
static int ceph_pton(const char *str, size_t len, struct ceph_entity_addr *addr,
		char delim, const char **ipend)
{}

/*
 * Extract hostname string and resolve using kernel DNS facility.
 */
#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
static int ceph_dns_resolve_name(const char *name, size_t namelen,
		struct ceph_entity_addr *addr, char delim, const char **ipend)
{}
#else
static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
		struct ceph_entity_addr *addr, char delim, const char **ipend)
{
	return -EINVAL;
}
#endif

/*
 * Parse a server name (IP or hostname). If a valid IP address is not found
 * then try to extract a hostname to resolve using userspace DNS upcall.
 */
static int ceph_parse_server_name(const char *name, size_t namelen,
		struct ceph_entity_addr *addr, char delim, const char **ipend)
{}

/*
 * Parse an ip[:port] list into an addr array.  Use the default
 * monitor port if a port isn't specified.
 */
int ceph_parse_ips(const char *c, const char *end,
		   struct ceph_entity_addr *addr,
		   int max_count, int *count, char delim)
{}

/*
 * Process message.  This happens in the worker thread.  The callback should
 * be careful not to do anything that waits on other incoming messages or it
 * may deadlock.
 */
void ceph_con_process_message(struct ceph_connection *con)
{}

/*
 * Atomically queue work on a connection after the specified delay.
 * Bump @con reference to avoid races with connection teardown.
 * Returns 0 if work was queued, or an error code otherwise.
 */
static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
{}
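
/*
 * Sketch of the ref-counted queueing described above (assumes the
 * connection's get op returns NULL once the refcount has hit zero;
 * illustrative only): take a ref before queueing, drop it if the
 * work was already queued.
 */
#if 0
static int queue_con_delay_sketch(struct ceph_connection *con,
				  unsigned long delay)
{
	if (!con->ops->get(con))
		return -ENOENT;

	if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
		con->ops->put(con);
		return -EBUSY;
	}
	return 0;
}
#endif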

static void queue_con(struct ceph_connection *con)
{}

static void cancel_con(struct ceph_connection *con)
{}

static bool con_sock_closed(struct ceph_connection *con)
{}

static bool con_backoff(struct ceph_connection *con)
{}

/* Finish fault handling; con->mutex must *not* be held here */

static void con_fault_finish(struct ceph_connection *con)
{}

/*
 * Do some work on a connection.  Drop a connection ref when we're done.
 */
static void ceph_con_workfn(struct work_struct *work)
{}

/*
 * Generic error/fault handler.  A retry mechanism is used with
 * exponential backoff.
 */
static void con_fault(struct ceph_connection *con)
{}
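
/*
 * Sketch of the exponential backoff applied on fault (the base and
 * max delay values here are assumptions, and con->delay is assumed to
 * hold the current retry delay in jiffies):
 */
#if 0
#define BASE_DELAY_INTERVAL	(HZ / 4)
#define MAX_DELAY_INTERVAL	(5 * 60 * HZ)

static void backoff_sketch(struct ceph_connection *con)
{
	if (con->delay == 0)
		con->delay = BASE_DELAY_INTERVAL;
	else if (con->delay < MAX_DELAY_INTERVAL)
		con->delay *= 2;	/* double the delay on each fault */

	queue_con_delay(con, con->delay);
}
#endif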

void ceph_messenger_reset_nonce(struct ceph_messenger *msgr)
{}

/*
 * initialize a new messenger instance
 */
void ceph_messenger_init(struct ceph_messenger *msgr,
			 struct ceph_entity_addr *myaddr)
{}

void ceph_messenger_fini(struct ceph_messenger *msgr)
{}

static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
{}

static void clear_standby(struct ceph_connection *con)
{}

/*
 * Queue up an outgoing message on the given connection.
 *
 * Consumes a ref on @msg.
 */
void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
{}
EXPORT_SYMBOL(ceph_con_send);
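
/*
 * Because ceph_con_send() consumes a ref on @msg, a caller that wants
 * to keep using the message afterwards (e.g. to revoke it on timeout)
 * must take its own ref first.  Hypothetical usage:
 */
#if 0
static void send_and_keep_sketch(struct ceph_connection *con,
				 struct ceph_msg *msg)
{
	ceph_con_send(con, ceph_msg_get(msg));	/* ref for the messenger */

	/* ... later, e.g. on request timeout ... */
	ceph_msg_revoke(msg);
	ceph_msg_put(msg);			/* drop our ref */
}
#endif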

/*
 * Revoke a message that was previously queued for send
 */
void ceph_msg_revoke(struct ceph_msg *msg)
{}

/*
 * Revoke a message that we may be reading data into
 */
void ceph_msg_revoke_incoming(struct ceph_msg *msg)
{}

/*
 * Queue a keepalive byte to ensure the tcp connection is alive.
 */
void ceph_con_keepalive(struct ceph_connection *con)
{}
EXPORT_SYMBOL(ceph_con_keepalive);

bool ceph_con_keepalive_expired(struct ceph_connection *con,
			       unsigned long interval)
{}

static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
{}

static void ceph_msg_data_destroy(struct ceph_msg_data *data)
{}

void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
			     size_t length, size_t alignment, bool own_pages)
{}
EXPORT_SYMBOL(ceph_msg_data_add_pages);

void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
				struct ceph_pagelist *pagelist)
{}
EXPORT_SYMBOL(ceph_msg_data_add_pagelist);

#ifdef	CONFIG_BLOCK
void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
			   u32 length)
{}
EXPORT_SYMBOL(ceph_msg_data_add_bio);
#endif	/* CONFIG_BLOCK */

void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
			     struct ceph_bvec_iter *bvec_pos)
{}
EXPORT_SYMBOL(ceph_msg_data_add_bvecs);

void ceph_msg_data_add_iter(struct ceph_msg *msg,
			    struct iov_iter *iter)
{}

/*
 * Construct a new message with the given type and front section size.
 * The new msg has a ref count of 1.
 */
struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
			       gfp_t flags, bool can_fail)
{}
EXPORT_SYMBOL(ceph_msg_new2);

struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
			      bool can_fail)
{}
EXPORT_SYMBOL(ceph_msg_new);

/*
 * Allocate "middle" portion of a message, if it is needed and wasn't
 * allocated by alloc_msg.  This allows us to read a small fixed-size
 * per-type header in the front and then gracefully fail (i.e.,
 * propagate the error to the caller based on info in the front) when
 * the middle is too large.
 */
static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
{}
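
/*
 * Likely shape of the middle allocation (assumes ceph_buffer_new();
 * a sketch, not necessarily the exact implementation): sized from
 * middle_len in the already-received front header.
 */
#if 0
static int alloc_middle_sketch(struct ceph_msg *msg)
{
	int middle_len = le32_to_cpu(msg->hdr.middle_len);

	BUG_ON(!middle_len);
	BUG_ON(msg->middle);

	msg->middle = ceph_buffer_new(middle_len, GFP_NOFS);
	if (!msg->middle)
		return -ENOMEM;
	return 0;
}
#endif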

/*
 * Allocate a message for receiving an incoming message on a
 * connection, and save the result in con->in_msg.  Uses the
 * connection's private alloc_msg op if available.
 *
 * Returns 0 on success, or a negative error code.
 *
 * On success, if we set *skip = 1:
 *  - the next message should be skipped and ignored.
 *  - con->in_msg == NULL
 * or if we set *skip = 0:
 *  - con->in_msg is non-null.
 * On error (ENOMEM, EAGAIN, ...),
 *  - con->in_msg == NULL
 */
int ceph_con_in_msg_alloc(struct ceph_connection *con,
			  struct ceph_msg_header *hdr, int *skip)
{}
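
/*
 * Caller's view of the contract above (hypothetical snippet): either
 * skip the message body or read into con->in_msg, never both.
 */
#if 0
static int handle_header_sketch(struct ceph_connection *con,
				struct ceph_msg_header *hdr)
{
	int skip = 0;
	int ret = ceph_con_in_msg_alloc(con, hdr, &skip);

	if (ret)
		return ret;	/* e.g. -ENOMEM; con->in_msg == NULL */
	if (skip)
		return 0;	/* discard this message's payload */

	/* ... receive the payload into con->in_msg ... */
	return 0;
}
#endif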

void ceph_con_get_out_msg(struct ceph_connection *con)
{}

/*
 * Free a generically kmalloc'd message.
 */
static void ceph_msg_free(struct ceph_msg *m)
{}

static void ceph_msg_release(struct kref *kref)
{}

struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
{}
EXPORT_SYMBOL(ceph_msg_get);

void ceph_msg_put(struct ceph_msg *msg)
{}
EXPORT_SYMBOL(ceph_msg_put);

void ceph_msg_dump(struct ceph_msg *msg)
{}
EXPORT_SYMBOL(ceph_msg_dump);