#include "git-compat-util.h" #include "gettext.h" #include "simple-ipc.h" #include "strbuf.h" #include "thread-utils.h" #include "trace2.h" #include "unix-socket.h" #include "unix-stream-server.h" #ifndef SUPPORTS_SIMPLE_IPC /* * This source file should only be compiled when Simple IPC is supported. * See the top-level Makefile. */ #error SUPPORTS_SIMPLE_IPC not defined #endif enum ipc_active_state ipc_get_active_state(const char *path) { … } /* * Retry frequency when trying to connect to a server. * * This value should be short enough that we don't seriously delay our * caller, but not fast enough that our spinning puts pressure on the * system. */ #define WAIT_STEP_MS … /* * Try to connect to the server. If the server is just starting up or * is very busy, we may not get a connection the first time. */ static enum ipc_active_state connect_to_server( const char *path, int timeout_ms, const struct ipc_client_connect_options *options, int *pfd) { … } /* * The total amount of time that we are willing to wait when trying to * connect to a server. * * When the server is first started, it might take a little while for * it to become ready to service requests. Likewise, the server may * be very (temporarily) busy and not respond to our connections. * * We should gracefully and silently handle those conditions and try * again for a reasonable time period. * * The value chosen here should be long enough for the server * to reliably heal from the above conditions. */ #define MY_CONNECTION_TIMEOUT_MS … enum ipc_active_state ipc_client_try_connect( const char *path, const struct ipc_client_connect_options *options, struct ipc_client_connection **p_connection) { … } void ipc_client_close_connection(struct ipc_client_connection *connection) { … } int ipc_client_send_command_to_connection( struct ipc_client_connection *connection, const char *message, size_t message_len, struct strbuf *answer) { … } int ipc_client_send_command(const char *path, const struct ipc_client_connect_options *options, const char *message, size_t message_len, struct strbuf *answer) { … } static int set_socket_blocking_flag(int fd, int make_nonblocking) { … } /* * Magic numbers used to annotate callback instance data. * These are used to help guard against accidentally passing the * wrong instance data across multiple levels of callbacks (which * is easy to do if there are `void*` arguments). */ enum magic { … }; struct ipc_server_reply_data { … }; struct ipc_worker_thread_data { … }; struct ipc_accept_thread_data { … }; /* * With unix-sockets, the conceptual "ipc-server" is implemented as a single * controller "accept-thread" thread and a pool of "worker-thread" threads. * The former does the usual `accept()` loop and dispatches connections * to an idle worker thread. The worker threads wait in an idle loop for * a new connection, communicate with the client and relay data to/from * the `application_cb` and then wait for another connection from the * server thread. This avoids the overhead of constantly creating and * destroying threads. */ struct ipc_server_data { … }; /* * Remove and return the oldest queued connection. * * Returns -1 if empty. */ static int fifo_dequeue(struct ipc_server_data *server_data) { … } /* * Push a new fd onto the back of the queue. * * Drop it and return -1 if queue is already full. */ static int fifo_enqueue(struct ipc_server_data *server_data, int fd) { … } /* * Wait for a connection to be queued to the FIFO and return it. * * Returns -1 if someone has already requested a shutdown. */ static int worker_thread__wait_for_connection( struct ipc_worker_thread_data *worker_thread_data) { … } /* * Forward declare our reply callback function so that any compiler * errors are reported when we actually define the function (in addition * to any errors reported when we try to pass this callback function as * a parameter in a function call). The former are easier to understand. */ static ipc_server_reply_cb do_io_reply_callback; /* * Relay application's response message to the client process. * (We do not flush at this point because we allow the caller * to chunk data to the client thru us.) */ static int do_io_reply_callback(struct ipc_server_reply_data *reply_data, const char *response, size_t response_len) { … } /* A randomly chosen value. */ #define MY_WAIT_POLL_TIMEOUT_MS … /* * If the client hangs up without sending any data on the wire, just * quietly close the socket and ignore this client. * * This worker thread is committed to reading the IPC request data * from the client at the other end of this fd. Wait here for the * client to actually put something on the wire -- because if the * client just does a ping (connect and hangup without sending any * data), our use of the pkt-line read routines will spew an error * message. * * Return -1 if the client hung up. * Return 0 if data (possibly incomplete) is ready. */ static int worker_thread__wait_for_io_start( struct ipc_worker_thread_data *worker_thread_data, int fd) { … } /* * Receive the request/command from the client and pass it to the * registered request-callback. The request-callback will compose * a response and call our reply-callback to send it to the client. */ static int worker_thread__do_io( struct ipc_worker_thread_data *worker_thread_data, int fd) { … } /* * Block SIGPIPE on the current thread (so that we get EPIPE from * write() rather than an actual signal). * * Note that using sigchain_push() and _pop() to control SIGPIPE * around our IO calls is not thread safe: * [] It uses a global stack of handler frames. * [] It uses ALLOC_GROW() to resize it. * [] Finally, according to the `signal(2)` man-page: * "The effects of `signal()` in a multithreaded process are unspecified." */ static void thread_block_sigpipe(sigset_t *old_set) { … } /* * Thread proc for an IPC worker thread. It handles a series of * connections from clients. It pulls the next fd from the queue * processes it, and then waits for the next client. * * Block SIGPIPE in this worker thread for the life of the thread. * This avoids stray (and sometimes delayed) SIGPIPE signals caused * by client errors and/or when we are under extremely heavy IO load. * * This means that the application callback will have SIGPIPE blocked. * The callback should not change it. */ static void *worker_thread_proc(void *_worker_thread_data) { … } /* A randomly chosen value. */ #define MY_ACCEPT_POLL_TIMEOUT_MS … /* * Accept a new client connection on our socket. This uses non-blocking * IO so that we can also wait for shutdown requests on our socket-pair * without actually spinning on a fast timeout. */ static int accept_thread__wait_for_connection( struct ipc_accept_thread_data *accept_thread_data) { … } /* * Thread proc for the IPC server "accept thread". This waits for * an incoming socket connection, appends it to the queue of available * connections, and notifies a worker thread to process it. * * Block SIGPIPE in this thread for the life of the thread. This * avoids any stray SIGPIPE signals when closing pipe fds under * extremely heavy loads (such as when the fifo queue is full and we * drop incoming connections). */ static void *accept_thread_proc(void *_accept_thread_data) { … } /* * We can't predict the connection arrival rate relative to the worker * processing rate, therefore we allow the "accept-thread" to queue up * a generous number of connections, since we'd rather have the client * not unnecessarily timeout if we can avoid it. (The assumption is * that this will be used for FSMonitor and a few second wait on a * connection is better than having the client timeout and do the full * computation itself.) * * The FIFO queue size is set to a multiple of the worker pool size. * This value chosen at random. */ #define FIFO_SCALE … /* * The backlog value for `listen(2)`. This doesn't need to huge, * rather just large enough for our "accept-thread" to wake up and * queue incoming connections onto the FIFO without the kernel * dropping any. * * This value chosen at random. */ #define LISTEN_BACKLOG … static int create_listener_socket( const char *path, const struct ipc_server_opts *ipc_opts, struct unix_ss_socket **new_server_socket) { … } static int setup_listener_socket( const char *path, const struct ipc_server_opts *ipc_opts, struct unix_ss_socket **new_server_socket) { … } /* * Start IPC server in a pool of background threads. */ int ipc_server_init_async(struct ipc_server_data **returned_server_data, const char *path, const struct ipc_server_opts *opts, ipc_server_application_cb *application_cb, void *application_data) { … } void ipc_server_start_async(struct ipc_server_data *server_data) { … } /* * Gently tell the IPC server treads to shutdown. * Can be run on any thread. */ int ipc_server_stop_async(struct ipc_server_data *server_data) { … } /* * Wait for all IPC server threads to stop. */ int ipc_server_await(struct ipc_server_data *server_data) { … } void ipc_server_free(struct ipc_server_data *server_data) { … }