GitHubじゃ!Pythonじゃ!

GitHubからPython関係の優良リポジトリを探したかったのじゃー、でも英語は出来ないから日本語で読むのじゃー、英語社会世知辛いのじゃー

oliver006

elasticsearch-gmail – ElasticsearchでGmailの受信トレイをインデックス登録する

投稿日:

ElasticsearchでGmailの受信トレイをインデックス登録する

初心者向けのElasticsearch:Gmail受信トレイのインデックス登録

これについては何ですか?

私は最近、自分のGmailの受信トレイを見て、私が50kを超えるメールを見て、約12GBのスペースを取っていることに気づいたが、スペースを取ったメール、送信した人、メールを送った人などを知らせる良い方法はない

このチュートリアルの目標は、バルクインデックスを使用してGmailの受信トレイ全体をElasticsearchにロードし、クラスタのクエリを開始して、何が起こっているのかをより正確に把握することです。

関連チュートリアル: ElasticsearchとHN APIを使用したHackerニュースのインデックス作成と検索

前提条件

Elasticsearchを設定し、 http:// localhost:9200で実行されていることを確認してください

スクリプトのためにPythonとTornadoを使用して、データをインポートして照会します。 また、HTML / JS / CSSを取り除くためのbeautifulsoup4 (body indexingフラグを使用したい場合)

以下を実行して依存関係をインストールします。

pip install -r requirements.txt

Aight、どこから始めますか?

まず、蓄積されたメールの量に応じて、 ここに移動してGmailメールボックスをダウンロードしてください。

ダウンロードしたアーカイブはmbox形式であり、Pythonはmbox形式で作業するためのライブラリを提供しています。

全体的なプログラムは次のようになります:

mbox = mailbox.UnixMailbox(open('emails.mbox', 'rb'), email.message_from_file)

for msg in mbox:
    item = convert_msg_to_json(msg)
	upload_item_to_es(item)

print "Done!"

さて、詳細について教えてください

完全なPythonコードはここにあります: src / index_emails.py

mboxをJSONに変換する

まず、mbox形式のメッセージをJSONに変換してElasticsearchに挿入できるようにする必要があります。 ここでは、データの正規化とクリーンアップに非常に役立つサンプルコードをいくつか紹介します。

良い最初のステップ:

def convert_msg_to_json(msg):
    result = {'parts': []}
    for (k, v) in msg.items():
        result[k.lower()] = v.decode('utf-8', 'ignore')

また、 FromTo電子メールアドレスを解析して正規化する必要があります。

for k in ['to', 'cc', 'bcc']:
    if not result.get(k):
        continue
    emails_split = result[k].replace('\n', '').replace('\t', '').replace('\r', '').replace(' ', '').encode('utf8').decode('utf-8', 'ignore').split(',')
    result[k] = [ normalize_email(e) for e in emails_split]

if "from" in result:
    result['from'] = normalize_email(result['from'])

Elasticsearchはタイムスタンプがマイクロ秒であることを期待していますので、それに応じて日付を変換しましょう

if "date" in result:
    tt = email.utils.parsedate_tz(result['date'])
    result['date_ts'] = int(calendar.timegm(tt) - tt[9]) * 1000

また、ラベルを分割して正規化する必要もあります

labels = []
if "x-gmail-labels" in result:
    labels = [l.strip().lower() for l in result["x-gmail-labels"].split(',')]
    del result["x-gmail-labels"]
result['labels'] = labels

メールのサイズも面白いので、それを打破しましょう

parts = json_msg.get("parts", [])
json_msg['content_size_total'] = 0
for part in parts:
    json_msg['content_size_total'] += len(part.get('content', ""))
Elasticsearchでデータをインデックス化する

最も単純なアプローチは、アイテムごとのPUTリクエストです。

def upload_item_to_es(item):
    es_url = "http://localhost:9200/gmail/email/%s" % (item['message-id'])
    request = HTTPRequest(es_url, method="PUT", body=json.dumps(item), request_timeout=10)
    response = yield http_client.fetch(request)
    if not response.code in [200, 201]:
        print "\nfailed to add item %s" % item['message-id']

しかし、Elasticsearchは大量のデータをインポートするためのより良い方法を提供します。 バルクインデックス作成ドキュメントごとにHTTPリクエストを作成し、個別にインデックスを作成する代わりに、たとえば、 1000の文書を作成し、それらを索引付けします。
バルクメッセージの形式は次のとおりです。

cmd\n
doc\n
cmd\n
doc\n
...

cmdは、索引付けする各doc制御メッセージです。 この例では、 cmdは次のようになります。

cmd = {'index': {'_index': 'gmail', '_type': 'email', '_id': item['message-id']}}`

最終的なコードは次のようになります。

upload_data = list()
for msg in mbox:
    item = convert_msg_to_json(msg)
    upload_data.append(item)
    if len(upload_data) == 100:
        upload_batch(upload_data)
        upload_data = list()

if upload_data:
    upload_batch(upload_data)

そして

def upload_batch(upload_data):

    upload_data_txt = ""
    for item in upload_data:
        cmd = {'index': {'_index': 'gmail', '_type': 'email', '_id': item['message-id']}}
        upload_data_txt += json.dumps(cmd) + "\n"
        upload_data_txt += json.dumps(item) + "\n"

    request = HTTPRequest("http://localhost:9200/_bulk", method="POST", body=upload_data_txt, request_timeout=240)
    response = http_client.fetch(request)
    result = json.loads(response.body)
	if 'errors' in result:
	    print result['errors']

さて、私にいくつかのデータを見せてください!

すべてのメールをインデックス登録した後、クエリの実行を開始できます。

フィルタ

過去6ヶ月間の電子メールを検索する場合は、範囲フィルタを使用して、現在の時間( now )から6か月を検索することができます。

curl -XGET 'http://localhost:9200/gmail/email/_search?pretty' -d '{
"filter": { "range" : { "date_ts" : { "gte": "now-6M" } } } }
'

2014年のすべてのメールをgteltを使用してフィルタリングすることができます

curl -XGET 'http://localhost:9200/gmail/email/_search?pretty' -d '{
"filter": { "range" : { "date_ts" : { "gte": "2013-01-01T00:00:00.000Z", "lt": "2014-01-01T00:00:00.000Z" } } } }
'

qパラメータを使用して特定のフィールドをすばやく照会することもできます。 この例では、すべてのAmazon発送情報メールが表示されます:

curl "localhost:9200/gmail/email/_search?pretty&q=from:ship-confirm@amazon.com"
集約クエリ

集約クエリでは、指定されたキーでデータをバケット化し、バケットあたりのメッセージ数をカウントできます。 たとえば、受信者によってグループ化されたメッセージの数:

curl -XGET 'http://localhost:9200/gmail/email/_search?pretty&search_type=count' -d '{
"aggs": { "emails": { "terms" : { "field" : "to",  "size": 10 }
} } }
'

結果:

"aggregations" : {
"emails" : {
  "buckets" : [ {
       "key" : "noreply@github.com",
       "doc_count" : 1920
  }, { "key" : "oliver@gmail.com",
       "doc_count" : 1326
  }, { "key" : "michael@gmail.com",
       "doc_count" : 263
  }, { "key" : "david@gmail.com",
       "doc_count" : 232
  }
  ...
  ]
}

これはラベルあたりの電子メール数を示します:

curl -XGET 'http://localhost:9200/gmail/email/_search?pretty&search_type=count' -d '{
"aggs": { "labels": { "terms" : { "field" : "labels",  "size": 10 }
} } }
'

結果:

"hits" : {
  "total" : 51794,
},
"aggregations" : {
"labels" : {
  "buckets" : [       {
       "key" : "important",
       "doc_count" : 15430
  }, { "key" : "github",
       "doc_count" : 4928
  }, { "key" : "sent",
       "doc_count" : 4285
  }, { "key" : "unread",
       "doc_count" : 510
  },
  ...
   ]
}

date histogramを使用して、1年に送受信した電子メールの数をカウントすることもできます。

curl -s "localhost:9200/gmail/email/_search?pretty&search_type=count" -d '
{ "aggs": {
    "years": {
      "date_histogram": {
        "field": "date_ts", "interval": "year"
}}}}
'

結果:

"aggregations" : {
"years" : {
  "buckets" : [ {
    "key_as_string" : "2004-01-01T00:00:00.000Z",
    "key" : 1072915200000,
    "doc_count" : 585
  }, {
...
  }, {
    "key_as_string" : "2013-01-01T00:00:00.000Z",
    "key" : 1356998400000,
    "doc_count" : 12832
  }, {
    "key_as_string" : "2014-01-01T00:00:00.000Z",
    "key" : 1388534400000,
    "doc_count" : 7283
  } ]
}

Amazon / Steamでどれくらい費やしたかを調べるための集約クエリを作成します。

GET _search
{
  "query": {
    "match_all": {}
      },
      "size": 0,
      "aggs": {
        "group_by_company": {
          "terms": {
            "field": "order_details.merchant"
            },
            "aggs": {
              "total_spent": {
                "sum": {
                  "field": "order_details.order_total"
                }
                },
                "postage": {
                  "sum": {
                    "field": "order_details.postage"
                  }
                }
              }
            }
          }
        }

Todo

  • より興味深いクエリ
  • スキーマ調整
  • マルチパートメッセージ解析
  • パフォーマンスについての宣伝

フィードバック

オープンプルリクエスト、問題、またはo@21zoo.comでのメール







-oliver006
-, , , , , , , ,

執筆者:

oliver006

elasticsearch-gmail – ElasticsearchでGmailの受信トレイをインデックス登録する

投稿日:

(さらに…)







-oliver006
-, , , , , , , ,

執筆者: