◐ Shell
clean mode source ↗

bpo-32604: [_xxsubinterpreters] Add channel_send_wait(). by ericsnowcurrently · Pull Request #19829 · python/cpython

Expand Up @@ -3,6 +3,7 @@ /* low-level access to interpreter primitives */
#include "Python.h" #include "pythread.h" #include "frameobject.h" #include "interpreteridobject.h"
Expand Down Expand Up @@ -276,6 +277,149 @@ _sharedexception_apply(_sharedexception *exc, PyObject *wrapperclass) } }
/* locks */
typedef struct _lockobj { PyObject_HEAD PyThread_type_lock lock; PyInterpreterState *owner; int done; } _lockobj;
static int _lockobj_init(_lockobj *lock) { lock->lock = PyThread_allocate_lock(); if (lock->lock == NULL) { return -1; } lock->done = 0; lock->owner = _get_current(); return 0; }
static void _lockobj_dealloc(_lockobj *lock) { PyThread_free_lock(lock->lock); PyObject_Del(lock); }
// This is cross-interpreter safe. static int _lockobj_acquire(_lockobj *lock) { // Do not wait. return PyThread_acquire_lock(lock->lock, 0); }
// This is cross-interpreter safe. static void _lockobj_release(_lockobj *lock) { PyThread_release_lock(lock->lock); }
static PyObject * _lockobj_call(PyObject *self, PyObject *args, PyObject *kwargs) { static char *kwlist[] = {"timeout", NULL}; PY_TIMEOUT_T timeout = _PyThread_TIMEOUT_NOT_SET; if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|O&:__call__", kwlist, _PyThread_timeout_arg_converter, &timeout)) { return NULL; } _lockobj *lock = (_lockobj *)self;
if (lock->done) { Py_RETURN_TRUE; }
// Wait for the lock to be released. _PyTime_t end = timeout > 0 ? _PyTime_GetMonotonicClock() + timeout : 0; PyLockStatus r = PY_LOCK_FAILURE; do { _PyTime_t microseconds = _PyTime_AsMicroseconds(timeout, _PyTime_ROUND_CEILING);
/* first a simple non-blocking try without releasing the GIL */ r = PyThread_acquire_lock_timed(lock->lock, 0, 0); if (r == PY_LOCK_FAILURE && microseconds != 0) { Py_BEGIN_ALLOW_THREADS r = PyThread_acquire_lock_timed(lock, microseconds, 0); Py_END_ALLOW_THREADS }
if (r == PY_LOCK_INTR) { /* Run signal handlers if we were interrupted. Propagate * exceptions from signal handlers, such as KeyboardInterrupt, by * passing up PY_LOCK_INTR. */ if (Py_MakePendingCalls() < 0) { return NULL; }
/* If we're using a timeout, recompute the timeout after processing * signals, since those can take time. */ if (timeout > 0) { timeout = end - _PyTime_GetMonotonicClock();
/* Check for negative values, since those mean block forever. */ if (timeout < 0) { r = PY_LOCK_FAILURE; } } } } while (r == PY_LOCK_INTR); /* Retry if we were interrupted. */ if (r == PY_LOCK_FAILURE) { Py_RETURN_FALSE; }
// Success! _lockobj_release(lock); lock->done = 1; Py_RETURN_TRUE; }
static PyTypeObject _lockobjtype = { PyVarObject_HEAD_INIT(&PyType_Type, 0) .tp_name = "_xxsubinterpreters.lock", .tp_doc = PyDoc_STR("a basic waitable wrapper around a mutex"), .tp_flags = Py_TPFLAGS_DEFAULT, .tp_basicsize = sizeof(_lockobj), // functionality .tp_new = NULL, // It cannot be instantiated from Python code. .tp_dealloc = (destructor)_lockobj_dealloc, .tp_call = _lockobj_call, };
static _lockobj * _lockobj_new(void) { _lockobj *lock = PyObject_New(_lockobj, &_lockobjtype); if (lock == NULL) { return NULL; } if (_lockobj_init(lock) != 0) { PyMem_Free(lock); return NULL; } return lock; }
static void _lockobj_free(_lockobj *lock) { _lockobj_release(lock); if (lock->owner == NULL || lock->owner == _get_current()) { Py_DECREF(lock); } else { int res = _Py_DECREF_in_interpreter(lock->owner, (PyObject *)lock); assert(res == 0); } }

/* channel-specific code ****************************************************/
Expand Down Expand Up @@ -353,6 +497,7 @@ struct _channelitem;
typedef struct _channelitem { _PyCrossInterpreterData *data; _lockobj *recvlock; struct _channelitem *next; } _channelitem;
Expand All @@ -365,6 +510,7 @@ _channelitem_new(void) return NULL; } item->data = NULL; item->recvlock = NULL; item->next = NULL; return item; } Expand All @@ -377,6 +523,9 @@ _channelitem_clear(_channelitem *item) PyMem_Free(item->data); item->data = NULL; } if (item->recvlock != NULL) { _lockobj_free(item->recvlock); } item->next = NULL; }
Expand All @@ -402,6 +551,7 @@ _channelitem_popped(_channelitem *item) { _PyCrossInterpreterData *data = item->data; item->data = NULL; // The lock (if any) is released here: _channelitem_free(item); return data; } Expand Down Expand Up @@ -443,13 +593,15 @@ _channelqueue_free(_channelqueue *queue) }
static int _channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data) _channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data, _lockobj *recvlock) { _channelitem *item = _channelitem_new(); if (item == NULL) { return -1; } item->data = data; item->recvlock = recvlock;
queue->count += 1; if (queue->first == NULL) { Expand Down Expand Up @@ -761,7 +913,7 @@ _channel_free(_PyChannelState *chan)
static int _channel_add(_PyChannelState *chan, int64_t interp, _PyCrossInterpreterData *data) _PyCrossInterpreterData *data, _lockobj *recvlock) { int res = -1; PyThread_acquire_lock(chan->mutex, WAIT_LOCK); Expand All @@ -774,7 +926,7 @@ _channel_add(_PyChannelState *chan, int64_t interp, goto done; }
if (_channelqueue_put(chan->queue, data) != 0) { if (_channelqueue_put(chan->queue, data, recvlock) != 0) { goto done; }
Expand Down Expand Up @@ -1280,7 +1432,8 @@ _channel_destroy(_channels *channels, int64_t id) }
static int _channel_send(_channels *channels, int64_t id, PyObject *obj) _channel_send(_channels *channels, int64_t id, PyObject *obj, _lockobj *recvlock) { PyInterpreterState *interp = _get_current(); if (interp == NULL) { Expand Down Expand Up @@ -1314,7 +1467,8 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj) }
// Add the data to the channel. int res = _channel_add(chan, PyInterpreterState_GetID(interp), data); int res = _channel_add(chan, PyInterpreterState_GetID(interp), data, recvlock); PyThread_release_lock(mutex); if (res != 0) { _PyCrossInterpreterData_Release(data); Expand Down Expand Up @@ -2412,7 +2566,7 @@ channel_send(PyObject *self, PyObject *args, PyObject *kwds) return NULL; }
if (_channel_send(&_globals.channels, cid, obj) != 0) { if (_channel_send(&_globals.channels, cid, obj, NULL) != 0) { return NULL; } Py_RETURN_NONE; Expand All @@ -2423,6 +2577,43 @@ PyDoc_STRVAR(channel_send_doc, \n\ Add the object's data to the channel's queue.");
static PyObject * channel_send_wait(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"cid", "obj", NULL}; int64_t cid; PyObject *obj; if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist, channel_id_converter, &cid, &obj)) { return NULL; }
_lockobj *lock = _lockobj_new(); if (lock == NULL) { return NULL; } if (_lockobj_acquire(lock) != 0) { PyErr_SetString(PyExc_RuntimeError, "could not acquire lock"); _lockobj_dealloc(lock); return NULL; } if (_channel_send(&_globals.channels, cid, obj, lock) != 0) { _lockobj_dealloc(lock); return NULL; } Py_INCREF(lock); return (PyObject*)lock; }
PyDoc_STRVAR(channel_send_wait_doc, "channel_send_wait(cid, obj)\n\ \n\ Add the object's data to the channel's queue.\n\ \n\ The returned callable will block until the object is received.\n\ Note that it takes an optional 'timeout' arg like\n\ threading.Lock.acquire() does.");
static PyObject * channel_recv(PyObject *self, PyObject *args, PyObject *kwds) { Expand Down Expand Up @@ -2575,6 +2766,8 @@ static PyMethodDef module_functions[] = { METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc}, {"channel_send", (PyCFunction)(void(*)(void))channel_send, METH_VARARGS | METH_KEYWORDS, channel_send_doc}, {"channel_send_wait", (PyCFunction)(void(*)(void))channel_send_wait, METH_VARARGS | METH_KEYWORDS, channel_send_wait_doc}, {"channel_recv", (PyCFunction)(void(*)(void))channel_recv, METH_VARARGS | METH_KEYWORDS, channel_recv_doc}, {"channel_close", (PyCFunction)(void(*)(void))channel_close, Expand Down Expand Up @@ -2618,6 +2811,9 @@ PyInit__xxsubinterpreters(void) if (PyType_Ready(&ChannelIDtype) != 0) { return NULL; } if (PyType_Ready(&_lockobjtype) != 0) { return NULL; }
/* Create the module */ PyObject *module = PyModule_Create(&interpretersmodule); Expand Down