GitHubじゃ!Pythonじゃ!

GitHubからPython関係の優良リポジトリを探したかったのじゃー、でも英語は出来ないから日本語で読むのじゃー、英語社会世知辛いのじゃー

celery

kombu – Python用メッセージングライブラリ

投稿日:

Python用メッセージングライブラリ http://kombu.readthedocs.org/

kombu – Python用メッセージングライブラリ

バージョン: 4.1.0
ウェブ: http://kombu.me/
ダウンロード: https://pypi.python.org/pypi/kombu/
ソース: https://github.com/celery/kombu/
キーワード: メッセージング、amqp、rabbitmq、redis、mongodb、python、キュー

KombuはPython用のメッセージングライブラリです。

Kombuの目的は、AMQプロトコルのための慣用的な高水準インターフェースを提供することにより、Pythonでのメッセージングを可能な限り簡単にすることです。また、一般的なメッセージングの問題に対して実証済みでテスト済みのソリューションを提供します。

AMQPは、メッセージの方向付け、キューイング、ルーティング、信頼性、セキュリティのためのオープンスタンダードプロトコルであるRabbitMQメッセージングサーバーが最も普及している実装である高度メッセージキュープロトコルです。

特徴

  • アプリケーション作成者がプラグイン可能なトランスポートを使用して複数のメッセージサーバーソリューションをサポートできるようにします。

    • py-amqplibrabbitmq 、またはqpid-pythonライブラリを使用したAMQPトランスポート。

    • C言語で書かれた高性能AMQPトランスポート – librabbitmqを使用する場合

      librabbitmqがインストールされていると自動的に有効になります:

      $ pip install librabbitmq
      
    • バーチャルトランスポートにより、非AMQPトランスポートのサポートを簡単に追加できます。 RedisAmazon SQSZooKeeperSoftLayer MQPyroのサポートは既に組み込まれています。

    • ユニットテストのためのメモリ内転送。

  • メッセージペイロードの自動エンコーディング、シリアライゼーション、および圧縮をサポートします。

  • トランスポート間の一貫した例外処理。

  • 接続とチャネルのエラーを適切に処理することによって操作が実行されることを保証する機能。

  • タイムアウトのサポートや、複数のチャンネルのイベントを待つ機能など、 amqplibのいくつかの不具合が修正されました。

  • すでにニンジンを使用しているプロジェクトは互換性レイヤーを使用して簡単に移植できます。

AMQPの紹介については、 Rabbits and warrensの記事 AMQPに関するWikipediaの記事を読んでください。

輸送の比較

クライアント タイプ 直接 トピック 扇形に広がります 優先 TTL
amqp ネイティブ はい はい はい はい[3] はい[4]
qpid ネイティブ はい はい はい いいえ いいえ
赤目 バーチャル はい はい はい(PUB / SUB) はい いいえ
モンゴブ バーチャル はい はい はい はい はい
SQS バーチャル はい はい[1] はい[2] いいえ いいえ
飼い猫 バーチャル はい はい[1] いいえ はい いいえ
インメモリ バーチャル はい はい[1] いいえ いいえ いいえ
SLMQ バーチャル はい はい[1] いいえ いいえ いいえ
[1] 1、2、3、4宣言はメモリ内にのみ保持されるため、交換/キューは、宣言を必要とするすべてのクライアントによって宣言されなければなりません。
[2] Fanoutは、SimpleDBのルーティングテーブルを格納することでサポートされています。 デフォルトでは無効ですが、 supports_fanoutトランスポートオプションを使用しsupports_fanout有効にすることができます。
[3] AMQPメッセージ優先順位のサポートは、ブローカーの実装に依存します。
[4] AMQPメッセージ/キューTTLのサポートはブローカの実装に依存します。

ドキュメンテーション

KombuはSphinxを使用しています。最新のドキュメントはここにあります:

https://kombu.readthedocs.io/

クイック概要

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print body
    message.ack()

# connections
with Connection('amqp://guest:guest@localhost//') as conn:

    # produce
    producer = conn.Producer(serializer='json')
    producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                      exchange=media_exchange, routing_key='video',
                      declare=[video_queue])

    # the declare above, makes sure the video queue is declared
    # so that the messages can be delivered.
    # It's a best practice in Kombu to have both publishers and
    # consumers declare the queue. You can also declare the
    # queue manually using:
    #     video_queue(conn).declare()

    # consume
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

# Consume from several queues on the same channel:
video_queue = Queue('video', exchange=media_exchange, key='video')
image_queue = Queue('image', exchange=media_exchange, key='image')

with connection.Consumer([video_queue, image_queue],
                         callbacks=[process_media]) as consumer:
    while True:
        connection.drain_events()

または手動でチャンネルを処理する:

with connection.channel() as channel:
    producer = Producer(channel, ...)
    consumer = Producer(channel)

すべてのオブジェクトはwith文の外側でも使用できます。使用後にオブジェクトを閉じることを忘れないでください。

from kombu import Connection, Consumer, Producer

connection = Connection()
    # ...
connection.release()

consumer = Consumer(channel_or_connection, ...)
consumer.register_callback(my_callback)
consumer.consume()
    # ....
consumer.cancel()

ExchangeとQueueは、設定ファイルなどでpickleして使用できる単純な宣言です。

また、オペレーションもサポートしていますが、そのためにはチャネルにバインドする必要があります。

交換とキューを接続にバインドすると、その接続にデフォルトチャネルが使用されます。

>>> exchange = Exchange('tasks', 'direct')

>>> connection = Connection()
>>> bound_exchange = exchange(connection)
>>> bound_exchange.delete()

# the original exchange is not affected, and stays unbound.
>>> exchange.delete()
raise NotBoundError: Can't call delete on Exchange not bound to
    a channel.

用語

開始する前に知っておくべきいくつかの概念があります:

  • プロデューサー

    プロデューサはメッセージを交換所に送信します。

  • 交流

    メッセージは取引所に送られます。 交換は名前が付けられ、いくつかのルーティングアルゴリズムの1つを使用するように構成することができます。 交換機は、メッセージ内のルーティングキーを、交換機にバインディングするときに消費者が提供するルーティングキーと照合することによって、メッセージをコンシューマにルーティングする。

  • 消費者

    消費者はキューを宣言し、それをエクスチェンジにバインドし、キューからメッセージを受信します。

  • キュー

    キューは、交換機に送信されたメッセージを受信します。 キューはコンシューマによって宣言されます。

  • ルーティングキー

    すべてのメッセージにルーティングキーがあります。 ルーティングキーの解釈は、交換タイプによって異なります。 AMQP標準で定義されている4つのデフォルトの交換タイプがあり、ベンダーはカスタムタイプを定義できます(詳細についてはベンダーのマニュアルを参照してください)。

    これらは、AMQP / 0.8で定義されているデフォルトの交換タイプです。

    • 直接交換

      メッセージのルーティングキープロパティとコンシューマのrouting_key属性が同一である場合に一致します。

    • ファンアウト交換

      バインディングにルーティングキーがない場合でも常に一致します。

    • トピック交換

      プリミティブパターンマッチングスキームによってメッセージのルーティングキープロパティに一致します。 メッセージルーティングキーはドットで区切られた単語(ドメイン名のような “。”)で構成され、2つの特殊文字が使用できます。 star( “*”)とハッシュ( “#”)で構成されています。 星は任意の単語に一致し、ハッシュは0個以上の単語に一致します。 たとえば、 “* .stock。#”は “usd.stock”と “eur.stock.db”というルーティングキーに一致しますが、 “stock.asdaq”には一致しません。

インストール

Kombuは、Python Package Index(PyPI)またはソースからインストールできます。

pipを使ってインストールするには:

$ pip install kombu

easy_installを使ってインストールするには:

$ easy_install kombu

ソースtarballをダウンロードした場合は、以下を実行してインストールできます:

$ python setup.py build
# python setup.py install # as root

ヘルプの利用

メーリングリスト

carrot-usersメーリングリストに参加してください。

バグトラッカー

バグ報告や迷惑行為がある場合は、 https://github.com/celery/kombu/issues/の問題追跡ツールに報告してください。

貢献する

Kombuの開発はGithubで行われますhttps : //github.com/celery/kombu

開発に参加することを強くお勧めします。 Githubが好きではない場合(何らかの理由で)定期的にパッチを送ってください。

ライセンス

このソフトウェアは、New BSDライセンスの下で使用許諾されます。 完全なライセンステキストについては、一番上のディストリビューションディレクトリにあるLICENSEファイルを参照してください。







-celery
-, , , , , , , , ,

執筆者: