#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/ipv6.h>
#include <trace/events/dlm.h>
#include <trace/events/sock.h>
#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "memory.h"
#include "config.h"
#define DLM_SHUTDOWN_WAIT_TIMEOUT …
#define DLM_MAX_PROCESS_BUFFERS …
#define NEEDED_RMEM …
struct connection { … };
#define sock2con(x) …
struct listen_connection { … };
#define DLM_WQ_REMAIN_BYTES(e) …
#define DLM_WQ_LENGTH_BYTES(e) …
struct writequeue_entry { … };
struct dlm_msg { … };
struct processqueue_entry { … };
struct dlm_proto_ops { … };
static struct listen_sock_callbacks { … } listen_sock;
static struct listen_connection listen_con;
static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
static struct workqueue_struct *io_workqueue;
static struct workqueue_struct *process_workqueue;
static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(…);
static const struct dlm_proto_ops *dlm_proto_ops;
#define DLM_IO_SUCCESS …
#define DLM_IO_END …
#define DLM_IO_EOF …
#define DLM_IO_RESCHED …
#define DLM_IO_FLUSH …
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
static void process_dlm_messages(struct work_struct *work);
static DECLARE_WORK(process_work, process_dlm_messages);
static DEFINE_SPINLOCK(processqueue_lock);
static bool process_dlm_messages_pending;
static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq);
static atomic_t processqueue_count;
static LIST_HEAD(processqueue);
bool dlm_lowcomms_is_running(void)
{ … }
static void lowcomms_queue_swork(struct connection *con)
{ … }
static void lowcomms_queue_rwork(struct connection *con)
{ … }
static void writequeue_entry_ctor(void *data)
{ … }
struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
{ … }
struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
{ … }
static struct writequeue_entry *con_next_wq(struct connection *con)
{ … }
static struct connection *__find_con(int nodeid, int r)
{ … }
static void dlm_con_init(struct connection *con, int nodeid)
{ … }
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{ … }
static int addr_compare(const struct sockaddr_storage *x,
const struct sockaddr_storage *y)
{ … }
static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
struct sockaddr *sa_out, bool try_new_addr,
unsigned int *mark)
{ … }
static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
unsigned int *mark)
{ … }
static bool dlm_lowcomms_con_has_addr(const struct connection *con,
const struct sockaddr_storage *addr)
{ … }
int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr)
{ … }
static void lowcomms_data_ready(struct sock *sk)
{ … }
static void lowcomms_write_space(struct sock *sk)
{ … }
static void lowcomms_state_change(struct sock *sk)
{ … }
static void lowcomms_listen_data_ready(struct sock *sk)
{ … }
int dlm_lowcomms_connect_node(int nodeid)
{ … }
int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
{ … }
static void lowcomms_error_report(struct sock *sk)
{ … }
static void restore_callbacks(struct sock *sk)
{ … }
static void add_sock(struct socket *sock, struct connection *con)
{ … }
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
int *addr_len)
{ … }
static void dlm_page_release(struct kref *kref)
{ … }
static void dlm_msg_release(struct kref *kref)
{ … }
static void free_entry(struct writequeue_entry *e)
{ … }
static void dlm_close_sock(struct socket **sock)
{ … }
static void allow_connection_io(struct connection *con)
{ … }
static void stop_connection_io(struct connection *con)
{ … }
static void close_connection(struct connection *con, bool and_other)
{ … }
static void shutdown_connection(struct connection *con, bool and_other)
{ … }
static struct processqueue_entry *new_processqueue_entry(int nodeid,
int buflen)
{ … }
static void free_processqueue_entry(struct processqueue_entry *pentry)
{ … }
static void process_dlm_messages(struct work_struct *work)
{ … }
static int receive_from_sock(struct connection *con, int buflen)
{ … }
static int accept_from_sock(void)
{ … }
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{ … }
static int sctp_bind_addrs(struct socket *sock, uint16_t port)
{ … }
static void init_local(void)
{ … }
static struct writequeue_entry *new_writequeue_entry(struct connection *con)
{ … }
/*
 * new_wq_entry - reserve @len bytes in a connection's write queue
 * @con:  connection whose writequeue receives the reservation
 * @len:  number of bytes to reserve
 * @ppc:  out parameter; on success points at the start of the reserved bytes
 * @cb:   optional callback, invoked once with @data after *@ppc is set
 * @data: opaque argument handed to @cb
 *
 * The tail entry of con->writequeue is reused when DLM_WQ_REMAIN_BYTES()
 * reports at least @len bytes for it; otherwise an entry is obtained from
 * new_writequeue_entry() and appended to the queue.  In both cases an extra
 * reference is taken on the returned entry with kref_get().
 *
 * @cb runs with con->writequeue_lock held (taken via spin_lock_bh()).
 *
 * Return: the entry holding the reserved bytes, or NULL when
 * new_writequeue_entry() fails.  On failure *@ppc is left untouched and
 * @cb is not called.
 */
static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
					     char **ppc, void (*cb)(void *data),
					     void *data)
{
	struct writequeue_entry *e;

	spin_lock_bh(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		/* Try to append to the most recently queued entry first. */
		e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
		if (DLM_WQ_REMAIN_BYTES(e) >= len) {
			kref_get(&e->ref);

			/* Reserved region starts at the entry's current end. */
			*ppc = page_address(e->page) + e->end;
			if (cb)
				cb(data);

			e->end += len;
			/*
			 * NOTE(review): users is bumped only on this reuse
			 * path; presumably new_writequeue_entry() already
			 * accounts for the first user - confirm.
			 */
			e->users++;
			goto out;
		}
	}

	/* Queue empty or tail entry too small: start a new entry. */
	e = new_writequeue_entry(con);
	if (!e)
		goto out;

	kref_get(&e->ref);
	/*
	 * NOTE(review): assumes a fresh entry starts with end == 0, so the
	 * reservation begins at the start of the page - confirm.
	 */
	*ppc = page_address(e->page);
	e->end += len;
	if (cb)
		cb(data);

	list_add_tail(&e->list, &con->writequeue);

out:
	spin_unlock_bh(&con->writequeue_lock);
	return e;
}
static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
char **ppc, void (*cb)(void *data),
void *data)
{ … }
#ifndef __CHECKER__
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc,
void (*cb)(void *data), void *data)
{ … }
#endif
static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{ … }
#ifndef __CHECKER__
void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{ … }
#endif
void dlm_lowcomms_put_msg(struct dlm_msg *msg)
{ … }
int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
{ … }
static int send_to_sock(struct connection *con)
{ … }
static void clean_one_writequeue(struct connection *con)
{ … }
static void connection_release(struct rcu_head *rcu)
{ … }
int dlm_lowcomms_close(int nodeid)
{ … }
static void process_recv_sockets(struct work_struct *work)
{ … }
static void process_listen_recv_socket(struct work_struct *work)
{ … }
static int dlm_connect(struct connection *con)
{ … }
static void process_send_sockets(struct work_struct *work)
{ … }
static void work_stop(void)
{ … }
static int work_start(void)
{ … }
void dlm_lowcomms_shutdown(void)
{ … }
void dlm_lowcomms_stop(void)
{ … }
static int dlm_listen_for_all(void)
{ … }
static int dlm_tcp_bind(struct socket *sock)
{ … }
static int dlm_tcp_listen_validate(void)
{ … }
static void dlm_tcp_sockopts(struct socket *sock)
{ … }
static void dlm_tcp_listen_sockopts(struct socket *sock)
{ … }
static int dlm_tcp_listen_bind(struct socket *sock)
{ … }
static const struct dlm_proto_ops dlm_tcp_ops = …;
static int dlm_sctp_bind(struct socket *sock)
{ … }
static int dlm_sctp_listen_validate(void)
{ … }
static int dlm_sctp_bind_listen(struct socket *sock)
{ … }
static void dlm_sctp_sockopts(struct socket *sock)
{ … }
static const struct dlm_proto_ops dlm_sctp_ops = …;
int dlm_lowcomms_start(void)
{ … }
void dlm_lowcomms_init(void)
{ … }
void dlm_lowcomms_exit(void)
{ … }