tornado.queues – Queues for coroutines

New in version 4.2.

Classes

Queue

class tornado.queues.Queue(maxsize=0)

Coordinate producer and consumer coroutines.

If maxsize is 0 (the default) the queue size is unbounded.

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue(maxsize=2)

@gen.coroutine
def consumer():
    while True:
        item = yield q.get()
        try:
            print('Doing work on %s' % item)
            yield gen.sleep(0.01)
        finally:
            q.task_done()

@gen.coroutine
def producer():
    for item in range(5):
        yield q.put(item)
        print('Put %s' % item)

@gen.coroutine
def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    yield producer()     # Wait for producer to put all tasks.
    yield q.join()       # Wait for consumer to finish all tasks.
    print('Done')

IOLoop.current().run_sync(main)

Put 0
Put 1
Doing work on 0
Put 2
Doing work on 1
Put 3
Doing work on 2
Put 4
Doing work on 3
Doing work on 4
Done

In Python 3.5 and later, Queue implements the async iterator protocol, so consumer() can be rewritten as:

async def consumer():
    async for item in q:
        try:
            print('Doing work on %s' % item)
            await gen.sleep(0.01)
        finally:
            q.task_done()

Changed in version 4.3: Added async for support in Python 3.5.

maxsize

Maximum number of items allowed in the queue; 0 means the queue is unbounded.

qsize()

Number of items currently in the queue.
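As a small illustration of how maxsize and qsize() relate, the sketch below computes the remaining capacity of a bounded queue. The variable name free_slots and the item values are illustrative only, and the example uses put_nowait so that no running IOLoop is needed.

```python
from tornado.queues import Queue

q = Queue(maxsize=3)
q.put_nowait('a')
q.put_nowait('b')

# maxsize is the configured capacity; qsize() is the current item count.
free_slots = q.maxsize - q.qsize()
print('%d of %d slots used, %d free' % (q.qsize(), q.maxsize, free_slots))
```

This arithmetic is only meaningful for a bounded queue; with the default maxsize of 0 the queue has no capacity limit.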

put(item, timeout=None)

Put an item into the queue, perhaps waiting until there is room.

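Returns a Future that resolves once the item has been added, or raises tornado.gen.TimeoutError after a timeout. timeout may be a number denoting an absolute time (on the same scale as IOLoop.time, normally time.time), or a datetime.timedelta for a deadline relative to the current time.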
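The timeout behavior of put can be sketched as follows. The example fills a one-slot queue and then attempts a second put that cannot succeed because nothing consumes from the queue. It passes the timeout as a datetime.timedelta so that it is relative to the current time; the coroutine name try_put and the item values are illustrative only.

```python
from datetime import timedelta

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue


@gen.coroutine
def try_put():
    q = Queue(maxsize=1)
    yield q.put('first')    # Fits: the queue had one free slot.
    try:
        # The queue is now full and has no consumer, so this put
        # waits for 50 ms and then gives up.
        yield q.put('second', timeout=timedelta(milliseconds=50))
    except gen.TimeoutError:
        return 'timed out'
    return 'stored'


put_result = IOLoop.current().run_sync(try_put)
print(put_result)
```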

put_nowait(item)

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.
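A minimal sketch of handling QueueFull, assuming a queue with a single slot; the flag name overflowed is illustrative only.

```python
from tornado.queues import Queue, QueueFull

q = Queue(maxsize=1)
q.put_nowait('first')        # Succeeds: one slot was free.

try:
    q.put_nowait('second')   # No free slot, so this raises QueueFull.
    overflowed = False
except QueueFull:
    overflowed = True

print(overflowed, q.qsize())
```

The rejected item is not stored, so the queue still holds only the first item afterwards.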

get(timeout=None)

Remove and return an item from the queue.

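Returns a Future that resolves once an item is available, or raises tornado.gen.TimeoutError after a timeout. timeout may be a number denoting an absolute time (on the same scale as IOLoop.time, normally time.time), or a datetime.timedelta for a deadline relative to the current time.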
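The following sketch shows both outcomes of get: the first call finds an item immediately, while the second waits on an empty queue until the timeout expires. It passes the timeout as a datetime.timedelta so that it is relative to the current time; the coroutine name fetch_twice is illustrative only.

```python
from datetime import timedelta

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue


@gen.coroutine
def fetch_twice():
    q = Queue()
    q.put_nowait('only item')
    results = []
    for _ in range(2):
        try:
            # Wait up to 50 ms for an item to become available.
            item = yield q.get(timeout=timedelta(milliseconds=50))
            results.append(item)
        except gen.TimeoutError:
            results.append('timed out')
    return results


fetched = IOLoop.current().run_sync(fetch_twice)
print(fetched)
```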

get_nowait()

Remove and return an item from the queue without blocking.

Return an item if one is immediately available, else raise QueueEmpty.
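One way to use get_nowait is to drain whatever is currently queued without waiting, stopping at the first QueueEmpty. This is a sketch with illustrative names (drained) rather than a prescribed pattern.

```python
from tornado.queues import Queue, QueueEmpty

q = Queue()
for n in (1, 2, 3):
    q.put_nowait(n)

drained = []
while True:
    try:
        drained.append(q.get_nowait())
    except QueueEmpty:
        break    # Nothing left to take without waiting.

print(drained)
```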

task_done()

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get used to fetch a task, a subsequent call to task_done tells the queue that the processing on the task is complete.

If a join is blocking, it resumes when all items have been processed; that is, when every put is matched by a task_done.

Raises ValueError if called more times than there were items put in the queue.
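The bookkeeping described above can be sketched with a single item: one put allows exactly one task_done, and a second call raises ValueError. The flag name too_many_calls is illustrative only.

```python
from tornado.queues import Queue

q = Queue()
q.put_nowait('job')
q.get_nowait()
q.task_done()            # Matches the single put.

try:
    q.task_done()        # No unfinished task remains.
    too_many_calls = False
except ValueError:
    too_many_calls = True

print(too_many_calls)
```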

join(timeout=None)

Block until all items in the queue are processed.

Returns a Future that resolves once every item has been processed, or raises tornado.gen.TimeoutError after a timeout.
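As a sketch of the timeout case, the example below joins a queue whose only item is never marked done, so the join cannot complete. It passes the timeout as a datetime.timedelta so that it is relative to the current time; the coroutine name wait_for_queue is illustrative only.

```python
from datetime import timedelta

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue


@gen.coroutine
def wait_for_queue():
    q = Queue()
    q.put_nowait('unprocessed')    # Never matched by task_done().
    try:
        # Give the (nonexistent) consumer 50 ms to finish the work.
        yield q.join(timeout=timedelta(milliseconds=50))
    except gen.TimeoutError:
        return 'join timed out'
    return 'all done'


join_result = IOLoop.current().run_sync(wait_for_queue)
print(join_result)
```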

PriorityQueue

class tornado.queues.PriorityQueue(maxsize=0)

A Queue that retrieves entries in priority order, lowest first.

Entries are typically tuples like (priority number, data).

from tornado.queues import PriorityQueue

q = PriorityQueue()
q.put((1, 'medium-priority item'))
q.put((0, 'high-priority item'))
q.put((10, 'low-priority item'))

print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())

(0, 'high-priority item')
(1, 'medium-priority item')
(10, 'low-priority item')

LifoQueue

class tornado.queues.LifoQueue(maxsize=0)

A Queue that retrieves the most recently put items first.

from tornado.queues import LifoQueue

q = LifoQueue()
q.put(3)
q.put(2)
q.put(1)

print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())

1
2
3

Exceptions

QueueEmpty

exception tornado.queues.QueueEmpty

Raised by Queue.get_nowait when the queue has no items.

QueueFull

exception tornado.queues.QueueFull

Raised by Queue.put_nowait when a queue is at its maximum size.