Queue example - a concurrent web spider
Tornado’s tornado.queues module implements an asynchronous producer /
consumer pattern for coroutines, analogous to the pattern implemented for
threads by the Python standard library’s queue module.
A coroutine that yields Queue.get pauses until there is an item in the queue.
If the queue has a maximum size set, a coroutine that yields Queue.put pauses
until there is room for another item.
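The pausing behavior described above can be sketched without any network access. This is a minimal illustration, not Tornado's own example: it uses the standard library's asyncio.Queue as a stand-in for tornado.queues.Queue (both offer get, put, task_done and join), and the names demo_bounded_queue, producer, consumer and events are made up for the sketch.

```python
import asyncio


async def demo_bounded_queue():
    # maxsize=2: a third put() has to wait until a consumer frees a slot.
    q = asyncio.Queue(maxsize=2)
    events = []

    async def producer():
        for item in range(4):
            await q.put(item)            # pauses while the queue is full
            events.append(('put', item))

    async def consumer():
        for _ in range(4):
            item = await q.get()         # pauses while the queue is empty
            events.append(('got', item))

    await asyncio.gather(producer(), consumer())
    return events


events = asyncio.run(demo_bounded_queue())
print(events)
```

In the recorded events, item 2 can only be put after item 0 has been taken out, because the queue never holds more than two items. With Tornado the same shape applies, except that the coroutines would yield q.put(...) and q.get() instead of awaiting them.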
A Queue maintains a count of unfinished tasks, which begins at zero.
put increments the count; task_done decrements it.
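A small sketch of the counter in action, again using the standard library's asyncio.Queue as an assumed stand-in for tornado.queues.Queue; demo_join, worker and processed are illustrative names only. Each put raises the count by one, each task_done lowers it by one, and join resumes once the count is back at zero.

```python
import asyncio


async def demo_join():
    q = asyncio.Queue()
    processed = []

    async def worker():
        while True:
            item = await q.get()
            try:
                processed.append(item)
            finally:
                q.task_done()        # exactly one task_done per get

    for item in ('a', 'b', 'c'):
        q.put_nowait(item)           # unfinished count goes 1, 2, 3

    task = asyncio.create_task(worker())
    await q.join()                   # resumes when the count reaches zero
    task.cancel()                    # otherwise the worker waits in get() forever
    return processed


processed = asyncio.run(demo_join())
print(processed)
```

Calling task_done in a finally block keeps the count accurate even if processing an item raises, which is why the spider does the same.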
In the web-spider example here, the queue begins containing only base_url. When
a worker fetches a page it parses the links and puts new ones in the queue,
then calls task_done to decrement the counter once. Eventually, a
worker fetches a page whose URLs have all been seen before, and there is also
no work left in the queue. Thus that worker’s call to task_done
decrements the counter to zero. The main coroutine, which is waiting for
join, is unpaused and finishes.
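The termination logic can be tried out on a tiny in-memory link graph instead of real pages. This is a sketch under stated assumptions: LINKS is a hypothetical graph invented for the illustration, asyncio.sleep(0) stands in for the HTTP fetch, and asyncio.Queue from the standard library replaces tornado.queues.Queue so the code runs without Tornado.

```python
import asyncio

# Hypothetical link graph: page name -> links found on that page.
LINKS = {
    'root': ['a', 'b'],
    'a': ['b', 'root'],
    'b': ['c'],
    'c': [],
}


async def crawl(start, concurrency=3):
    q = asyncio.Queue()
    fetching, fetched = set(), set()

    async def worker():
        while True:
            page = await q.get()
            try:
                if page in fetching:
                    continue             # duplicate; finally still runs task_done
                fetching.add(page)
                await asyncio.sleep(0)   # stands in for downloading the page
                fetched.add(page)
                for link in LINKS[page]:
                    q.put_nowait(link)   # every put adds one unfinished task
            finally:
                q.task_done()            # once per get, duplicate or not

    q.put_nowait(start)
    workers = [asyncio.create_task(worker()) for _ in range(concurrency)]
    await q.join()                       # count is zero: nothing left to crawl
    for w in workers:
        w.cancel()
    return fetched


fetched = asyncio.run(crawl('root'))
print(sorted(fetched))
```

Pages that link back to already-seen pages are queued again, but the duplicate is dropped as soon as it is dequeued, so the counter still drains to zero and join returns.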
Note: can every multithreaded pattern be converted to an asynchronous one?
# Python 2 modules; on Python 3 they are html.parser and urllib.parse.
import HTMLParser
import time
import urlparse
from datetime import timedelta

from tornado import httpclient, gen, ioloop, queues

# Global settings
base_url = 'http://www.tornadoweb.org/en/stable/'
concurrency = 10
@gen.coroutine
def get_links_from_url(url):
    """Download the page at `url` and parse it for links.

    Returned links have had the fragment after `#` removed, and have been made
    absolute so, e.g. the URL 'gen.html#tornado.gen.coroutine' becomes
    'http://www.tornadoweb.org/en/stable/gen.html'.
    """
    try:
        response = yield httpclient.AsyncHTTPClient().fetch(url)
        print('fetched %s' % url)
        urls = [urlparse.urljoin(url, remove_fragment(new_url))
                for new_url in get_links(response.body)]
    except Exception as e:
        print('Exception: %s %s' % (e, url))
        raise gen.Return([])

    raise gen.Return(urls)
def remove_fragment(url):
    scheme, netloc, url, params, query, fragment = urlparse.urlparse(url)
    return urlparse.urlunparse((scheme, netloc, url, params, query, ''))
def get_links(html):
    # Inner class: collects the href of every <a> tag it is fed.
    class URLSeeker(HTMLParser.HTMLParser):
        def __init__(self):
            HTMLParser.HTMLParser.__init__(self)
            self.urls = []

        def handle_starttag(self, tag, attrs):
            href = dict(attrs).get('href')
            if href and tag == 'a':
                self.urls.append(href)

    url_seeker = URLSeeker()
    url_seeker.feed(html)
    return url_seeker.urls
@gen.coroutine
def main():
    q = queues.Queue()
    start = time.time()
    fetching, fetched = set(), set()

    # Work done for each URL: the coroutine counterpart of a thread's
    # run function.
    @gen.coroutine
    def fetch_url():
        current_url = yield q.get()
        try:
            if current_url in fetching:
                return

            print('fetching %s' % current_url)
            fetching.add(current_url)
            urls = yield get_links_from_url(current_url)
            fetched.add(current_url)

            for new_url in urls:
                # Only follow links beneath the base URL
                if new_url.startswith(base_url):
                    yield q.put(new_url)
        finally:
            q.task_done()

    @gen.coroutine
    def worker():
        while True:
            yield fetch_url()

    q.put(base_url)

    # Start workers, then wait for the work queue to be empty.
    for _ in range(concurrency):
        worker()
    # Wait until every queued URL has been processed, or give up after
    # 300 seconds.
    yield q.join(timeout=timedelta(seconds=300))
    assert fetching == fetched
    print('Done in %d seconds, fetched %s URLs.' % (
        time.time() - start, len(fetched)))
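if __name__ == '__main__':
    import logging
    logging.basicConfig()
    io_loop = ioloop.IOLoop.current()
    # Run the main coroutine on the IOLoop until it finishes.
    io_loop.run_sync(main)
    # Note: the worker coroutines are never stopped. They are still paused
    # in q.get() when main returns and run_sync exits.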