GitHubじゃ!Pythonじゃ!

GitHubからPython関係の優良リポジトリを探したかったのじゃー、でも英語は出来ないから日本語で読むのじゃー、英語社会世知辛いのじゃー

robinhood

faust – Pythonストリーム処理

投稿日:

Pythonストリーム処理

Pythonストリーム処理

バージョン: 1.0.27
ウェブ: http://faust.readthedocs.io/
ダウンロード: http://pypi.python.org/pypi/faust
ソース: http://github.com/robinhood/faust
キーワード: 分散、ストリーム、非同期、処理、データ、キュー
# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust

Faustはストリーム処理ライブラリで、 KafkaストリームからPythonにアイデアを移植します。

Robinhoodでは 、毎日何十億というイベントを処理する高性能分散システムとリアルタイムデータパイプラインを構築するために使用されています。

ファウストはストリーム処理イベント処理の両方を提供し、 Kafka StreamsApache Spark / Storm / Samza / Flink

それはDSLを使用しません、ちょうどPythonです! つまり、ストリーム処理時に好きなPythonライブラリ(NumPy、PyTorch、Pandas、NLTK、Django、Flask、SQLAlchemy、++)をすべて使用できます。

Faustには、新しいasync / await構文と変数型の注釈のためにPython 3.6以降が必要です。

着信オーダーのストリームを処理する例を次に示します。

app = faust.App('myapp', broker='kafka://localhost')

# Models describe how messages are serialized:
# {"account_id": "3fae-...", amount": 3}
class Order(faust.Record):
    account_id: str
    amount: int

@app.agent(value_type=Order)
async def order(orders):
    async for order in orders:
        # process infinite stream of orders.
        print(f'Order for {order.account_id}: {order.amount}')

エージェントデコレータは、基本的にカフカのトピックから消費され、受信するすべてのイベントに対して何かを行う「ストリームプロセッサ」を定義します。

エージェントはasync def関数なので、Web要求などの他の操作を非同期的に実行することもできます。

このシステムは、データベースのように動作し、状態を維持することができます。 テーブルは、通常のPython辞書として使用できる分散キー/バリューストアという名前です。

テーブルは、 RocksDBと呼ばれるC ++で書かれた超高速埋め込みデータベースを使用して、各マシンにローカルに格納されます。

テーブルには、「最終日からのクリック数」または「過去1時間のクリック数」を追跡できるように、オプションで「ウィンドウ」された集計数を格納することもできます。 例えば。 Kafka Streamsのように 、私たちは転倒、ホッピング、時間のスライディングウィンドウをサポートし、古いウィンドウはデータがいっぱいになるのを防ぐために期限切れになる可能性があります。

信頼性のために、カフカのトピックを「write-ahead-log」として使用します。 鍵が変更されるたびに、私たちは変更ログに公開します。 スタンバイ・ノードはこの変更ログを使用してデータの正確なレプリカを保持し、ノードのいずれかが故障した場合に即時回復を可能にします。

ユーザーにとっては、テーブルは単なる辞書ですが、データは再起動とノード間で複製されるため、フェールオーバー時に他のノードが自動的に引き継ぐことができます。

ページビューはURLでカウントできます:

# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app.topic('clicks', key_type=str, value_type=int)

# default value for missing URL will be 0 with `default=int`
counts = app.Table('click_counts', default=int)

@app.agent(click_topic)
async def count_click(clicks):
    async for url, count in clicks.items():
        counts[url] += count

カフカのトピックに送信されたデータは分割されています。つまり、同じURLのすべてのカウントが同じFaustワーカーインスタンスに配信されるように、クリック数がURLによって分割されます。

ファウストは、バイト、Unicode、シリアライズされた構造の任意のタイプのストリームデータをサポートしますが、最新のPython構文を使用してストリームのキーと値をどのようにシリアライズするかを記述する “Models”

# Order is a json serialized dictionary,
# having these fields:

class Order(faust.Record):
    account_id: str
    product_id: str
    price: float
    quantity: float = 1.0

orders_topic = app.topic('orders', key_type=str, value_type=Order)

@app.agent(orders_topic)
async def process_order(orders):
    async for order in orders:
        # process each order using regular Python
        total_price = order.price * order.quantity
        await send_order_received_email(order.account_id, order)

Faustは、 mypy型チェッカーを使用して静的に型指定されているため、アプリケーションの作成時に静的型を利用できます。

ファウストのソースコードは小さく、整然としたもので、 カフカストリームの実装を学ぶのに適したリソースです。

はじめに 紹介ページで ファウストについて学んで ください
ファウスト、システム要件、インストール手順、コミュニティリソースなどの詳細を読む。
クイックスタート チュートリアルに 直接進む
ストリーミングアプリケーションをプログラミングすることにより、ファウストを実際に見てください。
ユーザーガイドを参照してください
トピックで編成された詳細な情報が必要です。

ファウストは…

シンプル

ファウストは非常に使いやすいです。 他のストリーム処理ソリューションを使い始めるには、複雑な世界のプロジェクトやインフラストラクチャの要件があります。 FaustはKafkaだけを必要とし、残りは単なるPythonなので、Pythonを知っていれば、すでにFaustを使ってストリーム処理を行うことができます。

あなたが作ることができるより簡単なアプリケーションの1つはここにあります:

import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

if __name__ == '__main__':
    app.main()

あなたはおそらく、非同期でキーワードを待っているかもしれませんが、 asyncio Faustを使用する方法を知る必要はありません。例を模倣するだけで、うまくいくでしょう。

サンプルアプリケーションは、2つのタスクを開始します.1つはストリームを処理し、もう1つはバックグラウンドスレッドで、そのストリームにイベントを送信します。 現実のアプリケーションでは、プロセッサが消費するカフカのトピックにイベントを公開し、バックグラウンドスレッドはデータをこの例に供給するために必要です。

高可用性
ファウストは高可用性であり、ネットワークの問題やサーバクラッシュに耐えることができます。 ノードに障害が発生した場合、自動的に回復し、表には引き継ぐスタンバイ・ノードがあります。
分散
必要に応じて、アプリケーションのインスタンスをさらに開始します。
速い
シングルコアのFaustワーカーインスタンスは、毎秒数万のイベントを処理することができます。カフカクライアントの最適化をサポートできるようになれば、スループットが向上すると確信しています。
フレキシブル
ファウストは単なるPythonであり、ストリームは無限の非同期イテレータです。 Pythonの使い方を知っているなら、すでにFaustの使い方を知っていて、Django、Flask、SQLAlchemy、NTLK、NumPy、Scikit、TensorFlowなどのPythonライブラリと一緒に使えます。

インストール

Faustは、Python Package Index(PyPI)またはソースからインストールできます。

pipを使用してインストールするには:

$ pip install -U faust

バンドル

ファウストは、ファウストとそのフィーチャーの依存関係をインストールするために使用できるsetuptools拡張のグループも定義しています。

これらは、ブラケットを使用して要件やpipコマンドラインで指定できます。 複数のバンドルをコンマで区切ります:

$ pip install "faust[rocksdb]"

$ pip install "faust[rocksdb,uvloop,fast]"

以下のバンドルを利用できます。

店舗
faust[rocksdb]

Faustテーブルの状態を格納するためにRocksDBを使用する場合

プロダクションでお勧めします。

最適化
faust[fast] 利用可能なすべてのCスピードアップ拡張機能をFaustコアにインストールします。

センサ
faust[statsd] Statsd Faustモニターを使用しています。

イベントループ
faust[uvloop] Faustとuvloopを使うためのものuvloop
faust[gevent] Faustとgeventを使用するgeventです。
faust[eventlet] eventletファウストを使用するeventlet

デバッグ
faust[debug] aiomonitorを使用して、実行中のFaustワーカーを接続およびデバッグします。
faust[setproctitle] setproctitleモジュールがインストールされると、Faustワーカーはこれを使用してps / topリスティングでより良いプロセス名を設定します。 また、 fastおよびdebugバンドルとともにインストールされます。

ソースからのダウンロードとインストール

http://pypi.python.org/pypi/faustからFaustの最新バージョンをダウンロードする

以下のようにしてインストールすることができます:

$ tar xvfz faust-0.0.0.tar.gz
$ cd faust-0.0.0
$ python setup.py build
# python setup.py install

現在virtualenvを使用していない場合、最後のコマンドは特権ユーザーとして実行する必要があります。

開発版の使用

ピップで

次のpipコマンドを使用して、Faustの最新のスナップショットをインストールできます。

$ pip install https://github.com/robinhood/faust/zipball/master#egg=faust

よくある質問

Django / Flask / etcでFaustを使用できますか?

はい! geventと統合するには、 geventまたはeventletをブリッジとして使用しasyncio

gevent使用

この方法は、 gevent動作するPythonライブラリをブロックする場合にgeventです。

geventを使用するには、 aiogeventモジュールをインストールする必要があります。これは、Faustをバンドルとしてインストールできます。

$ pip install -U faust[gevent]

そして実際にイベントループとしてgeventを使用するには、 faustプログラムに-L <faust --loop>オプションを使用するか、

$ faust -L gevent -A myproj worker -l info

あなたのエントリーポイントスクリプトの一番上にimport mode.loop.geventを追加してください:

#!/usr/bin/env python3
import mode.loop.gevent

覚えておいてください:これはモジュールの最上部にあり、ライブラリをインポートする前に実行することが非常に重要です。

eventlet使用

この方法は、 eventlet動作するPythonライブラリをブロックする場合にeventletです。

eventletを使用するには、 aioeventletモジュールをインストールする必要があります。これをFaustと一緒にバンドルとしてインストールすることができます:

$ pip install -U faust[eventlet]

イベントループとしてイベントレットを実際に使用するには、 faustプログラムに-L <faust --loop>引数を使用する必要があります。

$ faust -L eventlet -A myproj worker -l info

あなたのエントリーポイントスクリプトの一番上にあるimport mode.loop.eventletを追加してください:

#!/usr/bin/env python3
import mode.loop.eventlet  # noqa

警告

これはモジュールの最上部にあり、ライブラリをインポートする前に実行することが非常に重要です。

FaustとTornadoを併用できますか?

はい! tornado.platform.asyncioブリッジを使用してくださいhttp : //www.tornadoweb.org/en/stable/asyncio.html

Faust with Twistedを使うことはできますか?

はい! asyncioリアクターの実装を使用してくださいhttps : asyncio

Python 3.5以前をサポートしますか?

すぐにPython 3.5をサポートする計画はありませんが、プロジェクトに貢献することは大歓迎です。

これを達成するために必要なステップは次のとおりです。

  • 変数注釈をコメントに書き換えるためのソースコード変換

    例えば、コード:

         class Point:
             x: int = 0
             y: int = 0
    
    must be rewritten into::
    
         class Point:
             x = 0  # type: int
             y = 0  # type: int
    
  • 非同期関数を書き換えるためのソースコード変換

    例えば、コード:

    async def foo():
        await asyncio.sleep(1.0)
    

    次のように書き換えなければなりません:

    @coroutine
    def foo():
        yield from asyncio.sleep(1.0)
    

Python 2をサポートしますか?

Python 2をサポートする予定はありませんが、プロジェクトに参加することは大歓迎です(上記の質問の詳細はPython 2でも関係します)。

Faustアプリケーションをローカルで実行しているときに、RocksDBが開いているファイルの最大数がエラーを超えました。 これをどうすれば解決できますか?

開いているファイルの最大数の制限を増やす必要があるかもしれません。 次の記事では、OS Xでの操作方法について説明しています。https : //blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan/

ヘルプの利用

メーリングリスト

Faustの利用、開発、未来についての議論についてはfaust-usersメーリングリストに参加してください。

スラック

スラックで私たちとチャットしてください:

https://fauststream.slack.com

リソース

バグトラッカー

バグ報告や迷惑行為がある場合は、 https://github.com/robinhood/faust/issues/の問題追跡ツールに報告してください。

ウィキ

https://wiki.github.com/robinhood/faust/

ライセンス

このソフトウェアは、New BSDライセンスの下で使用許諾されます。 完全なライセンステキストについては、一番上のディストリビューションディレクトリにあるLICENSEファイルを参照してください。

貢献する

Faustの開発はGitHubで行われます: https : //github.com/robinhood/faust

ファウストの開発に参加することを強くお勧めします。

また、ドキュメンテーションの「 Faustへ貢献」セクションも必ずお読みください。

行動規範

プロジェクトのコードベース、課題トラッカー、チャットルーム、およびメーリングリストで相互作用するすべての人は、ファウスト行動規範に従うことが求められます。

これらのプロジェクトの貢献者および維持者として、オープンで歓迎するコミュニティを育むことを目的として、報告の問題、機能要求の掲示、文書の更新、プルリクエストまたはパッチの提出、およびその他の活動を通じて貢献するすべての人々を尊重することを約束します。

ジェンダー、性的指向、性的指向、身体的外見、身体の大きさ、人種、民族、年齢、宗教、その他のレベルにかかわらず、これらのプロジェクトに参加することを約束します。国籍。

参加者による受け入れがたい行動の例としては、

  • 性的な言語やイメージの使用
  • 個人的な攻撃
  • トローリングまたは侮辱/軽蔑的なコメント
  • 公的または私的なハラスメント
  • 明示的な許可なく、物理的または電子的アドレスなどの他の個人情報を公開する
  • その他の非倫理的またはプロフェッショナルな行為。

プロジェクト管理者は、この行動規範に則っていないコメント、コミット、コード、Wiki編集、問題、およびその他の貢献を削除、編集、または却下する権利と責任を負います。 この行動規範を採択することにより、プロジェクト保守者は、このプロジェクトを管理するすべての側面にこれらの原則を公正かつ一貫して適用することを約束します。 行動規範を遵守していない、または実施していないプロジェクトメンテナーは、プロジェクトチームから永久に削除される可能性があります。

この行動規範は、個人がプロジェクトまたはそのコミュニティを代表している場合に、プロジェクトスペースとパブリックスペースの両方に適用されます。

嫌がらせ行為、嫌がらせ行為、または容認できない行為の例は、問題を開いたり、1人または複数のプロジェクトメンテナーに連絡したりして報告することができます。

この行動規範は、 http: //contributor-covenant.org/version/1/2/0/にあるContributor Covenantバージョン1.2.0に適合しています







-robinhood
-, , , , , ,

執筆者:

robinhood

faust – Pythonストリーム処理

投稿日:

(さらに…)







-robinhood
-, , , , , ,

執筆者: