const defaultServerMaxReceiveMessageSize … const defaultServerMaxSendMessageSize … const listenerAddressForServeHTTP … func init() { … } var statusOK … var logger … type methodHandler … type MethodDesc … type ServiceDesc … type serviceInfo … type Server … type serverOptions … var defaultServerOptions … var globalServerOptions … type ServerOption … type EmptyServerOption … func (EmptyServerOption) apply(*serverOptions) { … } type funcServerOption … func (fdo *funcServerOption) apply(do *serverOptions) { … } func newFuncServerOption(f func(*serverOptions)) *funcServerOption { … } type joinServerOption … func (mdo *joinServerOption) apply(do *serverOptions) { … } func newJoinServerOption(opts ...ServerOption) ServerOption { … } // SharedWriteBuffer allows reusing per-connection transport write buffer. // If this option is set to true every connection will release the buffer after // flushing the data on the wire. // // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. func SharedWriteBuffer(val bool) ServerOption { … } // WriteBufferSize determines how much data can be batched before doing a write // on the wire. The default value for this buffer is 32KB. Zero or negative // values will disable the write buffer such that each write will be on underlying // connection. Note: A Send call may not directly translate to a write. func WriteBufferSize(s int) ServerOption { … } // ReadBufferSize lets you set the size of read buffer, this determines how much // data can be read at most for one read syscall. The default value for this // buffer is 32KB. Zero or negative values will disable read buffer for a // connection so data framer can access the underlying conn directly. func ReadBufferSize(s int) ServerOption { … } // InitialWindowSize returns a ServerOption that sets window size for stream. // The lower bound for window size is 64K and any value smaller than that will be ignored. func InitialWindowSize(s int32) ServerOption { … } // InitialConnWindowSize returns a ServerOption that sets window size for a connection. // The lower bound for window size is 64K and any value smaller than that will be ignored. func InitialConnWindowSize(s int32) ServerOption { … } // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server. func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { … } // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server. func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption { … } // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. // // This will override any lookups by content-subtype for Codecs registered with RegisterCodec. // // Deprecated: register codecs using encoding.RegisterCodec. The server will // automatically use registered codecs based on the incoming requests' headers. // See also // https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec. // Will be supported throughout 1.x. func CustomCodec(codec Codec) ServerOption { … } // ForceServerCodec returns a ServerOption that sets a codec for message // marshaling and unmarshaling. // // This will override any lookups by content-subtype for Codecs registered // with RegisterCodec. // // See Content-Type on // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for // more details. Also see the documentation on RegisterCodec and // CallContentSubtype for more details on the interaction between encoding.Codec // and content-subtype. // // This function is provided for advanced users; prefer to register codecs // using encoding.RegisterCodec. // The server will automatically use registered codecs based on the incoming // requests' headers. See also // https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec. // Will be supported throughout 1.x. // // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. func ForceServerCodec(codec encoding.Codec) ServerOption { … } // RPCCompressor returns a ServerOption that sets a compressor for outbound // messages. For backward compatibility, all outbound messages will be sent // using this compressor, regardless of incoming message compression. By // default, server messages will be sent using the same compressor with which // request messages were sent. // // Deprecated: use encoding.RegisterCompressor instead. Will be supported // throughout 1.x. func RPCCompressor(cp Compressor) ServerOption { … } // RPCDecompressor returns a ServerOption that sets a decompressor for inbound // messages. It has higher priority than decompressors registered via // encoding.RegisterCompressor. // // Deprecated: use encoding.RegisterCompressor instead. Will be supported // throughout 1.x. func RPCDecompressor(dc Decompressor) ServerOption { … } // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive. // If this is not set, gRPC uses the default limit. // // Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x. func MaxMsgSize(m int) ServerOption { … } // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive. // If this is not set, gRPC uses the default 4MB. func MaxRecvMsgSize(m int) ServerOption { … } // MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send. // If this is not set, gRPC uses the default `math.MaxInt32`. func MaxSendMsgSize(m int) ServerOption { … } // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number // of concurrent streams to each ServerTransport. func MaxConcurrentStreams(n uint32) ServerOption { … } // Creds returns a ServerOption that sets credentials for server connections. func Creds(c credentials.TransportCredentials) ServerOption { … } // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the // server. Only one unary interceptor can be installed. The construction of multiple // interceptors (e.g., chaining) can be implemented at the caller. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption { … } // ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor // for unary RPCs. The first interceptor will be the outer most, // while the last interceptor will be the inner most wrapper around the real call. // All unary interceptors added by this method will be chained. func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption { … } // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the // server. Only one stream interceptor can be installed. func StreamInterceptor(i StreamServerInterceptor) ServerOption { … } // ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor // for streaming RPCs. The first interceptor will be the outer most, // while the last interceptor will be the inner most wrapper around the real call. // All stream interceptors added by this method will be chained. func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption { … } // InTapHandle returns a ServerOption that sets the tap handle for all the server // transport to be created. Only one can be installed. // // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. func InTapHandle(h tap.ServerInHandle) ServerOption { … } // StatsHandler returns a ServerOption that sets the stats handler for the server. func StatsHandler(h stats.Handler) ServerOption { … } // binaryLogger returns a ServerOption that can set the binary logger for the // server. func binaryLogger(bl binarylog.Logger) ServerOption { … } // UnknownServiceHandler returns a ServerOption that allows for adding a custom // unknown service handler. The provided method is a bidi-streaming RPC service // handler that will be invoked instead of returning the "unimplemented" gRPC // error whenever a request is received for an unregistered service or method. // The handling function and stream interceptor (if set) have full access to // the ServerStream, including its Context. func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { … } // ConnectionTimeout returns a ServerOption that sets the timeout for // connection establishment (up to and including HTTP/2 handshaking) for all // new connections. If this is not set, the default is 120 seconds. A zero or // negative value will result in an immediate timeout. // // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. func ConnectionTimeout(d time.Duration) ServerOption { … } type MaxHeaderListSizeServerOption … func (o MaxHeaderListSizeServerOption) apply(so *serverOptions) { … } // MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size // of header list that the server is prepared to accept. func MaxHeaderListSize(s uint32) ServerOption { … } // HeaderTableSize returns a ServerOption that sets the size of dynamic // header table for stream. // // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. func HeaderTableSize(s uint32) ServerOption { … } // NumStreamWorkers returns a ServerOption that sets the number of worker // goroutines that should be used to process incoming streams. Setting this to // zero (default) will disable workers and spawn a new goroutine for each // stream. // // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. func NumStreamWorkers(numServerWorkers uint32) ServerOption { … } // WaitForHandlers cause Stop to wait until all outstanding method handlers have // exited before returning. If false, Stop will return as soon as all // connections have closed, but method handlers may still be running. By // default, Stop does not wait for method handlers to return. // // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. func WaitForHandlers(w bool) ServerOption { … } // RecvBufferPool returns a ServerOption that configures the server // to use the provided shared buffer pool for parsing incoming messages. Depending // on the application's workload, this could result in reduced memory allocation. // // If you are unsure about how to implement a memory pool but want to utilize one, // begin with grpc.NewSharedBufferPool. // // Note: The shared buffer pool feature will not be active if any of the following // options are used: StatsHandler, EnableTracing, or binary logging. In such // cases, the shared buffer pool will be ignored. // // Deprecated: use experimental.WithRecvBufferPool instead. Will be deleted in // v1.60.0 or later. func RecvBufferPool(bufferPool SharedBufferPool) ServerOption { … } func recvBufferPool(bufferPool SharedBufferPool) ServerOption { … } const serverWorkerResetThreshold … // serverWorkers blocks on a *transport.Stream channel forever and waits for // data to be fed by serveStreams. This allows multiple requests to be // processed by the same goroutine, removing the need for expensive stack // re-allocations (see the runtime.morestack problem [1]). // // [1] https://github.com/golang/go/issues/18138 func (s *Server) serverWorker() { … } // initServerWorkers creates worker goroutines and a channel to process incoming // connections to reduce the time spent overall on runtime.morestack. func (s *Server) initServerWorkers() { … } // NewServer creates a gRPC server which has no service registered and has not // started to accept requests yet. func NewServer(opt ...ServerOption) *Server { … } // printf records an event in s's event log, unless s has been stopped. // REQUIRES s.mu is held. func (s *Server) printf(format string, a ...any) { … } // errorf records an error in s's event log, unless s has been stopped. // REQUIRES s.mu is held. func (s *Server) errorf(format string, a ...any) { … } type ServiceRegistrar … // RegisterService registers a service and its implementation to the gRPC // server. It is called from the IDL generated code. This must be called before // invoking Serve. If ss is non-nil (for legacy code), its type is checked to // ensure it implements sd.HandlerType. func (s *Server) RegisterService(sd *ServiceDesc, ss any) { … } func (s *Server) register(sd *ServiceDesc, ss any) { … } type MethodInfo … type ServiceInfo … // GetServiceInfo returns a map from service names to ServiceInfo. // Service names include the package names, in the form of <package>.<service>. func (s *Server) GetServiceInfo() map[string]ServiceInfo { … } var ErrServerStopped … type listenSocket … func (l *listenSocket) Close() error { … } // Serve accepts incoming connections on the listener lis, creating a new // ServerTransport and service goroutine for each. The service goroutines // read gRPC requests and then call the registered handlers to reply to them. // Serve returns when lis.Accept fails with fatal errors. lis will be closed when // this method returns. // Serve will return a non-nil error unless Stop or GracefulStop is called. // // Note: All supported releases of Go (as of December 2023) override the OS // defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive // with OS defaults for keepalive time and interval, callers need to do the // following two things: // - pass a net.Listener created by calling the Listen method on a // net.ListenConfig with the `KeepAlive` field set to a negative value. This // will result in the Go standard library not overriding OS defaults for TCP // keepalive interval and time. But this will also result in the Go standard // library not enabling TCP keepalives by default. // - override the Accept method on the passed in net.Listener and set the // SO_KEEPALIVE socket option to enable TCP keepalives, with OS defaults. func (s *Server) Serve(lis net.Listener) error { … } // handleRawConn forks a goroutine to handle a just-accepted connection that // has not had any I/O performed on it yet. func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) { … } // newHTTP2Transport sets up a http/2 transport (using the // gRPC http2 server transport in transport/http2_server.go). func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport { … } func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, rawConn net.Conn) { … } var _ … // ServeHTTP implements the Go standard library's http.Handler // interface by responding to the gRPC request r, by looking up // the requested gRPC method in the gRPC server s. // // The provided HTTP request must have arrived on an HTTP/2 // connection. When using the Go standard library's server, // practically this means that the Request must also have arrived // over TLS. // // To share one port (such as 443 for https) between gRPC and an // existing http.Handler, use a root http.Handler such as: // // if r.ProtoMajor == 2 && strings.HasPrefix( // r.Header.Get("Content-Type"), "application/grpc") { // grpcServer.ServeHTTP(w, r) // } else { // yourMux.ServeHTTP(w, r) // } // // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally // separate from grpc-go's HTTP/2 server. Performance and features may vary // between the two paths. ServeHTTP does not support some gRPC features // available through grpc-go's HTTP/2 server. // // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { … } func (s *Server) addConn(addr string, st transport.ServerTransport) bool { … } func (s *Server) removeConn(addr string, st transport.ServerTransport) { … } func (s *Server) incrCallsStarted() { … } func (s *Server) incrCallsSucceeded() { … } func (s *Server) incrCallsFailed() { … } func (s *Server) sendResponse(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, msg any, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { … } // chainUnaryServerInterceptors chains all unary server interceptors into one. func chainUnaryServerInterceptors(s *Server) { … } func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor { … } func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler { … } func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) { … } // chainStreamServerInterceptors chains all stream server interceptors into one. func chainStreamServerInterceptors(s *Server) { … } func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor { … } func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler { … } func (s *Server) processStreamingRPC(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) { … } func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) { … } type streamKey … // NewContextWithServerTransportStream creates a new context from ctx and // attaches stream to it. // // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context { … } type ServerTransportStream … // ServerTransportStreamFromContext returns the ServerTransportStream saved in // ctx. Returns nil if the given context has no stream associated with it // (which implies it is not an RPC invocation context). // // # Experimental // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream { … } // Stop stops the gRPC server. It immediately closes all open // connections and listeners. // It cancels all active RPCs on the server side and the corresponding // pending RPCs on the client side will get notified by connection // errors. func (s *Server) Stop() { … } // GracefulStop stops the gRPC server gracefully. It stops the server from // accepting new connections and RPCs and blocks until all the pending RPCs are // finished. func (s *Server) GracefulStop() { … } func (s *Server) stop(graceful bool) { … } // s.mu must be held by the caller. func (s *Server) closeServerTransportsLocked() { … } // s.mu must be held by the caller. func (s *Server) drainAllServerTransportsLocked() { … } // s.mu must be held by the caller. func (s *Server) closeListenersLocked() { … } // contentSubtype must be lowercase // cannot return nil func (s *Server) getCodec(contentSubtype string) baseCodec { … } type serverKey … // serverFromContext gets the Server from the context. func serverFromContext(ctx context.Context) *Server { … } // contextWithServer sets the Server in the context. func contextWithServer(ctx context.Context, server *Server) context.Context { … } // isRegisteredMethod returns whether the passed in method is registered as a // method on the server. /service/method and service/method will match if the // service and method are registered on the server. func (s *Server) isRegisteredMethod(serviceMethod string) bool { … } // SetHeader sets the header metadata to be sent from the server to the client. // The context provided must be the context passed to the server's handler. // // Streaming RPCs should prefer the SetHeader method of the ServerStream. // // When called multiple times, all the provided metadata will be merged. All // the metadata will be sent out when one of the following happens: // // - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader. // - The first response message is sent. For unary handlers, this occurs when // the handler returns; for streaming handlers, this can happen when stream's // SendMsg method is called. // - An RPC status is sent out (error or success). This occurs when the handler // returns. // // SetHeader will fail if called after any of the events above. // // The error returned is compatible with the status package. However, the // status code will often not match the RPC status as seen by the client // application, and therefore, should not be relied upon for this purpose. func SetHeader(ctx context.Context, md metadata.MD) error { … } // SendHeader sends header metadata. It may be called at most once, and may not // be called after any event that causes headers to be sent (see SetHeader for // a complete list). The provided md and headers set by SetHeader() will be // sent. // // The error returned is compatible with the status package. However, the // status code will often not match the RPC status as seen by the client // application, and therefore, should not be relied upon for this purpose. func SendHeader(ctx context.Context, md metadata.MD) error { … } // SetSendCompressor sets a compressor for outbound messages from the server. // It must not be called after any event that causes headers to be sent // (see ServerStream.SetHeader for the complete list). Provided compressor is // used when below conditions are met: // // - compressor is registered via encoding.RegisterCompressor // - compressor name must exist in the client advertised compressor names // sent in grpc-accept-encoding header. Use ClientSupportedCompressors to // get client supported compressor names. // // The context provided must be the context passed to the server's handler. // It must be noted that compressor name encoding.Identity disables the // outbound compression. // By default, server messages will be sent using the same compressor with // which request messages were sent. // // It is not safe to call SetSendCompressor concurrently with SendHeader and // SendMsg. // // # Experimental // // Notice: This function is EXPERIMENTAL and may be changed or removed in a // later release. func SetSendCompressor(ctx context.Context, name string) error { … } // ClientSupportedCompressors returns compressor names advertised by the client // via grpc-accept-encoding header. // // The context provided must be the context passed to the server's handler. // // # Experimental // // Notice: This function is EXPERIMENTAL and may be changed or removed in a // later release. func ClientSupportedCompressors(ctx context.Context) ([]string, error) { … } // SetTrailer sets the trailer metadata that will be sent when an RPC returns. // When called more than once, all the provided metadata will be merged. // // The error returned is compatible with the status package. However, the // status code will often not match the RPC status as seen by the client // application, and therefore, should not be relied upon for this purpose. func SetTrailer(ctx context.Context, md metadata.MD) error { … } // Method returns the method string for the server context. The returned // string is in the format of "/service/method". func Method(ctx context.Context) (string, bool) { … } // validateSendCompressor returns an error when given compressor name cannot be // handled by the server or the client based on the advertised compressors. func validateSendCompressor(name string, clientCompressors []string) error { … } type atomicSemaphore … func (q *atomicSemaphore) acquire() { … } func (q *atomicSemaphore) release() { … } func newHandlerQuota(n uint32) *atomicSemaphore { … }