cpython/Modules/_interpchannelsmodule.c

/* interpreter channels module */
/* low-level access to cross-interpreter channel primitives */

#ifndef Py_BUILD_CORE_BUILTIN
#define Py_BUILD_CORE_MODULE
#endif

#include "Python.h"
#include "pycore_crossinterp.h"   // struct _xid
#include "pycore_interp.h"        // _PyInterpreterState_LookUpID()
#include "pycore_pystate.h"       // _PyInterpreterState_GetIDObject()

#ifdef MS_WINDOWS
#define WIN32_LEAN_AND_MEAN
#include <windows.h>        // SwitchToThread()
#elif defined(HAVE_SCHED_H)
#include <sched.h>          // sched_yield()
#endif

#define REGISTERS_HEAP_TYPES
#define HAS_UNBOUND_ITEMS
#include "_interpreters_common.h"
#undef HAS_UNBOUND_ITEMS
#undef REGISTERS_HEAP_TYPES


/*
This module has the following process-global state:

_globals (static struct globals):
    module_count (int)
    channels (struct _channels):
        numopen (int64_t)
        next_id (int64_t)
        mutex (PyThread_type_lock)
        head (linked list of struct _channelref *):
            cid (int64_t)
            objcount (Py_ssize_t)
            next (struct _channelref *):
                ...
            chan (struct _channel *):
                open (int)
                mutex (PyThread_type_lock)
                closing (struct _channel_closing *):
                    ref (struct _channelref *):
                        ...
                ends (struct _channelends *):
                    numsendopen (int64_t)
                    numrecvopen (int64_t)
                    send (struct _channelend *):
                        interpid (int64_t)
                        open (int)
                        next (struct _channelend *)
                    recv (struct _channelend *):
                        ...
                queue (struct _channelqueue *):
                    count (int64_t)
                    first (struct _channelitem *):
                        next (struct _channelitem *):
                            ...
                        data (_PyCrossInterpreterData *):
                            data (void *)
                            obj (PyObject *)
                            interpid (int64_t)
                            new_object (xid_newobjectfunc)
                            free (xid_freefunc)
                    last (struct _channelitem *):
                        ...

The above state includes the following allocations by the module:

* 1 top-level mutex (to protect the rest of the state)
* for each channel:
   * 1 struct _channelref
   * 1 struct _channel
   * 0-1 struct _channel_closing
   * 1 struct _channelends
   * 2 struct _channelend
   * 1 struct _channelqueue
* for each item in each channel:
   * 1 struct _channelitem
   * 1 _PyCrossInterpreterData

The only objects in that global state are the references held by each
channel's queue, which are safely managed via the _PyCrossInterpreterData_*()
API.  The module does not create any objects that are shared globally.
*/

#define MODULE_NAME
#define MODULE_NAME_STR
#define MODINIT_FUNC_NAME


#define GLOBAL_MALLOC(TYPE)
#define GLOBAL_FREE(VAR)


#define XID_IGNORE_EXC
#define XID_FREE

static int
_release_xid_data(_PyCrossInterpreterData *data, int flags)
{}


static PyInterpreterState *
_get_current_interp(void)
{}

static PyObject *
_get_current_module(void)
{}

static PyObject *
get_module_from_owned_type(PyTypeObject *cls)
{}

static struct PyModuleDef moduledef;

static PyObject *
get_module_from_type(PyTypeObject *cls)
{}

static PyObject *
add_new_exception(PyObject *mod, const char *name, PyObject *base)
{}

#define ADD_NEW_EXCEPTION(MOD, NAME, BASE)

static int
wait_for_lock(PyThread_type_lock mutex, PY_TIMEOUT_T timeout)
{}


/* module state *************************************************************/

module_state;

static inline module_state *
get_module_state(PyObject *mod)
{}

static module_state *
_get_current_module_state(void)
{}

static int
traverse_module_state(module_state *state, visitproc visit, void *arg)
{}

static void
clear_xid_types(module_state *state)
{}

static int
clear_module_state(module_state *state)
{}


/* channel-specific code ****************************************************/

#define CHANNEL_SEND
#define CHANNEL_BOTH
#define CHANNEL_RECV


/* channel errors */

#define ERR_CHANNEL_NOT_FOUND
#define ERR_CHANNEL_CLOSED
#define ERR_CHANNEL_INTERP_CLOSED
#define ERR_CHANNEL_EMPTY
#define ERR_CHANNEL_NOT_EMPTY
#define ERR_CHANNEL_MUTEX_INIT
#define ERR_CHANNELS_MUTEX_INIT
#define ERR_NO_NEXT_CHANNEL_ID
#define ERR_CHANNEL_CLOSED_WAITING

static int
exceptions_init(PyObject *mod)
{}

static int
handle_channel_error(int err, PyObject *mod, int64_t cid)
{}


/* the channel queue */

_channelitem_id_t;

_waiting_t;

static int
_waiting_init(_waiting_t *waiting)
{}

static void
_waiting_clear(_waiting_t *waiting)
{}

static _channelitem_id_t
_waiting_get_itemid(_waiting_t *waiting)
{}

static void
_waiting_acquire(_waiting_t *waiting)
{}

static void
_waiting_release(_waiting_t *waiting, int received)
{}

static void
_waiting_finish_releasing(_waiting_t *waiting)
{}

struct _channelitem;

_channelitem;

static inline _channelitem_id_t
_channelitem_ID(_channelitem *item)
{}

static void
_channelitem_init(_channelitem *item,
                  int64_t interpid, _PyCrossInterpreterData *data,
                  _waiting_t *waiting, int unboundop)
{}

static void
_channelitem_clear_data(_channelitem *item, int removed)
{}

static void
_channelitem_clear(_channelitem *item)
{}

static _channelitem *
_channelitem_new(int64_t interpid, _PyCrossInterpreterData *data,
                 _waiting_t *waiting, int unboundop)
{}

static void
_channelitem_free(_channelitem *item)
{}

static void
_channelitem_free_all(_channelitem *item)
{}

static void
_channelitem_popped(_channelitem *item,
                    _PyCrossInterpreterData **p_data, _waiting_t **p_waiting,
                    int *p_unboundop)
{}

static int
_channelitem_clear_interpreter(_channelitem *item)
{}


_channelqueue;

static _channelqueue *
_channelqueue_new(void)
{}

static void
_channelqueue_clear(_channelqueue *queue)
{}

static void
_channelqueue_free(_channelqueue *queue)
{}

static int
_channelqueue_put(_channelqueue *queue,
                  int64_t interpid, _PyCrossInterpreterData *data,
                  _waiting_t *waiting, int unboundop)
{}

static int
_channelqueue_get(_channelqueue *queue,
                  _PyCrossInterpreterData **p_data, _waiting_t **p_waiting,
                  int *p_unboundop)
{}

static int
_channelqueue_find(_channelqueue *queue, _channelitem_id_t itemid,
                   _channelitem **p_item, _channelitem **p_prev)
{}

static void
_channelqueue_remove(_channelqueue *queue, _channelitem_id_t itemid,
                     _PyCrossInterpreterData **p_data, _waiting_t **p_waiting)
{}

static void
_channelqueue_clear_interpreter(_channelqueue *queue, int64_t interpid)
{}


/* channel-interpreter associations */

struct _channelend;

_channelend;

static _channelend *
_channelend_new(int64_t interpid)
{}

static void
_channelend_free(_channelend *end)
{}

static void
_channelend_free_all(_channelend *end)
{}

static _channelend *
_channelend_find(_channelend *first, int64_t interpid, _channelend **pprev)
{}

_channelends;

static _channelends *
_channelends_new(void)
{}

static void
_channelends_clear(_channelends *ends)
{}

static void
_channelends_free(_channelends *ends)
{}

static _channelend *
_channelends_add(_channelends *ends, _channelend *prev, int64_t interpid,
                 int send)
{}

static int
_channelends_associate(_channelends *ends, int64_t interpid, int send)
{}

static int
_channelends_is_open(_channelends *ends)
{}

static void
_channelends_release_end(_channelends *ends, _channelend *end, int send)
{}

static int
_channelends_release_interpreter(_channelends *ends, int64_t interpid, int which)
{}

static void
_channelends_release_all(_channelends *ends, int which, int force)
{}

static void
_channelends_clear_interpreter(_channelends *ends, int64_t interpid)
{}


/* each channel's state */

struct _channel;
struct _channel_closing;
static void _channel_clear_closing(struct _channel *);
static void _channel_finish_closing(struct _channel *);

_channel_state;

// Create the per-channel state object.
// Per the state outline at the top of this file, a channel carries an
// `open` flag, a `mutex`, an optional `closing` record, its `ends` and
// its `queue`.
// NOTE(review): presumably `mutex` becomes the channel's own lock and
// `unboundop` its default unbound-item behaviour, with NULL returned on
// allocation failure -- confirm against the implementation.
static _channel_state *
_channel_new(PyThread_type_lock mutex, int unboundop)
{}

static void
_channel_free(_channel_state *chan)
{}

static int
_channel_add(_channel_state *chan, int64_t interpid,
             _PyCrossInterpreterData *data, _waiting_t *waiting,
             int unboundop)
{}

static int
_channel_next(_channel_state *chan, int64_t interpid,
              _PyCrossInterpreterData **p_data, _waiting_t **p_waiting,
              int *p_unboundop)
{}

static void
_channel_remove(_channel_state *chan, _channelitem_id_t itemid)
{}

static int
_channel_release_interpreter(_channel_state *chan, int64_t interpid, int end)
{}

static int
_channel_release_all(_channel_state *chan, int end, int force)
{}

static void
_channel_clear_interpreter(_channel_state *chan, int64_t interpid)
{}


/* the set of channels */

struct _channelref;

_channelref;

// Create the entry that ties channel ID `cid` to its state `chan` in
// the set of channels.
// Per the state outline at the top of this file, each channel has one
// _channelref holding `cid`, `objcount`, `next` and `chan`.
// NOTE(review): presumably returns NULL on allocation failure and does
// not link the new ref into any list itself -- confirm.
static _channelref *
_channelref_new(int64_t cid, _channel_state *chan)
{}

//static void
//_channelref_clear(_channelref *ref)
//{
//    ref->cid = -1;
//    ref->chan = NULL;
//    ref->next = NULL;
//    ref->objcount = 0;
//}

static void
_channelref_free(_channelref *ref)
{}

static _channelref *
_channelref_find(_channelref *first, int64_t cid, _channelref **pprev)
{}


_channels;

static void
_channels_init(_channels *channels, PyThread_type_lock mutex)
{}

static void
_channels_fini(_channels *channels)
{}

// Produce the next channel ID for `channels`.
// The caller must already hold the lock (see "needs lock" below);
// presumably that is channels->mutex, which the state outline at the top
// of this file lists alongside `next_id` -- TODO confirm.
// NOTE(review): ERR_NO_NEXT_CHANNEL_ID suggests that ID exhaustion is an
// error case; confirm how it is signalled through the int64_t result.
static int64_t
_channels_next_id(_channels *channels)  // needs lock
{}

static int
_channels_lookup(_channels *channels, int64_t cid, PyThread_type_lock *pmutex,
                 _channel_state **res)
{}

static int64_t
_channels_add(_channels *channels, _channel_state *chan)
{}

/* forward */
static int _channel_set_closing(_channelref *, PyThread_type_lock);

static int
_channels_close(_channels *channels, int64_t cid, _channel_state **pchan,
                int end, int force)
{}

static void
_channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
                     _channel_state **pchan)
{}

static int
_channels_remove(_channels *channels, int64_t cid, _channel_state **pchan)
{}

static int
_channels_add_id_object(_channels *channels, int64_t cid)
{}

static void
_channels_release_cid_object(_channels *channels, int64_t cid)
{}

struct channel_id_and_info {};

static struct channel_id_and_info *
_channels_list_all(_channels *channels, int64_t *count)
{}

static void
_channels_clear_interpreter(_channels *channels, int64_t interpid)
{}


/* support for closing non-empty channels */

// Bookkeeping for a channel that is being closed while still non-empty.
// Per the state outline at the top of this file, it holds `ref`
// (struct _channelref *), and a channel has zero or one of these.
struct _channel_closing {};

// Mark the channel behind `ref` as "closing" (see the section comment
// above: support for closing non-empty channels).
// NOTE(review): presumably allocates the channel's _channel_closing
// record and returns nonzero on failure; the role of `mutex` (which lock
// it is and who holds it on entry) must be confirmed against the body.
static int
_channel_set_closing(_channelref *ref, PyThread_type_lock mutex) {}

// Drop the "closing" state of `chan`.
// NOTE(review): presumably frees chan's _channel_closing record, if any,
// without closing the channel itself -- confirm against the body.
static void
_channel_clear_closing(_channel_state *chan) {}

// Complete a pending close of `chan`.
// NOTE(review): presumably invoked once a closing channel has become
// empty (channel_close() documents "finally close once empty") --
// confirm the trigger and the locking expectations against the body.
static void
_channel_finish_closing(_channel_state *chan) {}


/* "high"-level channel-related functions */

// Create a new channel.
// `channels` is the set of channels the new one is registered in.
// NOTE(review): `unboundop` is presumably forwarded to _channel_new();
// its allowed values are not visible here.
// NOTE(review): the int64_t result is presumably the new channel's ID,
// with a negative value reporting an error -- confirm.
static int64_t
channel_create(_channels *channels, int unboundop)
{}

// Completely destroy the channel.
// The channel is identified by `cid` within `channels`.
// NOTE(review): the int result is presumably 0 on success or one of the
// ERR_CHANNEL_* codes suitable for handle_channel_error() -- confirm.
static int
channel_destroy(_channels *channels, int64_t cid)
{}

// Push an object onto the channel.
// The current interpreter gets associated with the send end of the channel.
// Optionally request to be notified when it is received.
//
// `cid` selects the channel within `channels`; `obj` is the object to send.
// NOTE(review): `waiting` is presumably the optional notification handle
// (NULL when no notification is wanted) and `unboundop` is stored with
// the queued item -- confirm both against the implementation.
// NOTE(review): the int result is presumably 0 on success or an
// ERR_CHANNEL_* code -- confirm.
static int
channel_send(_channels *channels, int64_t cid, PyObject *obj,
             _waiting_t *waiting, int unboundop)
{}

// Basically, un-send an object.
// NOTE(review): presumably the item to withdraw from channel `cid` is
// located through `waiting` (cf. _waiting_get_itemid() and
// _channel_remove()) -- confirm.  No status is reported to the caller.
static void
channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting)
{}

// Like channel_send(), but strictly wait for the object to be received.
// Takes the same `channels`, `cid`, `obj` and `unboundop` as channel_send().
// NOTE(review): `timeout` presumably bounds the wait (cf. wait_for_lock())
// and an expired wait presumably un-sends the object through
// channel_clear_sent() -- confirm both against the implementation.
static int
channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
                  int unboundop, PY_TIMEOUT_T timeout)
{}

// Pop the next object off the channel.  Fail if empty.
// The current interpreter gets associated with the recv end of the channel.
// XXX Support a "wait" mutex?
//
// NOTE(review): presumably the received object is stored in `*res` and the
// item's unbound-op value in `*p_unboundop`; ownership of `*res` and the
// state of both outputs on failure are not visible here -- confirm.
// NOTE(review): an empty channel is presumably reported as
// ERR_CHANNEL_EMPTY -- confirm.
static int
channel_recv(_channels *channels, int64_t cid, PyObject **res, int *p_unboundop)
{}

// Disallow send/recv for the current interpreter.
// The channel is marked as closed if no other interpreters
// are currently associated.
//
// `send` and `recv` are flags naming the end(s) to release
// (channelsmod_release_doc: they "indicate the ends to close").
// NOTE(review): behaviour when both flags are zero, and the meaning of
// the int result, should be confirmed against the implementation.
static int
channel_release(_channels *channels, int64_t cid, int send, int recv)
{}

// Close the channel (for all interpreters).  Fail if it's already closed.
// Close immediately if it's empty.  Otherwise, disallow sending and
// finally close once empty.  Optionally, immediately clear and close it.
//
// `force` selects the "immediately clear and close" behaviour
// (channelsmod_close_doc: queued items are released and both ends closed).
// NOTE(review): `end` is presumably one of CHANNEL_SEND, CHANNEL_RECV or
// CHANNEL_BOTH -- confirm.
// NOTE(review): closing an already closed channel presumably yields
// ERR_CHANNEL_CLOSED, and a non-empty one without `force`
// ERR_CHANNEL_NOT_EMPTY in the cases listed in channelsmod_close_doc --
// confirm.
static int
channel_close(_channels *channels, int64_t cid, int end, int force)
{}

// Return true if the identified interpreter is associated
// with the given end of the channel.
//
// `interpid` identifies the interpreter and `cid` the channel.
// NOTE(review): presumably a nonzero `send` selects the send end and zero
// the recv end, and a negative result reports a lookup error rather than
// "true" -- confirm against the implementation.
static int
channel_is_associated(_channels *channels, int64_t cid, int64_t interpid,
                       int send)
{}

// Report how many items are queued in channel `cid`.
// NOTE(review): presumably writes the queue's `count` (see the state
// outline at the top of this file) to `*p_count` and returns 0, or an
// ERR_CHANNEL_* code without touching `*p_count` -- confirm.
static int
_channel_get_count(_channels *channels, int64_t cid, Py_ssize_t *p_count)
{}


/* channel info */

struct channel_info {};

static int
_channel_get_info(_channels *channels, int64_t cid, struct channel_info *info)
{}

PyDoc_STRVAR(channel_info_doc,
"ChannelInfo\n\
\n\
A named tuple of a channel's state.");

static PyStructSequence_Field channel_info_fields[] =;

static PyStructSequence_Desc channel_info_desc =;

static PyObject *
new_channel_info(PyObject *mod, struct channel_info *info)
{}


/* ChannelID class */

channelid;

struct channel_id_converter_data {};

static int
channel_id_converter(PyObject *arg, void *ptr)
{}

static int
newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels,
             int force, int resolve, channelid **res)
{}

static _channels * _global_channels(void);

static PyObject *
_channelid_new(PyObject *mod, PyTypeObject *cls,
               PyObject *args, PyObject *kwds)
{}

static void
channelid_dealloc(PyObject *self)
{}

static PyObject *
channelid_repr(PyObject *self)
{}

static PyObject *
channelid_str(PyObject *self)
{}

static PyObject *
channelid_int(PyObject *self)
{}

static Py_hash_t
channelid_hash(PyObject *self)
{}

static PyObject *
channelid_richcompare(PyObject *self, PyObject *other, int op)
{}

static PyTypeObject * _get_current_channelend_type(int end);

static PyObject *
_channelobj_from_cidobj(PyObject *cidobj, int end)
{}

struct _channelid_xid {};

static PyObject *
_channelid_from_xid(_PyCrossInterpreterData *data)
{}

static int
_channelid_shared(PyThreadState *tstate, PyObject *obj,
                  _PyCrossInterpreterData *data)
{}

static PyObject *
channelid_end(PyObject *self, void *end)
{}

static int _channelid_end_send =;
static int _channelid_end_recv =;

static PyGetSetDef channelid_getsets[] =;

PyDoc_STRVAR(channelid_doc,
"A channel ID identifies a channel and may be used as an int.");

static PyType_Slot channelid_typeslots[] =;

static PyType_Spec channelid_typespec =;

static PyTypeObject *
add_channelid_type(PyObject *mod)
{}


/* SendChannel and RecvChannel classes */

// XXX Use a new __xid__ protocol instead?

static PyTypeObject *
_get_current_channelend_type(int end)
{}

static PyObject *
_channelend_from_xid(_PyCrossInterpreterData *data)
{}

static int
_channelend_shared(PyThreadState *tstate, PyObject *obj,
                    _PyCrossInterpreterData *data)
{}

static int
set_channelend_types(PyObject *mod, PyTypeObject *send, PyTypeObject *recv)
{}


/* module level code ********************************************************/

/* globals is the process-global state for the module.  It holds all
   the data that we need to share between interpreters, so it cannot
   hold PyObject values.

   Per the state outline at the top of this file it consists of
   `module_count` (int) and `channels` (struct _channels). */
static struct globals {} _globals =;

static int
_globals_init(void)
{}

static void
_globals_fini(void)
{}

static _channels *
_global_channels(void) {}


static void
clear_interpreter(void *data)
{}


// Python-level entry point for creating a channel.
// User-facing contract, per channelsmod_create_doc: takes `unboundop` and
// returns the unique generated ID of the new cross-interpreter channel.
// NOTE(review): presumably parses args/kwds and delegates to
// channel_create() -- confirm against the implementation.
static PyObject *
channelsmod_create(PyObject *self, PyObject *args, PyObject *kwds)
{}

PyDoc_STRVAR(channelsmod_create_doc,
"channel_create(unboundop) -> cid\n\
\n\
Create a new cross-interpreter channel and return a unique generated ID.");

// Python-level entry point for destroying a channel.
// User-facing contract, per channelsmod_destroy_doc: takes `cid`, closes
// and finalizes the channel; afterwards the channel behaves as though it
// never existed.
// NOTE(review): presumably delegates to channel_destroy() and maps its
// error code through handle_channel_error() -- confirm.
static PyObject *
channelsmod_destroy(PyObject *self, PyObject *args, PyObject *kwds)
{}

PyDoc_STRVAR(channelsmod_destroy_doc,
"channel_destroy(cid)\n\
\n\
Close and finalize the channel.  Afterward attempts to use the channel\n\
will behave as though it never existed.");

// Python-level entry point for listing channels; takes no arguments
// (the second parameter is marked Py_UNUSED).
// User-facing contract, per channelsmod_list_all_doc: returns the list of
// all IDs for active channels.
// NOTE(review): presumably built from _channels_list_all(), whose
// struct channel_id_and_info result suggests each entry may carry more
// than the bare ID -- confirm the exact element type.
static PyObject *
channelsmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
{}

PyDoc_STRVAR(channelsmod_list_all_doc,
"channel_list_all() -> [cid]\n\
\n\
Return the list of all IDs for active channels.");

// Python-level entry point for listing a channel's interpreters.
// User-facing contract, per channelsmod_list_interpreters_doc: takes `cid`
// and the keyword-only boolean `send` (send end vs. receive end), and
// returns the list of interpreter IDs associated with that end.
// NOTE(review): presumably uses channel_is_associated() per interpreter
// -- confirm against the implementation.
static PyObject *
channelsmod_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
{}

PyDoc_STRVAR(channelsmod_list_interpreters_doc,
"channel_list_interpreters(cid, *, send) -> [id]\n\
\n\
Return the list of all interpreter IDs associated with an end of the channel.\n\
\n\
The 'send' argument should be a boolean indicating whether to use the send or\n\
receive end.");


// Python-level entry point for sending an object.
// User-facing contract, per channelsmod_send_doc: takes `cid`, `obj` and
// keyword-only `blocking` / `timeout`; adds the object's data to the
// channel's queue and, by default, waits for it to be received.
// NOTE(review): presumably dispatches to channel_send_wait() when
// blocking and channel_send() otherwise -- confirm.
// NOTE(review): channel_send()/channel_send_wait() take an `unboundop`
// argument that the docstring signature does not list; confirm whether
// the docstring is out of date.
static PyObject *
channelsmod_send(PyObject *self, PyObject *args, PyObject *kwds)
{}

PyDoc_STRVAR(channelsmod_send_doc,
"channel_send(cid, obj, *, blocking=True, timeout=None)\n\
\n\
Add the object's data to the channel's queue.\n\
By default this waits for the object to be received.");

// Python-level entry point for sending an object's buffer.
// User-facing contract, per channelsmod_send_buffer_doc: takes `cid`,
// `obj` and keyword-only `blocking` / `timeout`; adds the object's buffer
// to the channel's queue and, by default, waits for it to be received.
// NOTE(review): how the buffer is obtained from `obj` is not visible
// here -- confirm against the implementation.
// NOTE(review): as with channelsmod_send(), the docstring signature lists
// no `unboundop` although the C-level send helpers accept one.
static PyObject *
channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
{}

PyDoc_STRVAR(channelsmod_send_buffer_doc,
"channel_send_buffer(cid, obj, *, blocking=True, timeout=None)\n\
\n\
Add the object's buffer to the channel's queue.\n\
By default this waits for the object to be received.");

// Python-level entry point for receiving an object.
// User-facing contract, per channelsmod_recv_doc: takes `cid` and an
// optional `default`; returns (obj, unboundop) built from the data at the
// front of the channel's queue.  With nothing to receive it raises
// ChannelEmptyError unless a default was given, in which case the
// default is returned.
// NOTE(review): presumably delegates to channel_recv() -- confirm.
static PyObject *
channelsmod_recv(PyObject *self, PyObject *args, PyObject *kwds)
{}

PyDoc_STRVAR(channelsmod_recv_doc,
"channel_recv(cid, [default]) -> (obj, unboundop)\n\
\n\
Return a new object from the data at the front of the channel's queue.\n\
\n\
If there is nothing to receive then raise ChannelEmptyError, unless\n\
a default value is provided.  In that case return it.");

// Python-level entry point for closing a channel for all interpreters.
// User-facing contract, per channelsmod_close_doc: takes `cid` and
// keyword-only `send`, `recv` and `force`; see that docstring for the
// full table of outcomes on a non-empty channel.
// NOTE(review): presumably folds `send`/`recv` into the `end` argument of
// channel_close() (cf. CHANNEL_SEND / CHANNEL_RECV / CHANNEL_BOTH) --
// confirm against the implementation.
static PyObject *
channelsmod_close(PyObject *self, PyObject *args, PyObject *kwds)
{}

PyDoc_STRVAR(channelsmod_close_doc,
"channel_close(cid, *, send=None, recv=None, force=False)\n\
\n\
Close the channel for all interpreters.\n\
\n\
If the channel is empty then the keyword args are ignored and both\n\
ends are immediately closed.  Otherwise, if 'force' is True then\n\
all queued items are released and both ends are immediately\n\
closed.\n\
\n\
If the channel is not empty *and* 'force' is False then following\n\
happens:\n\
\n\
 * recv is True (regardless of send):\n\
   - raise ChannelNotEmptyError\n\
 * recv is None and send is None:\n\
   - raise ChannelNotEmptyError\n\
 * send is True and recv is not True:\n\
   - fully close the 'send' end\n\
   - close the 'recv' end to interpreters not already receiving\n\
   - fully close it once empty\n\
\n\
Closing an already closed channel results in a ChannelClosedError.\n\
\n\
Once the channel's ID has no more ref counts in any interpreter\n\
the channel will be destroyed.");

// Python-level entry point for releasing the current interpreter's ends.
// User-facing contract, per channelsmod_release_doc: takes `cid` and
// keyword-only `send` / `recv` / `force`; closes the indicated end(s) for
// the current interpreter, both by default; closing an already closed
// end is a no-op.
// NOTE(review): the docstring lists `force=True`, but channel_release()
// has no `force` parameter; confirm whether the argument is accepted and
// ignored, or the docstring is out of date.
static PyObject *
channelsmod_release(PyObject *self, PyObject *args, PyObject *kwds)
{}

PyDoc_STRVAR(channelsmod_release_doc,
"channel_release(cid, *, send=None, recv=None, force=True)\n\
\n\
Close the channel for the current interpreter.  'send' and 'recv'\n\
(bool) may be used to indicate the ends to close.  By default both\n\
ends are closed.  Closing an already closed end is a noop.");

// Python-level entry point for querying a channel's size.
// User-facing contract, per channelsmod_get_count_doc: takes `cid` and
// returns the number of items in the channel.
// NOTE(review): presumably delegates to _channel_get_count() -- confirm.
static PyObject *
channelsmod_get_count(PyObject *self, PyObject *args, PyObject *kwds)
{}

PyDoc_STRVAR(channelsmod_get_count_doc,
"get_count(cid)\n\
\n\
Return the number of items in the channel.");

// Python-level entry point for querying a channel's state.
// User-facing contract, per channelsmod_get_info_doc: takes `cid` and
// returns details about the channel.
// NOTE(review): presumably fills a struct channel_info through
// _channel_get_info() and wraps it with new_channel_info() as a
// ChannelInfo named tuple -- confirm against the implementation.
static PyObject *
channelsmod_get_info(PyObject *self, PyObject *args, PyObject *kwds)
{}

PyDoc_STRVAR(channelsmod_get_info_doc,
"get_info(cid)\n\
\n\
Return details about the channel.");

// Python-level entry point for querying a channel's defaults.
// User-facing contract, per channelsmod_get_channel_defaults_doc: takes
// `cid` and returns the default values set when the channel was created.
// NOTE(review): presumably this is (at least) the `unboundop` passed at
// creation time; no dedicated C-level helper for it appears in this
// file's declarations, so confirm where the value is read from.
static PyObject *
channelsmod_get_channel_defaults(PyObject *self, PyObject *args, PyObject *kwds)
{}

PyDoc_STRVAR(channelsmod_get_channel_defaults_doc,
"get_channel_defaults(cid)\n\
\n\
Return the channel's default values, set when it was created.");

static PyObject *
channelsmod__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
{}

static PyObject *
channelsmod__register_end_types(PyObject *self, PyObject *args, PyObject *kwds)
{}

static PyMethodDef module_functions[] =;


/* initialization function */

PyDoc_STRVAR(module_doc,
"This module provides primitive operations to manage Python interpreters.\n\
The 'interpreters' module provides a more convenient interface.");

static int
module_exec(PyObject *mod)
{}

static struct PyModuleDef_Slot module_slots[] =;

static int
module_traverse(PyObject *mod, visitproc visit, void *arg)
{}

static int
module_clear(PyObject *mod)
{}

static void
module_free(void *mod)
{}

static struct PyModuleDef moduledef =;

PyMODINIT_FUNC
MODINIT_FUNC_NAME(void)
{}