-
Notifications
You must be signed in to change notification settings - Fork 152
Refresh examples to ensure that they run in current version #494
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
3f6135c
f1b6660
860ec25
e4d708c
2eba95a
5ad905e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,28 +1,17 @@ | ||
| from streamz import Stream | ||
| import asyncio | ||
| from tornado.platform.asyncio import AsyncIOMainLoop | ||
| AsyncIOMainLoop().install() | ||
| from streamz import Stream | ||
|
|
||
|
|
||
| source = Stream() | ||
| source = Stream(asynchronous=True) | ||
| s = source.sliding_window(2).map(sum) | ||
| L = s.sink_to_list() # store result in a list | ||
|
|
||
| s.rate_limit(0.5).sink(source.emit) # pipe output back to input | ||
| s.rate_limit(1.0).sink(lambda x: print(L)) # print state of L every second | ||
|
|
||
| source.emit(0) # seed with initial values | ||
| source.emit(1) | ||
|
|
||
| L = s.sink_to_list() # store result in a list | ||
|
|
||
| def run_asyncio_loop(): | ||
| loop = asyncio.get_event_loop() | ||
| try: | ||
| loop.run_forever() | ||
| except KeyboardInterrupt: | ||
| pass | ||
| finally: | ||
| loop.close() | ||
| s.rate_limit('1s').sink(lambda x: print(L)) # print state of L every second | ||
| s.rate_limit('500ms').connect(source) # pipe output back to input | ||
|
|
||
| source.emit(1) # seed with initial value, does not block thread due to Future return | ||
|
|
||
| run_asyncio_loop() | ||
| try: | ||
| asyncio.get_event_loop().run_forever() | ||
| except (KeyboardInterrupt, asyncio.CancelledError): | ||
| pass | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,12 +1,13 @@ | ||
| from streamz import Stream | ||
| from tornado.ioloop import IOLoop | ||
|
|
||
| source = Stream() | ||
| s = source.sliding_window(2).map(sum) | ||
| L = s.sink_to_list() # store result in a list | ||
| L = s.sink_to_list() # store result in a list | ||
|
|
||
| s.rate_limit('500ms').sink(source.emit) # pipe output back to input | ||
| s.rate_limit('1s').sink(lambda x: print(L)) # print state of L every second | ||
| s.rate_limit('500ms').connect(source) # pipe output back to input | ||
|
|
||
| source.emit(0) # seed with initial values | ||
| source.emit(1) | ||
| try: | ||
| source.emit(1) # seed with initial value, blocks thread due to cycle in stream | ||
| except KeyboardInterrupt: | ||
| pass |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,12 +4,14 @@ | |
|
|
||
| source = Stream(asynchronous=True) | ||
| s = source.sliding_window(2).map(sum) | ||
| L = s.sink_to_list() # store result in a list | ||
| L = s.sink_to_list() # store result in a list | ||
|
|
||
| s.rate_limit('500ms').sink(source.emit) # pipe output back to input | ||
| s.rate_limit('1s').sink(lambda x: print(L)) # print state of L every second | ||
| s.rate_limit('500ms').connect(source) # pipe output back to input | ||
|
|
||
| source.emit(0) # seed with initial values | ||
| source.emit(1) | ||
| source.emit(1) # seed with initial value, does not block thread due to Future return | ||
|
|
||
| IOLoop.current().start() | ||
| try: | ||
| IOLoop.current().start() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should default to using asyncio?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was trying to preserve the Tornado semantics of the example as they are still valid and there is obviously quite a bit of example material in the documentation using Tornado. |
||
| except KeyboardInterrupt: | ||
| pass | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,4 +18,9 @@ | |
| ) | ||
|
|
||
| s.start() | ||
| time.sleep(600) | ||
|
|
||
| try: | ||
| while True: | ||
| time.sleep(600) | ||
| except KeyboardInterrupt: | ||
| pass | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,26 +1,23 @@ | ||
| from __future__ import print_function | ||
|
|
||
| from time import sleep | ||
| import sys | ||
|
|
||
| from BeautifulSoup import BeautifulSoup # Python 2 only, sorry. | ||
| from urllib.parse import urlparse | ||
|
|
||
| import requests | ||
| from streamz import Stream | ||
| import toolz | ||
| import urlparse | ||
| from bs4 import BeautifulSoup | ||
|
|
||
| from streamz import Stream | ||
|
|
||
|
|
||
| def links_of_page((content, page)): | ||
| uri = urlparse.urlparse(page) | ||
| def links_of_page(content_page): | ||
| (content, page) = content_page | ||
| uri = urlparse(page) | ||
| domain = '%s://%s' % (uri.scheme, uri.netloc) | ||
| try: | ||
| soup = BeautifulSoup(content) | ||
| soup = BeautifulSoup(content, features="html.parser") | ||
| except: | ||
| return [] | ||
| else: | ||
| links = [link.get('href') for link in soup.findAll('a')] | ||
| links = [link.get('href') for link in soup.find_all('a')] | ||
| return [domain + link | ||
| for link in links | ||
| if link | ||
|
|
@@ -41,8 +38,8 @@ def topk_dict(d, k=10): | |
| .map(lambda x: x.content)) | ||
| links = (content.zip(pages) | ||
| .map(links_of_page) | ||
| .concat()) | ||
| links.sink(source.emit) | ||
| .flatten()) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I saw the improvement to flatten(), but was concat() wrong, or did it disappear?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The concat and scan aliases for flatten and accumulate are still there but they do not show up in the documentation so I switched the examples to use flatten directly since it is documented. |
||
| links.connect(source) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this is more right than sink(emit). |
||
|
|
||
| """ | ||
| from nltk.corpus import stopwords | ||
|
|
@@ -60,8 +57,7 @@ def topk_dict(d, k=10): | |
| """ | ||
|
|
||
| if len(sys.argv) > 1: | ||
| source.emit(sys.argv[1]) | ||
|
|
||
|
|
||
|
|
||
| # | ||
| try: | ||
| source.emit(sys.argv[1]) | ||
| except KeyboardInterrupt: | ||
| pass | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least one of these should be rewritten (or alternative version added) in an
aync deffunction as an example where the whole app is a single asyncio.run() call.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initially, I did that for fib_asyncio.py but then it looked heavy-weight compared to the tornado variant and lost some of the ease of direct comparison.