Github: https://github.com/nerevu/riko
riko:Yahoo!をモデルにしたストリーム処理エンジン パイプ
インデックス
はじめに | 要件 | ワード数 | モチベーション | 使い方 | インストール | デザイン原則 | スクリプト | コマンドラインインターフェイス | 貢献する | クレジット | 詳細情報 | プロジェクトの構成 | ライセンス
前書き
rikoは、構造化データのstreams
を解析して処理するための純粋なPython ライブラリです。 riko
は同期 APIと非同期 APIを備え、 並列実行をサポートしており、RSSフィードの処理に適しています[1] 。 riko
flows
、 flows
を実行するためのコマンドラインインターフェイス 、つまりストリームプロセッサをworkflows
。
riko
、あなたは
- csv / xml / json / htmlファイルを読む
- モジュラーパイプを介してテキストとデータベースの
flows
を作成する - RSS / Atomフィードの解析、抽出、処理
- すばらしいマッシュアップ[2] 、API、マップを作成する
- CPU /プロセッサまたはスレッドを介して並列処理を実行する
- さらに多くの…
ノート
[1] | 本当に簡単なシンジケーション |
[2] | マッシュアップ(Webアプリケーションハイブリッド) |
要件
riko
はテスト済みで、Python 2.7,3.5、および3.6で動作することが知られています。 PyPy2 5.8.0、およびPyPy3 5.8.0。
オプションの依存関係
特徴 | 依存 | インストール |
---|---|---|
非同期API | ツイスト | pip install riko[async] |
XML解析の高速化 | lxml [3] | pip install riko[xml] |
フィード解析の高速化 | speedparser [4] | pip install riko[xml] |
ノート
[3] | lxml が存在しない場合、 lxml は組み込みのPython xmlパーサにデフォルト設定されます |
[4] | speedparser が存在しない場合、 speedparser はデフォルトのfeedparser なります |
ワード数
この例では、いくつかのパイプを使用してWebページ上の単語を数えます。
>>> ### Create a SyncPipe flow ###
>>> #
>>> # `SyncPipe` is a convenience class that creates chainable flows
>>> # and allows for parallel processing.
>>> from riko.collections import SyncPipe
>>>
>>> ### Set the pipe configurations ###
>>> #
>>> # Notes:
>>> # 1. the `detag` option will strip all html tags from the result
>>> # 2. fetch the text contained inside the 'body' tag of the hackernews
>>> # homepage
>>> # 3. replace newlines with spaces and assign the result to 'content'
>>> # 4. tokenize the resulting text using whitespace as the delimeter
>>> # 5. count the number of times each token appears
>>> # 6. obtain the raw stream
>>> # 7. extract the first word and its count
>>> # 8. extract the second word and its count
>>> # 9. extract the third word and its count
>>> url = 'https://news.ycombinator.com/'
>>> fetch_conf = {
... 'url': url, 'start': '<body>', 'end': '</body>', 'detag': True} # 1
>>>
>>> replace_conf = {
... 'rule': [
... {'find': '\r\n', 'replace': ' '},
... {'find': '\n', 'replace': ' '}]}
>>>
>>> flow = (
... SyncPipe('fetchpage', conf=fetch_conf) # 2
... .strreplace(conf=replace_conf, assign='content') # 3
... .tokenizer(conf={'delimiter': ' '}, emit=True) # 4
... .count(conf={'count_key': 'content'})) # 5
>>>
>>> stream = flow.output # 6
>>> next(stream) # 7
{"'sad": 1}
>>> next(stream) # 8
{'(': 28}
>>> next(stream) # 9
{'(1999)': 1}
動機
なぜ私はrikoを建てた
Yahoo! Pipes [5]は、ユーザーフレンドリーなWebアプリケーションでした。
Web上のコンテンツの集約、操作、マッシュアップ
カスタムパイプを作成したいのですが、私はpipe2pyに出くわしました。 Pythonコードに変換します。 pipe2py
は当時私のニーズに合っていましたが、維持されず、非同期または並列処理が欠けていました。
pipe2py
はpipe2py
の欠点に対処していますが、Yahoo! jsonワークフローをパイプします。 rikoには、あなたがプログラマチックに多くのタスクを実行できるようにする、「40の組み込みの」モジュール、別名pipes
が含まれています。Yahoo! 許可されたパイプ。
なぜあなたはrikoを使うべきですか
riko
は、Huginn、Flink、Spark、Storm [6]など、他のストリーム処理アプリケーションとの多くの利点/相違点を提供します。 すなわち:
- 小さなフットプリント(CPUとメモリ使用量)
- ネイティブRSS / Atomサポート
- 簡単なインストールと使い方
- pypyサポート付きの純粋なpythonライブラリ
-
streams
フィルタリング、並べ替え、変更を行う組み込みのモジュール式pipes
その後のriko riko
トレードオフは次のとおりです。
- 分散されていない(サーバーのクラスタ上で実行可能)
-
flows
を作成するためのGUIはありません - 新しいデータの
streams
を継続的に監視しない - 特定のイベントに反応することはできません
- ストリームは単一のコンシューマのみをサポートするため、イテレータ(プル)に基づいています[7]
次の表は、これらの観察結果をまとめたものです。
としょうかん | ストリームタイプ | フットプリント | RSS | 単純な[8] | 非同期 | 平行 | CEP [9] | 配布された |
---|---|---|---|---|---|---|---|---|
リコ | 引く | 小さい | √ | √ | √ | √ | ||
pipe2py | 引く | 小さい | √ | √ | ||||
Huginn | 押す | メディ | √ | [10] | √ | √ | ||
その他 | 押す | 大 | [11] | [12] | [13] | √ | √ | √ |
詳細については、 FAQをご覧ください。
ノート
[5] | YahooはYahoo!を中止しました パイプは2015年には残っていますが、 |
[6] | Huginn 、 Flink 、 Spark 、 Storm |
[7] | これを分割モジュールで軽減することができます |
[8] | MySQL、Kafka、YARN、ZooKeeper、Mesosなどの外部サービスに依存しません |
[9] | 複雑なイベント処理 |
[10] | Huginnは非同期Webリクエストをしていないようです |
[11] | 多くのライブラリは、サードパーティのライブラリを使用せずにRSSストリームを解析することはできません |
[12] | ほとんどの図書館はローカルモードを提供していますが、多くの図書館では有用なことを行うためにデータ収集者(Flume / Kafkaなど)との統合が必要です |
[13] | 私は、これらのライブラリが非同期APIを提供しているという証拠を見つけることはできません(そして明らかにSparkはそうではありません ) |
使用法
riko
は、Pythonライブラリとして直接使用することを意図しています。
使用インデックス
フィードの取得
riko
は “ソース” pipes
経由でローカルとリモート両方のファイルパスからrssフィードを取得できます。 各「ソース」 pipe
は、 stream
、すなわち辞書のイテレータ、別名items
返す。
>>> from riko.modules import fetch, fetchsitefeed
>>>
>>> ### Fetch an RSS feed ###
>>> stream = fetch.pipe(conf={'url': 'https://news.ycombinator.com/rss'})
>>>
>>> ### Fetch the first RSS feed found ###
>>> stream = fetchsitefeed.pipe(conf={'url': 'http://arstechnica.com/rss-feeds/'})
>>>
>>> ### View the fetched RSS feed(s) ###
>>> #
>>> # Note: regardless of how you fetch an RSS feed, it will have the same
>>> # structure
>>> item = next(stream)
>>> item.keys()
dict_keys(['title_detail', 'author.uri', 'tags', 'summary_detail', 'author_detail',
'author.name', 'y:published', 'y:title', 'content', 'title', 'pubDate',
'guidislink', 'id', 'summary', 'dc:creator', 'authors', 'published_parsed',
'links', 'y:id', 'author', 'link', 'published'])
>>> item['title'], item['author'], item['id']
('Gravity doesn’t care about quantum spin',
'Chris Lee',
'http://arstechnica.com/?p=924009')
サポートされているファイルの種類とプロトコルの完全な一覧については、 FAQを参照してください 。 他の例については、 データとフィードのフェッチを参照してください。
同期処理
riko
は40本のビルトイン pipes
streams
を変更できstreams
>>> from riko.collections import SyncPipe
>>>
>>> ### Set the pipe configurations ###
>>> fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
>>> filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
>>> xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
>>> xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}
>>>
>>> ### Create a SyncPipe flow ###
>>> #
>>> # `SyncPipe` is a convenience class that creates chainable flows
>>> # and allows for parallel processing.
>>> #
>>> # The following flow will:
>>> # 1. fetch the hackernews RSS feed
>>> # 2. filter for items with '.com' in the link
>>> # 3. sort the items ascending by title
>>> # 4. fetch the first comment from each item
>>> # 5. flatten the result into one raw stream
>>> # 6. extract the first item's content
>>> #
>>> # Note: sorting is not lazy so take caution when using this pipe
>>>
>>> flow = (
... SyncPipe('fetch', conf=fetch_conf) # 1
... .filter(conf={'rule': filter_rule}) # 2
... .sort(conf={'rule': {'sort_key': 'title'}}) # 3
... .xpathfetchpage(conf=xpath_conf)) # 4
>>>
>>> stream = flow.output # 5
>>> next(stream)['content'] # 6
'Open Artificial Pancreas home:'
stream
を作成する代替の(関数ベースの)メソッドの代替ワークフロー作成を参照してください。 利用可能なpipes
完全な一覧については、 パイプを参照してください。
並列処理
riko
の並列APIを使用してThreadPool
[14]を生成する例は、
>>> from riko.collections import SyncPipe
>>>
>>> ### Set the pipe configurations ###
>>> fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
>>> filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
>>> xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
>>> xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}
>>>
>>> ### Create a parallel SyncPipe flow ###
>>> #
>>> # The following flow will:
>>> # 1. fetch the hackernews RSS feed
>>> # 2. filter for items with '.com' in the article link
>>> # 3. fetch the first comment from all items in parallel (using 4 workers)
>>> # 4. flatten the result into one raw stream
>>> # 5. extract the first item's content
>>> #
>>> # Note: no point in sorting after the filter since parallel fetching doesn't guarantee
>>> # order
>>> flow = (
... SyncPipe('fetch', conf=fetch_conf, parallel=True, workers=4) # 1
... .filter(conf={'rule': filter_rule}) # 2
... .xpathfetchpage(conf=xpath_conf)) # 3
>>>
>>> stream = flow.output # 4
>>> next(stream)['content'] # 5
'He uses the following example for when to throw your own errors:'
非同期処理
非同期処理を有効にするには、 async
モジュールをインストールする必要があります。
pip install riko[async]
riko
の非同期APIを使用した例
>>> from riko.bado import coroutine, react
>>> from riko.collections import AsyncPipe
>>>
>>> ### Set the pipe configurations ###
>>> fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
>>> filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
>>> xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
>>> xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}
>>>
>>> ### Create an AsyncPipe flow ###
>>> #
>>> # The following flow will:
>>> # 1. fetch the hackernews RSS feed
>>> # 2. filter for items with '.com' in the article link
>>> # 3. asynchronously fetch the first comment from each item (using 4 connections)
>>> # 4. flatten the result into one raw stream
>>> # 5. extract the first item's content
>>> #
>>> # Note: no point in sorting after the filter since async fetching doesn't guarantee
>>> # order
>>> @coroutine
... def run(reactor):
... stream = yield (
... AsyncPipe('fetch', conf=fetch_conf, connections=4) # 1
... .filter(conf={'rule': filter_rule}) # 2
... .xpathfetchpage(conf=xpath_conf) # 3
... .output) # 4
...
... print(next(stream)['content']) # 5
>>>
>>> try:
... react(run)
... except SystemExit:
... pass
Here's how iteration works ():
クックブック
さらに詳しい例は、 cookbookやipythonノートブックを参照してください。
ノート
[14] | SyncPipe('fetch', conf={'url': url}, parallel=True, threads=False) ように、 threads=False をSyncPipe に追加で渡すことで、 ProcessPool を有効にすることができます。 |
インストール
(あなたはvirtualenvを使用していますか?)
コマンドラインでpip
( 推奨 )を使用してriko
をインストールし、
pip install riko
またはeasy_install
easy_install riko
詳細については、 インストールドキュメントを参照してください。
デザイン原則
riko
の主要なデータ構造はitem
とstream
です。 item
は単なるPython辞書であり、 stream
はitems
イテレータです。 [{'content': 'hello world'}]
ような簡単な方法でstream
手動で作成することができます。 あなたはpipes
を介してriko
streams
を操作しstreams
。 pipe
は単にstream
またはitem
いずれかを受け取り、 stream
を返す関数stream
。 pipes
は構成可能です。あるpipe
出力を別のpipe
入力として使用できます。
riko
pipes
は2つの味で来る; operators
とprocessors
。 operators
は一度にstream
全体を操作し、個々のアイテムを処理することはできません。 operators
例には、 count
、 pipefilter
、およびreverse
ます。
>>> from riko.modules.reverse import pipe
>>>
>>> stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
>>> next(pipe(stream))
{'title': 'riko pt. 2'}
processors
個々のitems
を処理し、スレッドまたはプロセス間で並列化することができます。 processors
例には、 fetchsitefeed
、 hash
、 pipeitembuilder
、およびpiperegex
ます。
>>> from riko.modules.hash import pipe
>>>
>>> item = {'title': 'riko pt. 1'}
>>> stream = pipe(item, field='title')
>>> next(stream)
{'title': 'riko pt. 1', 'hash': 2853617420}
いくつかのprocessors
、例えばpipetokenizer
、複数の結果を返します。
>>> from riko.modules.tokenizer import pipe
>>>
>>> item = {'title': 'riko pt. 1'}
>>> tokenizer_conf = {'delimiter': ' '}
>>> stream = pipe(item, conf=tokenizer_conf, field='title')
>>> next(stream)
{'tokenizer': [{'content': 'riko'},
{'content': 'pt.'},
{'content': '1'}],
'title': 'riko pt. 1'}
>>> # In this case, if we just want the result, we can `emit` it instead
>>> stream = pipe(item, conf=tokenizer_conf, field='title', emit=True)
>>> next(stream)
{'content': 'riko'}
operators
はaggregators
とcomposers
サブタイプに分割されます。 aggregators
( count
)は、入力stream
すべてのitems
を単一のitem
新しいstream
に結合しitem
。 例えば、 filter
などのcomposers
は、入力stream
いくつかまたはすべてのitems
を含む新しいstream
を作成する。
>>> from riko.modules.count import pipe
>>>
>>> stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
>>> next(pipe(stream))
{'count': 2}
“Word Count”の例を上から混同している場合は、 count_key
設定オプションを渡すと、 count
は複数の項目を返すことができます。
>>> counted = pipe(stream, conf={'count_key': 'title'})
>>> next(counted)
{'riko pt. 1': 1}
>>> next(counted)
{'riko pt. 2': 1}
processors
は、 source
とtransformer
サブタイプに分割されています。 例えば、 itembuilder
sources
、 transformers
(例えば、 hash
はstream
内のアイテムのみを変換できる)を生成することができる。
>>> from riko.modules.itembuilder import pipe
>>>
>>> attrs = {'key': 'title', 'value': 'riko pt. 1'}
>>> next(pipe(conf={'attrs': attrs}))
{'title': 'riko pt. 1'}
次の表は、これらの観察結果をまとめたものです。
タイプ | サブタイプ | 入力 | 出力 | 並列化可能? | ストリームを作成しますか? |
オペレーター | アグリゲータ | ストリーム | ストリーム[15] | ||
作曲家 | ストリーム | ストリーム | |||
プロセッサー | ソース | 項目 | ストリーム | √ | √ |
トランス | 項目 | ストリーム | √ |
pipe
の種類が不明な場合は、そのメタデータを確認してください。
>>> from riko.modules import fetchpage, count
>>>
>>> fetchpage.async_pipe.__dict__
{'type': 'processor', 'name': 'fetchpage', 'sub_type': 'source'}
>>> count.pipe.__dict__
{'type': 'operator', 'name': 'count', 'sub_type': 'aggregator'}
AsyncPipe
クラスとAsyncPipe
クラス(特に)は、便利なメソッド連鎖と透過的な並列化を可能にするために、このチェックを実行します。
>>> from riko.collections import SyncPipe
>>>
>>> attrs = [
... {'key': 'title', 'value': 'riko pt. 1'},
... {'key': 'content', 'value': "Let's talk about riko!"}]
>>> flow = SyncPipe('itembuilder', conf={'attrs': attrs}).hash()
>>> flow.list[0]
{'title': 'riko pt. 1',
'content': "Let's talk about riko!",
'hash': 1346301218}
他のパイプから値をワイヤリングする方法やユーザーの入力を受け入れる方法など、高度な例については、 クックブックを参照してください。
ノート
[15] | aggregator 出力stream は、1つのitem のみのイテレータです。 |
コマンドラインインターフェイス
runpipe
、 workflows
を実行するコマンドrunpipe
提供しworkflows
。 workflow
とは、単にpipe
という名前の関数を含むファイルで、 flow
を作成し、結果のstream
を処理しstream
。
CLIの使用法
使い方:runpipe [pipeid]
説明:rikoパイプを実行します。
- 位置の引数:
- pipeid実行するパイプ(デフォルト:stdinから読み込みます)。
- オプション引数:
-h、--help このヘルプメッセージを表示して終了する -a、--async 非同期パイプをロードします。 -t、 - テスト テストモードで実行します(デフォルト入力を使用します)。
CLIセットアップ
flow.py
from __future__ import print_function
from riko.collections import SyncPipe
conf1 = {'attrs': [{'value': 'https://google.com', 'key': 'content'}]}
conf2 = {'rule': [{'find': 'com', 'replace': 'co.uk'}]}
def pipe(test=False):
kwargs = {'conf': conf1, 'test': test}
flow = SyncPipe('itembuilder', **kwargs).strreplace(conf=conf2)
stream = flow.output
for i in stream:
print(i)
CLIの例
次に、 flow.py
を実行するには、 runpipe flow
コマンドをrunpipe flow
。 端末に次の出力が表示されます。
https://google.co.uk
runpipe
はworkflows
のexamples
ディレクトリも検索しworkflows
。 runpipe demo
と入力すると、次の出力が表示されます。
Deadline to clear up health law eligibility near 682
スクリプト
riko
は組み込みのタスクマネージャーが付属しています。
セットアップ
pip install riko[develop]
例
python lintとnoseテストを実行する
manage lint
manage test
貢献する
このレポで使用されているコーディングスタイル/コンベンションを模倣してください。 新しいクラスや関数を追加する場合は、適切なdocブロックを例文とともに追加してください。 また、python lintとnoseテストが合格していることを確認してください。
詳細は寄稿文をご覧ください。
クレジット
激しく鼓動するリコのためにpipe2pyに挑戦してください 。 pipe2py
はpipe2py
フォークとして始めましたが、それ以来ずっと元のコードベースのほとんど(もしあれば)が残っています。
詳細情報
プロジェクトの構造
┌── benchmarks
│ ├── __init__.py
│ └── parallel.py
├── bin
│ └── run
├── data/*
├── docs
│ ├── AUTHORS.rst
│ ├── CHANGES.rst
│ ├── COOKBOOK.rst
│ ├── FAQ.rst
│ ├── INSTALLATION.rst
│ └── TODO.rst
├── examples/*
├── helpers/*
├── riko
│ ├── __init__.py
│ ├── lib
│ │ ├── __init__.py
│ │ ├── autorss.py
│ │ ├── collections.py
│ │ ├── dotdict.py
│ │ ├── log.py
│ │ ├── tags.py
│ │ └── py
│ ├── modules/*
│ └── twisted
│ ├── __init__.py
│ ├── collections.py
│ └── py
├── tests
│ ├── __init__.py
│ ├── standard.rc
│ └── test_examples.py
├── CONTRIBUTING.rst
├── dev-requirements.txt
├── LICENSE
├── Makefile
├── manage.py
├── MANIFEST.in
├── optional-requirements.txt
├── py2-requirements.txt
├── README.rst
├── requirements.txt
├── setup.cfg
├── setup.py
└── tox.ini