linux/fs/dlm/lowcomms.c

// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP address or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
 *
 */

#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/ipv6.h>

#include <trace/events/dlm.h>
#include <trace/events/sock.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "memory.h"
#include "config.h"

#define DLM_SHUTDOWN_WAIT_TIMEOUT
#define DLM_MAX_PROCESS_BUFFERS
#define NEEDED_RMEM

struct connection {};
#define sock2con(x)

struct listen_connection {};

#define DLM_WQ_REMAIN_BYTES(e)
#define DLM_WQ_LENGTH_BYTES(e)

/* An entry waiting to be sent */
struct writequeue_entry {};

struct dlm_msg {};

struct processqueue_entry {};

struct dlm_proto_ops {};

static struct listen_sock_callbacks {} listen_sock;

static struct listen_connection listen_con;
static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;

/* Work queues */
static struct workqueue_struct *io_workqueue;
static struct workqueue_struct *process_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU();

static const struct dlm_proto_ops *dlm_proto_ops;

#define DLM_IO_SUCCESS
#define DLM_IO_END
#define DLM_IO_EOF
#define DLM_IO_RESCHED
#define DLM_IO_FLUSH

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
static void process_dlm_messages(struct work_struct *work);

static DECLARE_WORK(process_work, process_dlm_messages);
static DEFINE_SPINLOCK(processqueue_lock);
static bool process_dlm_messages_pending;
static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq);
static atomic_t processqueue_count;
static LIST_HEAD(processqueue);

bool dlm_lowcomms_is_running(void)
{}

static void lowcomms_queue_swork(struct connection *con)
{}

static void lowcomms_queue_rwork(struct connection *con)
{}

static void writequeue_entry_ctor(void *data)
{}

struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
{}

struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
{}

/* writequeue_lock must be held */
static struct writequeue_entry *con_next_wq(struct connection *con)
{}

static struct connection *__find_con(int nodeid, int r)
{}

static void dlm_con_init(struct connection *con, int nodeid)
{}

/*
 * If 'alloc' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{}

static int addr_compare(const struct sockaddr_storage *x,
			const struct sockaddr_storage *y)
{}

static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr,
			  unsigned int *mark)
{}

static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
			  unsigned int *mark)
{}

static bool dlm_lowcomms_con_has_addr(const struct connection *con,
				      const struct sockaddr_storage *addr)
{}

int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr)
{}

/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk)
{}

static void lowcomms_write_space(struct sock *sk)
{}

static void lowcomms_state_change(struct sock *sk)
{}

static void lowcomms_listen_data_ready(struct sock *sk)
{}

int dlm_lowcomms_connect_node(int nodeid)
{}

int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
{}

static void lowcomms_error_report(struct sock *sk)
{}

static void restore_callbacks(struct sock *sk)
{}

/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{}

/* Add the port number to an IPv4 or IPv6 sockaddr and return the address
   length through addr_len */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{}

static void dlm_page_release(struct kref *kref)
{}

static void dlm_msg_release(struct kref *kref)
{}

static void free_entry(struct writequeue_entry *e)
{}

static void dlm_close_sock(struct socket **sock)
{}

static void allow_connection_io(struct connection *con)
{}

static void stop_connection_io(struct connection *con)
{}

/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other)
{}

static void shutdown_connection(struct connection *con, bool and_other)
{}

static struct processqueue_entry *new_processqueue_entry(int nodeid,
							 int buflen)
{}

static void free_processqueue_entry(struct processqueue_entry *pentry)
{}

static void process_dlm_messages(struct work_struct *work)
{}

/* Data received from remote end */
static int receive_from_sock(struct connection *con, int buflen)
{}

/* Listening socket is busy, accept a connection */
static int accept_from_sock(void)
{}

/*
 * writequeue_entry_complete - try to delete and free write queue entry
 * @e: write queue entry to try to delete
 * @completed: bytes completed
 *
 * writequeue_lock must be held.
 */
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{}

/*
 * sctp_bind_addrs - bind a SCTP socket to all our addresses
 */
static int sctp_bind_addrs(struct socket *sock, uint16_t port)
{}

/* Get local addresses */
static void init_local(void)
{}

static struct writequeue_entry *new_writequeue_entry(struct connection *con)
{}

/*
 * new_wq_entry - reserve @len bytes in a write queue entry of @con
 * @con: connection whose writequeue is used
 * @len: number of bytes to reserve
 * @ppc: on success, set to the start of the reserved bytes inside the
 *       entry's page
 * @cb: optional callback, invoked with @data while writequeue_lock is held
 * @data: argument passed to @cb
 *
 * The last entry on the writequeue is reused when DLM_WQ_REMAIN_BYTES()
 * reports at least @len; otherwise a fresh entry is requested from
 * new_writequeue_entry() and appended to the tail of the writequeue.
 *
 * Takes and releases con->writequeue_lock (bottom halves disabled) itself.
 *
 * Returns the entry with an additional reference taken via kref_get(), or
 * NULL if new_writequeue_entry() returned NULL. In the NULL case *@ppc is
 * left untouched and @cb is not called.
 */
static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
					     char **ppc, void (*cb)(void *data),
					     void *data)
{
	struct writequeue_entry *e;

	spin_lock_bh(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		/* try to append to the most recently queued entry first */
		e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
		if (DLM_WQ_REMAIN_BYTES(e) >= len) {
			kref_get(&e->ref);

			/* reserved region starts at the current end offset */
			*ppc = page_address(e->page) + e->end;
			if (cb)
				cb(data);

			e->end += len;
			e->users++;
			goto out;
		}
	}

	e = new_writequeue_entry(con);
	if (!e)
		goto out;

	kref_get(&e->ref);
	/*
	 * Fresh entry: the reservation starts at the beginning of the page.
	 * NOTE(review): e->users is not touched here, presumably it is
	 * initialised by new_writequeue_entry() - confirm against its body.
	 */
	*ppc = page_address(e->page);
	e->end += len;
	if (cb)
		cb(data);

	list_add_tail(&e->list, &con->writequeue);

out:
	spin_unlock_bh(&con->writequeue_lock);
	return e;
}

static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
						char **ppc, void (*cb)(void *data),
						void *data)
{}

/* avoid false positive for nodes_srcu, unlock happens in
 * dlm_lowcomms_commit_msg which must be called on success
 */
#ifndef __CHECKER__
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc,
				     void (*cb)(void *data), void *data)
{}
#endif

static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{}

/* avoid false positive for nodes_srcu, the lock was taken in
 * dlm_lowcomms_new_msg
 */
#ifndef __CHECKER__
void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{}
#endif

void dlm_lowcomms_put_msg(struct dlm_msg *msg)
{}

/* does not hold connections_srcu, used by lowcomms_error_report only */
int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
{}

/* Send a message */
static int send_to_sock(struct connection *con)
{}

static void clean_one_writequeue(struct connection *con)
{}

static void connection_release(struct rcu_head *rcu)
{}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{}

/* Receive worker function */
static void process_recv_sockets(struct work_struct *work)
{}

static void process_listen_recv_socket(struct work_struct *work)
{}

static int dlm_connect(struct connection *con)
{}

/* Send worker function */
static void process_send_sockets(struct work_struct *work)
{}

static void work_stop(void)
{}

static int work_start(void)
{}

void dlm_lowcomms_shutdown(void)
{}

void dlm_lowcomms_stop(void)
{}

static int dlm_listen_for_all(void)
{}

static int dlm_tcp_bind(struct socket *sock)
{}

static int dlm_tcp_connect(struct connection *con, struct socket *sock,
			   struct sockaddr *addr, int addr_len)
{}

static int dlm_tcp_listen_validate(void)
{}

static void dlm_tcp_sockopts(struct socket *sock)
{}

static void dlm_tcp_listen_sockopts(struct socket *sock)
{}

static int dlm_tcp_listen_bind(struct socket *sock)
{}

static const struct dlm_proto_ops dlm_tcp_ops =;

static int dlm_sctp_bind(struct socket *sock)
{}

static int dlm_sctp_connect(struct connection *con, struct socket *sock,
			    struct sockaddr *addr, int addr_len)
{}

static int dlm_sctp_listen_validate(void)
{}

static int dlm_sctp_bind_listen(struct socket *sock)
{}

static void dlm_sctp_sockopts(struct socket *sock)
{}

static const struct dlm_proto_ops dlm_sctp_ops =;

int dlm_lowcomms_start(void)
{}

void dlm_lowcomms_init(void)
{}

void dlm_lowcomms_exit(void)
{}