Queue example - a concurrent web spider

Tornado’s tornado.queues module implements an asynchronous producer / consumer pattern for coroutines, analogous to the pattern implemented for threads by the Python standard library’s queue module.

A coroutine that yields Queue.get pauses until there is an item in the queue. If the queue has a maximum size set, a coroutine that yields Queue.put pauses until there is room for another item.
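The pausing behaviour described above can be sketched in a few lines. This is a minimal illustration, not Tornado code: it uses the standard library's asyncio.Queue as a stand-in, on the assumption that its get, put and maxsize behave like their tornado.queues counterparts, so that it runs without Tornado installed. The names demo_backpressure, producer and consumer are invented for the example.

```python
import asyncio


async def demo_backpressure():
    """Record the order in which puts and gets complete on a size-1 queue."""
    # Assumption: asyncio.Queue stands in for tornado.queues.Queue(maxsize=1).
    q = asyncio.Queue(maxsize=1)
    events = []

    async def producer():
        for item in ("a", "b"):
            # The second put pauses until the consumer has made room.
            await q.put(item)
            events.append("put %s" % item)

    async def consumer():
        for _ in range(2):
            # get pauses whenever the queue is empty.
            item = await q.get()
            events.append("got %s" % item)

    await asyncio.gather(producer(), consumer())
    return events


events = asyncio.run(demo_backpressure())
print(events)
```

Because the queue holds at most one item, "b" cannot be put until "a" has been taken out, whichever coroutine happens to be scheduled first.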

A Queue maintains a count of unfinished tasks, which begins at zero. put increments the count; task_done decrements it. A coroutine that yields Queue.join pauses until the count returns to zero.
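The counting rule can be checked with a small experiment. The sketch below again uses asyncio.Queue as a stand-in for tornado.queues.Queue (an assumption made so that it runs without Tornado). The function and variable names are hypothetical. It shows that removing items with get does not lower the count, that a waiter on join is released only by the last task_done, and that one task_done too many raises ValueError.

```python
import asyncio


async def demo_unfinished_tasks():
    """Track when join() completes relative to put, get and task_done."""
    # Assumption: asyncio.Queue stands in for tornado.queues.Queue().
    q = asyncio.Queue()
    for item in range(3):
        q.put_nowait(item)  # each put raises the unfinished count (3 in total)

    join_task = asyncio.ensure_future(q.join())
    done_before_task_done = []

    for _ in range(3):
        q.get_nowait()          # taking an item out does not lower the count
        await asyncio.sleep(0)  # let join() run if it is able to
        done_before_task_done.append(join_task.done())
        q.task_done()           # only task_done lowers the count

    await join_task  # released once the third task_done reaches zero

    try:
        q.task_done()  # one call more than there were puts
    except ValueError:
        extra_call_rejected = True
    else:
        extra_call_rejected = False
    return done_before_task_done, extra_call_rejected


join_states, extra_call_rejected = asyncio.run(demo_unfinished_tasks())
print(join_states, extra_call_rejected)
```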

In the web-spider example here, the queue begins containing only base_url. When a worker fetches a page it parses the links and puts new ones in the queue, then calls task_done to decrement the counter once. Eventually, a worker fetches a page whose URLs have all been seen before, and there is also no work left in the queue. Thus that worker’s call to task_done decrements the counter to zero. The main coroutine, which is waiting for join, is unpaused and finishes.
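The termination argument can be tried without any network access. The sketch below is a simplified model of the spider, not the example itself: FAKE_SITE is a hypothetical in-memory link graph, asyncio.sleep(0) takes the place of the HTTP fetch, and asyncio.Queue stands in for tornado.queues.Queue. Each get is matched by exactly one task_done, so join returns once every queued URL has been handled, even though the workers themselves never finish on their own.

```python
import asyncio

# Hypothetical site used only for this sketch: page -> links on that page.
FAKE_SITE = {
    "/": ["/a", "/b"],
    "/a": ["/b", "/c"],
    "/b": ["/"],
    "/c": [],
}


async def crawl(start, concurrency=3):
    # Assumption: asyncio.Queue stands in for tornado.queues.Queue().
    q = asyncio.Queue()
    seen = set()
    fetched = []

    async def worker():
        while True:
            url = await q.get()
            try:
                if url in seen:
                    continue  # already handled; finally still runs
                seen.add(url)
                await asyncio.sleep(0)  # placeholder for the real fetch
                fetched.append(url)
                for link in FAKE_SITE.get(url, []):
                    await q.put(link)  # each put adds one unfinished task
            finally:
                q.task_done()  # exactly one task_done per get

    await q.put(start)
    workers = [asyncio.ensure_future(worker()) for _ in range(concurrency)]
    await q.join()  # returns when the unfinished count is back to zero

    # The workers are still waiting in q.get(), so stop them explicitly.
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(fetched)


pages = asyncio.run(crawl("/"))
print(pages)
```

Duplicate URLs are put on the queue and discarded later by the worker that picks them up. That mirrors the spider's approach and keeps the counter consistent, because every put is eventually matched by one task_done.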


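import time
from datetime import timedelta

try:
    # Python 2 module names
    import HTMLParser
    import urlparse
except ImportError:
    # Python 3: alias the renamed modules to their Python 2 names
    import html.parser as HTMLParser
    import urllib.parse as urlparse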

from tornado import httpclient, gen, ioloop, queues

base_url = 'http://www.tornadoweb.org/en/stable/'
concurrency = 10

@gen.coroutine
def get_links_from_url(url):
    """Download the page at `url` and parse it for links.

    Returned links have had the fragment after `#` removed, and have been made
    absolute so, e.g. the URL 'gen.html#tornado.gen.coroutine' becomes
    'http://www.tornadoweb.org/en/stable/gen.html'.
    """
    try:
        response = yield httpclient.AsyncHTTPClient().fetch(url)
        print('fetched %s' % url)
        urls = [urlparse.urljoin(url, remove_fragment(new_url))
                for new_url in get_links(response.body)]
    except Exception as e:
        print('Exception: %s %s' % (e, url))
        raise gen.Return([])

    raise gen.Return(urls)

def remove_fragment(url):
    scheme, netloc, url, params, query, fragment = urlparse.urlparse(url)
    return urlparse.urlunparse((scheme, netloc, url, params, query, ''))

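def get_links(html):
    # The response body is bytes; decode it before parsing.
    # Assumption: pages are UTF-8 (undecodable bytes are replaced).
    if isinstance(html, bytes):
        html = html.decode('utf-8', 'replace')

    class URLSeeker(HTMLParser.HTMLParser):
        def __init__(self):
            HTMLParser.HTMLParser.__init__(self)
            self.urls = []

        def handle_starttag(self, tag, attrs):
            href = dict(attrs).get('href')
            if href and tag == 'a':
                self.urls.append(href)

    url_seeker = URLSeeker()
    url_seeker.feed(html)
    return url_seeker.urls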

@gen.coroutine
def main():
    q = queues.Queue()
    start = time.time()
    fetching, fetched = set(), set()

    @gen.coroutine
    def fetch_url():
        current_url = yield q.get()
        try:
            if current_url in fetching:
                # Another worker has already claimed this URL.
                return

            print('fetching %s' % current_url)
            fetching.add(current_url)
            urls = yield get_links_from_url(current_url)
            fetched.add(current_url)

            for new_url in urls:
                # Only follow links beneath the base URL
                if new_url.startswith(base_url):
                    yield q.put(new_url)

        finally:
            # Exactly one task_done per get, including the early return.
            q.task_done()

    @gen.coroutine
    def worker():
        while True:
            yield fetch_url()

    q.put(base_url)

    # Start workers, then wait for the work queue to be empty.
    for _ in range(concurrency):
        worker()
    yield q.join(timeout=timedelta(seconds=300))
    assert fetching == fetched
    print('Done in %d seconds, fetched %s URLs.' % (
        time.time() - start, len(fetched)))

if __name__ == '__main__':
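    import logging
    logging.basicConfig()
    io_loop = ioloop.IOLoop.current()
    io_loop.run_sync(main)
    # The workers are still waiting in q.get() at this point, but main() has
    # returned, so run_sync stops the loop and the program exits.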