const writeDeadline … var _ … var _ … var _ … var streamType2streamID … const pingPeriod … const pingReadDeadline … type wsStreamExecutor … func NewWebSocketExecutor(config *restclient.Config, method, url string) (Executor, error) { … } // NewWebSocketExecutorForProtocols allows executing commands via a WebSocket connection. func NewWebSocketExecutorForProtocols(config *restclient.Config, method, url string, protocols ...string) (Executor, error) { … } // Deprecated: use StreamWithContext instead to avoid possible resource leaks. // See https://github.com/kubernetes/kubernetes/pull/103177 for details. func (e *wsStreamExecutor) Stream(options StreamOptions) error { … } // StreamWithContext upgrades an HTTPRequest to a WebSocket connection, and starts the various // goroutines to implement the necessary streams over the connection. The "options" parameter // defines which streams are requested. Returns an error if one occurred. This method is NOT // safe to run concurrently with the same executor (because of the state stored in the upgrader). func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error { … } type wsStreamCreator … func newWSStreamCreator(conn *gwebsocket.Conn) *wsStreamCreator { … } func (c *wsStreamCreator) getStream(id byte) *stream { … } func (c *wsStreamCreator) setStream(id byte, s *stream) error { … } // CreateStream uses id from passed headers to create a stream over "c.conn" connection. // Returns a Stream structure or nil and an error if one occurred. func (c *wsStreamCreator) CreateStream(headers http.Header) (httpstream.Stream, error) { … } // readDemuxLoop is the lock-free reading processor for this endpoint of the websocket // connection. This loop reads the connection, and demultiplexes the data // into one of the individual stream pipes (by checking the stream id). 
This // loop can *not* be run concurrently, because there can only be one websocket // connection reader at a time (a read mutex would provide no benefit). func (c *wsStreamCreator) readDemuxLoop(bufferSize int, period time.Duration, deadline time.Duration) { … } // closeAllStreamReaders closes readers in all streams. // This unblocks all stream.Read() calls, and keeps any future streams from being created. func (c *wsStreamCreator) closeAllStreamReaders(err error) { … } type stream … func (s *stream) Read(p []byte) (n int, err error) { … } // Write writes directly to the underlying WebSocket connection. func (s *stream) Write(p []byte) (n int, err error) { … } // Close half-closes the stream, indicating this side is finished with the stream. func (s *stream) Close() error { … } func (s *stream) Reset() error { … } func (s *stream) Headers() http.Header { … } func (s *stream) Identifier() uint32 { … } type heartbeat … // newHeartbeat creates heartbeat structure encapsulating fields necessary to // run the websocket connection ping/pong mechanism and sets up handlers on // the websocket connection. func newHeartbeat(conn *gwebsocket.Conn, period time.Duration, deadline time.Duration) *heartbeat { … } // setMessage is optional data sent with "ping" heartbeat. According to the websocket RFC // this data sent with "ping" message should be returned in "pong" message. func (h *heartbeat) setMessage(msg string) { … } // start the heartbeat by setting up necessary handlers and looping by sending "ping" // message every "period" until the "closer" channel is closed. func (h *heartbeat) start() { … }