Skip to content
Draft
11 changes: 3 additions & 8 deletions Lib/concurrent/interpreters/_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ def qsize(self):

def put(self, obj, block=True, timeout=None, *,
unbounditems=None,
_delay=10 / 1000, # 10 milliseconds
):
"""Add the object to the queue.

Expand Down Expand Up @@ -226,7 +225,6 @@ def put(self, obj, block=True, timeout=None, *,
except QueueFull:
if timeout is not None and time.time() >= end:
raise # re-raise
time.sleep(_delay)
else:
break

Expand All @@ -235,11 +233,9 @@ def put_nowait(self, obj, *, unbounditems=None):
unboundop = -1
else:
unboundop, = _serialize_unbound(unbounditems)
_queues.put(self._id, obj, unboundop)
_queues.put(self._id, obj, unboundop, False)

def get(self, block=True, timeout=None, *,
_delay=10 / 1000, # 10 milliseconds
):
def get(self, block=True, timeout=None):
"""Return the next object from the queue.

If "block" is true, this blocks while the queue is empty.
Expand All @@ -261,7 +257,6 @@ def get(self, block=True, timeout=None, *,
except QueueEmpty:
if timeout is not None and time.time() >= end:
raise # re-raise
time.sleep(_delay)
else:
break
if unboundop is not None:
Expand All @@ -276,7 +271,7 @@ def get_nowait(self):
is the same as get().
"""
try:
obj, unboundop = _queues.get(self._id)
obj, unboundop = _queues.get(self._id, False)
except QueueEmpty:
raise # re-raise
if unboundop is not None:
Expand Down
33 changes: 27 additions & 6 deletions Modules/_interpqueuesmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "Python.h"
#include "pycore_crossinterp.h" // _PyXIData_t
#include "pycore_lock.h" // PyEvent

#define REGISTERS_HEAP_TYPES
#define HAS_FALLBACK
Expand Down Expand Up @@ -532,6 +533,8 @@ typedef struct _queue {
xidata_fallback_t fallback;
int unboundop;
} defaults;
PyEvent space_available;
PyEvent has_item;
} _queue;

static int
Expand All @@ -549,6 +552,8 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, struct _queuedefaults defaults)
.maxsize = maxsize,
},
.defaults = defaults,
.space_available = (PyEvent){1},
.has_item = (PyEvent){0}
};
return 0;
}
Expand Down Expand Up @@ -641,6 +646,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, int unboundop)
maxsize = PY_SSIZE_T_MAX;
}
if (queue->items.count >= maxsize) {
queue->space_available = (PyEvent){0};
_queue_unlock(queue);
return ERR_QUEUE_FULL;
}
Expand All @@ -660,6 +666,8 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, int unboundop)
}
queue->items.last = item;

queue->has_item = (PyEvent){1};

_queue_unlock(queue);
return 0;
}
Expand All @@ -675,6 +683,7 @@ _queue_next(_queue *queue, _PyXIData_t **p_data, int *p_unboundop)
assert(queue->items.count >= 0);
_queueitem *item = queue->items.first;
if (item == NULL) {
queue->has_item = (PyEvent){0};
_queue_unlock(queue);
return ERR_QUEUE_EMPTY;
}
Expand Down Expand Up @@ -1124,7 +1133,7 @@ queue_destroy(_queues *queues, int64_t qid)
// Push an object onto the queue.
static int
queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
xidata_fallback_t fallback)
xidata_fallback_t fallback, int block)
{
PyThreadState *tstate = PyThreadState_Get();

Expand All @@ -1136,6 +1145,11 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
}
assert(queue != NULL);

// Wait for the queue to have space
if (block == 1) {
PyEvent_Wait(&queue->space_available);
}

// Convert the object to cross-interpreter data.
_PyXIData_t *xidata = _PyXIData_New();
if (xidata == NULL) {
Expand Down Expand Up @@ -1168,7 +1182,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
// XXX Support a "wait" mutex?
static int
queue_get(_queues *queues, int64_t qid,
PyObject **res, int *p_unboundop)
PyObject **res, int *p_unboundop, int block)
{
int err;
*res = NULL;
Expand All @@ -1182,6 +1196,11 @@ queue_get(_queues *queues, int64_t qid,
// Past this point we are responsible for releasing the mutex.
assert(queue != NULL);

// Wait for the queue to have some value
if (block == 1) {
PyEvent_Wait(&queue->has_item);
}

// Pop off the next item from the queue.
_PyXIData_t *data = NULL;
err = _queue_next(queue, &data, p_unboundop);
Expand Down Expand Up @@ -1613,13 +1632,14 @@ _interpqueues.put
obj: object
unboundop as unboundarg: int = -1
fallback as fallbackarg: int = -1
block: bool = True

Add the object's data to the queue.
[clinic start generated code]*/

static PyObject *
_interpqueues_put_impl(PyObject *module, int64_t qid, PyObject *obj,
int unboundarg, int fallbackarg)
int unboundarg, int fallbackarg, int block)
/*[clinic end generated code: output=2e0b31c6eaec29c9 input=4906550ab5c73be3]*/
{
struct _queuedefaults defaults = {-1, -1};
Expand All @@ -1639,7 +1659,7 @@ _interpqueues_put_impl(PyObject *module, int64_t qid, PyObject *obj,
}

/* Queue up the object. */
int err = queue_put(&_globals.queues, qid, obj, unboundop, fallback);
int err = queue_put(&_globals.queues, qid, obj, unboundop, fallback, block);
// This is the only place that raises QueueFull.
if (handle_queue_error(err, module, qid)) {
return NULL;
Expand All @@ -1651,19 +1671,20 @@ _interpqueues_put_impl(PyObject *module, int64_t qid, PyObject *obj,
/*[clinic input]
_interpqueues.get
qid: qidarg
block: bool = True

Return the (object, unbound op) from the front of the queue.

If there is nothing to receive then raise QueueEmpty.
[clinic start generated code]*/

static PyObject *
_interpqueues_get_impl(PyObject *module, int64_t qid)
_interpqueues_get_impl(PyObject *module, int64_t qid, int block)
/*[clinic end generated code: output=b0988a0e29194f05 input=c5bccbc409ad0190]*/
{
PyObject *obj = NULL;
int unboundop = 0;
int err = queue_get(&_globals.queues, qid, &obj, &unboundop);
int err = queue_get(&_globals.queues, qid, &obj, &unboundop, block);
// This is the only place that raises QueueEmpty.
if (handle_queue_error(err, module, qid)) {
return NULL;
Expand Down
39 changes: 27 additions & 12 deletions Modules/clinic/_interpqueuesmodule.c.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading