GitHubじゃ!Pythonじゃ!

GitHubからPython関係の優良リポジトリを探したかったのじゃー、でも英語は出来ないから日本語で読むのじゃー、英語社会世知辛いのじゃー

databricks

spark-deep-learning – Apache Sparkのディープ・ラーニング・パイプライン //databricks.github.io/spa..

投稿日:

Apache Sparkのディープ・ラーニング・パイプライン https: //databricks.github.io/spark-de

Apache Sparkのディープ・ラーニング・パイプライン

ディープ・ラーニング・パイプラインは、Apache Sparkを使ってPythonでスケーラブルなディープ・ラーニングのための高水準APIを提供します。

概要

ディープ・ラーニング・パイプラインは、Apache Sparkを使ってPythonでスケーラブルなディープ・ラーニングのための高水準APIを提供します。

このライブラリはDatabricksのものであり、Sparkを2つの最強面に利用しています。

  1. SparkとSpark MLlibの精神で、使いやすいAPIを提供し、数行のコードで深い学習が可能です。
  2. Sparkの強力な分散エンジンを使用して、膨大なデータセットの詳細な学習をスケールアウトします。

現在、TensorFlowとTensorFlowがサポートするKerasワークフローがサポートされており、

  • 大規模推論/スコアリング
  • 画像データの学習とハイパーパラメータ調整

さらに、データ科学者や機械学習の専門家が深い学習モデルをより多くのユーザーグループで使用できるSQL関数に変換するツールを提供します。 これは単一モデルの分散型トレーニングを実行しません。これは活発な研究分野であり、ここでは大部分の深い学習ユースケースに対して最も実用的なソリューションを提供することを目指しています。

ライブラリの概要については、Deep Learning Pipelinesを紹介するDatabricksのブログ記事を参照してください。 パッケージのさまざまな使用例については、以下のクイックユーザーガイドのセクションを参照してください。

図書館は初期の段階であり、皆様のご意見とご協力を歓迎します。

メンテナ:Bago Amirbekian、Joseph Bradley、Yogesh Garg、Sue Ann Hong、Tim Hunter、Siddharth Murching、Tomas Nykodym、Lu Wang

単体テストの作成と実行

このプロジェクトをコンパイルするには、プロジェクトのホームディレクトリからbuild/sbt assembly実行build/sbt assemblyます。 これにより、Scala単体テストも実行されます。

Python単体テストを実行するには、 python/ディレクトリから(コンパイル後に) run-tests.shスクリプトを実行します。 いくつかの環境変数を設定する必要があります。

# Be sure to run build/sbt assembly before running the Python tests
sparkdl$ SPARK_HOME=/usr/local/lib/spark-2.3.0-bin-hadoop2.7 PYSPARK_PYTHON=python3 SCALA_VERSION=2.11.8 SPARK_VERSION=2.3.0 ./python/run-tests.sh

スパークバージョンの互換性

最新のコードを使用するには、Spark 2.3.0が必要で、Python 3.6とScala 2.11をお勧めします。 定期的にテストされた組み合わせについては、 travis configを参照してください。

各リリースの互換性要件は、「 リリース」セクションに記載されています。

サポート

DL PipelineのGoogleグループに関する質問やディスカッションディスカッションに参加することができます。

Githubの問題にバグレポートや機能リクエストを投稿することもできます。

リリース

  • 1.0.0リリース:Spark 2.3.0が必要です。 Python 3.6とScala 2.11が推奨されています。 TensorFlow 1.6.0が必要です。
    1. Spark 2.3.0の画像の定義を使用する。 新しい定義では、変更前のこのプロジェクトで使用されているRGB順序の代わりに、3チャネルの画像のBGRチャネル順序を使用します。
    2. DeepImageFeaturizerの永続性(PythonとScalaの両方)。
  • 0.3.0リリース:Spark 2.2.0、Python 3.6、Scala 2.11が推奨されています。 TensorFlow 1.4.1が必要です。
    1. 非イメージ(テンソル)データに対する大規模なバッチ推論のためのKerasTransformerとTFTransformer
    2. 転送学習のためのScala API( DeepImageFeaturizer )。 InceptionV3がサポートされています。
    3. DeepImageFeaturizer&DeepImagePredictor(Python)にVGG16、VGG19モデルを追加しました。
  • 0.2.0リリース:Spark 2.1.1とPython 2.7が推奨されています。
    1. KerasImageFileEstimator API(画像ファイルのKerasモデルの学習)
    2. KerasモデルのSQL UDFサポート
    3. DeepImageFeaturizer&DeepImagePredictorにXception、Resnet50モデルを追加。
  • 0.1.0 Alpha release:Spark 2.1.1とPython 2.7が推奨されています。

ダウンロードとインストール

Deep Learning Pipelinesはスパークパッケージとして公開されています。 Spark Package」ページにアクセスして、リリースをダウンロードし、spark-shell、SBT、およびMavenで使用する方法を見つけます。

クイックユーザーガイド

ディープ・ラーニング・パイプラインは、ディープ・ラーニングを使用して画像を扱い、処理するための一連のツールを提供します。 ツールは次のように分類できます。

以下の例を実行するには、Deep Learning Pipelinesの最新リリースで動作するDeep Learning PipelinesのDatabricksドキュメントの Databricksノートブックを参照してください。 以前のリリースと互換性のあるいくつかのDatabricksノートブックがあります: 0.1.0,0.2.0,0.3.0,1.0.0

Sparkで画像を操作する

画像に深い学習を適用するための第一歩は、画像を読み込む能力です。 スパークおよびディープ・ラーニング・パイプラインには、数百万のイメージをSpark DataFrameにロードし、自動的に分散してデコードするユーティリティー・ファンクションが含まれており、規模の操作が可能です。

SparkのImageSchemaの使用

from pyspark.ml.image import ImageSchema
image_df = ImageSchema.readImages("/data/myimages")

カスタムイメージライブラリが必要な場合:

from sparkdl.image import imageIO as imageIO
image_df = imageIO.readImagesWithCustomFn("/data/myimages",decode_f=<your image library, see imageIO.PIL_decode>)

生成されたDataFrameには、スキーマ== ImageSchemaを持つイメージ構造体を含む “image”という名前の文字列が含まれています。

image_df.show()

なぜイメージ? ディープラーニングはイメージを含むタスクにとって強力であることが示されているため、Sparkのネイティブイメージを追加しました。 目標は、コミュニティーの関心に基づいてテキストや時系列などのより多くのデータ型をサポートすることです。

転送学習

Deep Learning Pipelinesは、深い学習を使用して開始する最速の(コードと実行時の)最速の方法の1つであるイメージの転送学習を実行するユーティリティを提供します。 ディープ・ラーニング・パイプラインを使用すると、数行のコードで実行できます。

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer

featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3")
lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label")
p = Pipeline(stages=[featurizer, lr])

model = p.fit(train_images_df)    # train_images_df is a dataset of images and labels

# Inspect training error
df = model.transform(train_images_df.limit(10)).select("image", "probability",  "uri", "label")
predictionAndLabels = df.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Training set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

DeepImageFeaturizerDeepImageFeaturizerから次のモデルをサポートしています:

  • 開始V3
  • Xception
  • ResNet50
  • VGG16
  • VGG19

分散型ハイパーパラメータチューニング

深い学習の中で最良の結果を得るには、訓練パラメータの異なる値を試す必要があります。これは、ハイパーパラメータチューニングと呼ばれる重要なステップです。 Deep Learning PipelinesはSparkの機械学習パイプラインの一歩として深い学習訓練を可能にするため、Spark MLlibに既に組み込まれている超パラメータのチューニングインフラストラクチャに頼ることができます。

Kerasユーザーの場合

KerasImageFileEstimatorモデルでKerasImageFileEstimatorパラメータチューニングを実行するには、 KerasImageFileEstimatorを使用して見積もりを作成し、ハイパーパラメータの調整(たとえばCrossValidator)にMLlibのツールを使用します。 KerasImageFileEstimator 、ケラスで頻繁に使用されるカスタムイメージの読み込みおよび処理機能を可能にするために、Image URIカラム(ImageSchemaカラムではありません)とともに動作します。

KerasImageFileEstimatorで推定量を構築するには、 KerasImageFileEstimatorモデルをファイルとして保存する必要があります。 モデルは、Keras組み込みモデルまたはユーザー訓練モデルにすることができます。

from keras.applications import InceptionV3

model = InceptionV3(weights="imagenet")
model.save('/tmp/model-full.h5')

URIから画像データを読み込んで前処理し、keras Model入力フォーマットで数値テンソルを返す画像ローディング関数を作成する必要もあります。 次に、保存されたモデルファイルを取り込むKerasImageFileEstimatorを作成することができます。

import PIL.Image
import numpy as np
from keras.applications.imagenet_utils import preprocess_input
from sparkdl.estimators.keras_image_file_estimator import KerasImageFileEstimator

def load_image_from_uri(local_uri):
  img = (PIL.Image.open(local_uri).convert('RGB').resize((299, 299), PIL.Image.ANTIALIAS))
  img_arr = np.array(img).astype(np.float32)
  img_tnsr = preprocess_input(img_arr[np.newaxis, :])
  return img_tnsr

estimator = KerasImageFileEstimator( inputCol="uri",
                                     outputCol="prediction",
                                     labelCol="one_hot_label",
                                     imageLoader=load_image_from_uri,
                                     kerasOptimizer='adam',
                                     kerasLoss='categorical_crossentropy',
                                     modelFile='/tmp/model-full-tmp.h5' # local file path for model
                                   ) 

CrossValidataorを使用してグリッド検索を行うことで、ハイパーパラメータチューニングに使用できます。

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = (
  ParamGridBuilder()
  .addGrid(estimator.kerasFitParams, [{"batch_size": 32, "verbose": 0},
                                      {"batch_size": 64, "verbose": 0}])
  .build()
)
bc = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label" )
cv = CrossValidator(estimator=estimator, estimatorParamMaps=paramGrid, evaluator=bc, numFolds=2)

cvModel = cv.fit(train_df)

深い学習モデルを大規模に適用する

Spark DataFramesは、大規模なデータセットに深い学習モデルを適用するための自然な構成です。 ディープ・ラーニング・パイプラインは、TensorFlowグラフとTensorFlowでサポートされるKerasモデルを大規模に適用するための一連のSpark MLlibトランスフォーマを提供します。 Transformersは、Tensorframesライブラリの支援を受けて、モデルやデータのSpark作業者への配布を効率的に処理します。

深い学習モデルを大規模に画像に適用する

ディープ・ラーニング・パイプラインは、スケールに沿ってモデルにイメージを適用するいくつかの方法を提供します。

  • 一般的な画像モデルは、TensorFlowまたはKerasコードを一切必要とせずに、すぐに使用できます。
  • 画像上で動作するTensorFlowグラフ
  • 画像で動作するKerasモデル
一般的な画像モデルを適用する

画像のためのよく知られた深い学習モデルがたくさんあります。 現在のタスクがモデルが提供するもの(例えばImageNetクラスによるオブジェクト認識)に非常に類似している場合、または純粋な探索の場合、モデル名を指定するだけでTransformer DeepImagePredictor使用できます。

from pyspark.ml.image import ImageSchema
from sparkdl import DeepImagePredictor

image_df = ImageSchema.readImages(sample_img_dir)

predictor = DeepImagePredictor(inputCol="image", outputCol="predicted_labels", modelName="InceptionV3", decodePredictions=True, topK=10)
predictions_df = predictor.transform(image_df)

DeepImagePredictorは、 DeepImagePredictorと同じモデルセットをDeepImagePredictorサポートしていDeepImageFeaturizer (上記を参照。)

TensorFlowユーザーの場合

Deep Learning Pipelinesは、画像の列(前のセクションで説明したユーティリティを使用してロードされたものなど)を含むDataFrameに、指定されたTensorFlowグラフを適用するMLlib Transformerを提供します。 TransformerでTensorFlow Graphを使用する方法の簡単な例を次に示します。 実際には、TensorFlowグラフはTFImageTransformer呼び出す前にファイルから復元される可能性があります。

from pyspark.ml.image import ImageSchema
from sparkdl import TFImageTransformer
import sparkdl.graph.utils as tfx  # strip_and_freeze_until was moved from sparkdl.transformers to sparkdl.graph.utils in 0.2.0
from sparkdl.transformers import utils
import tensorflow as tf

graph = tf.Graph()
with tf.Session(graph=graph) as sess:
    image_arr = utils.imageInputPlaceholder()
    resized_images = tf.image.resize_images(image_arr, (299, 299))
    # the following step is not necessary for this graph, but can be for graphs with variables, etc
    frozen_graph = tfx.strip_and_freeze_until([resized_images], graph, sess, return_graph=True)

transformer = TFImageTransformer(inputCol="image", outputCol="predictions", graph=frozen_graph,
                                 inputTensor=image_arr, outputTensor=resized_images,
                                 outputMode="image")

image_df = ImageSchema.readImages(sample_img_dir)
processed_image_df = transformer.transform(image_df)
Kerasユーザーの場合

KerasImageFileTransformer 、Sparkを使用して分散してKerasモデルを適用するために、 KerasImageFileTransformerに基づいたKerasモデルで動作します。 それ

  • ユーザ指定のイメージ読み込みと処理関数をイメージURIの列を含む入力DataFrameに適用することによって、イメージの列を含むDataFrameを内部的に作成します
  • 指定されたモデルファイルパスからKerasモデルをロードします。
  • モデルをイメージDataFrameに適用します。

上記のTFImageTransformerとのAPIの違いは、通常のKerasワークフローでは、TensorFlowグラフの一部ではない画像を読み込んでサイズを変更する非常に特殊な方法があることに起因しています。

トランスを使用するには、まずKerasモデルをファイルとして保存する必要があります。 Kerasに組み込まれたInceptionV3モデルを訓練するのではなく、保存することができます。

from keras.applications import InceptionV3

model = InceptionV3(weights="imagenet")
model.save('/tmp/model-full.h5')

今、予測面で:

from keras.applications.inception_v3 import preprocess_input
from keras.preprocessing.image import img_to_array, load_img
import numpy as np
import os
from pyspark.sql.types import StringType
from sparkdl import KerasImageFileTransformer

def loadAndPreprocessKerasInceptionV3(uri):
  # this is a typical way to load and prep images in keras
  image = img_to_array(load_img(uri, target_size=(299, 299)))  # image dimensions for InceptionV3
  image = np.expand_dims(image, axis=0)
  return preprocess_input(image)

transformer = KerasImageFileTransformer(inputCol="uri", outputCol="predictions",
                                        modelFile='/tmp/model-full-tmp.h5',  # local file path for model
                                        imageLoader=loadAndPreprocessKerasInceptionV3,
                                        outputMode="vector")

files = [os.path.abspath(os.path.join(dirpath, f)) for f in os.listdir("/data/myimages") if f.endswith('.jpg')]
uri_df = sqlContext.createDataFrame(files, StringType()).toDF("uri")

keras_pred_df = transformer.transform(uri_df)

深い学習モデルをスケールしてテンソルに適用する

ディープ・ラーニング・パイプラインは、一般的なディープ・ラーニング・ライブラリで書かれたテンソル入力(最大2次元)モデルを適用する方法も提供します。

  • TensorFlowグラフ
  • ケラスモデル
TensorFlowユーザーの場合

TFTransformerは、ユーザー指定のTensorFlowグラフを2次元までのテンソル入力に適用します。 TensorFlowグラフは、TensorFlowグラフオブジェクト( tf.Graphまたはtf.GraphDef )またはチェックポイントまたはSavedModelオブジェクトとして指定SavedModelます(詳細については、 入力オブジェクトクラスを参照してください)。 transform()関数は、TensorFlowグラフを入力DataFrame内の配列の列(配列がTensorに対応する)に適用し、グラフの出力に対応する配列の列を出力します。

まず、2次元点のサンプルデータセットを生成し、2つの異なる中心の周りに分布するガウス分布

import numpy as np
from pyspark.sql.types import Row

n_sample = 1000
center_0 = [-1.5, 1.5]
center_1 = [1.5, -1.5]

def to_row(args):
  xy, l = args
  return Row(inputCol = xy, label = l)

samples_0 = [np.random.randn(2) + center_0 for _ in range(n_sample//2)]
labels_0 = [0 for _ in range(n_sample//2)]
samples_1 = [np.random.randn(2) + center_1 for _ in range(n_sample//2)]
labels_1 = [1 for _ in range(n_sample//2)]

rows = map(to_row, zip(map(lambda x: x.tolist(), samples_0 + samples_1), labels_0 + labels_1))
sdf = spark.createDataFrame(rows)

次に、テンソルフローグラフとその入力を返す関数を記述する

import tensorflow as tf

def build_graph(sess, w0):
  X = tf.placeholder(tf.float32, shape=[None, 2], name="input_tensor")
  model = tf.sigmoid(tf.matmul(X, w0), name="output_tensor")
  return model, X

以下は、単一のノードでテンソルフローを使用して予測するために記述するコードです。

w0 = np.array([[1], [-1]]).astype(np.float32)
with tf.Session() as sess:
  model, X = build_graph(sess, w0)
  output = sess.run(model, feed_dict = {
    X : samples_0 + samples_1
  })

これで、次のSpark MLlib Transformerを使用して、モデルを分散してDataFrameに適用できます。

from sparkdl import TFTransformer
from sparkdl.graph.input import TFInputGraph
import sparkdl.graph.utils as tfx

graph = tf.Graph()
with tf.Session(graph=graph) as session, graph.as_default():
    _, _ = build_graph(session, w0)
    gin = TFInputGraph.fromGraph(session.graph, session,
                                 ["input_tensor"], ["output_tensor"])

transformer = TFTransformer(
    tfInputGraph=gin,
    inputMapping={'inputCol': tfx.tensor_name("input_tensor")},
    outputMapping={tfx.tensor_name("output_tensor"): 'outputCol'})

odf = transformer.transform(sdf)
Kerasユーザーの場合

KerasTransformerは、TensorFlowに基づくKerasモデルを最大2次元のテンソル入力に適用します。 与えられたモデルファイルパスからKerasモデルをロードし、そのモデルを配列の列(配列はTensorに対応)に適用し、配列の列を出力します。

from sparkdl import KerasTransformer
from keras.models import Sequential
from keras.layers import Dense
import numpy as np

# Generate random input data
num_features = 10
num_examples = 100
input_data = [{"features" : np.random.randn(num_features).tolist()} for i in range(num_examples)]
input_df = sqlContext.createDataFrame(input_data)

# Create and save a single-hidden-layer Keras model for binary classification
# NOTE: In a typical workflow, we'd train the model before exporting it to disk,
# but we skip that step here for brevity
model = Sequential()
model.add(Dense(units=20, input_shape=[num_features], activation='relu'))
model.add(Dense(units=1, activation='sigmoid'))
model_path = "/tmp/simple-binary-classification"
model.save(model_path)

# Create transformer and apply it to our input data
transformer = KerasTransformer(inputCol="features", outputCol="predictions", modelFile=model_path)
final_df = transformer.transform(input_df)

モデルをSQL関数としてデプロイする

モデルをプロダクト化する1つの方法は、SQLを知っている誰もがそれを使用できるようにするSpark SQL User Defined Functionとして展開することです。 ディープ・ラーニング・パイプラインは、深い学習モデルを採用し、スパークSQLユーザー定義関数(UDF)を登録するメカニズムを提供します。 特に、Deep Learning Pipelines 0.2.0では、画像データを扱うKerasモデルからSQL UDFを作成するためのサポートが追加されています。

結果のUDFは、(イメージ構造体 “SpImage”としてフォーマットされた)列を取り、与えられたKerasモデルの出力を生成します。 例えば、Inception V3の場合、ImageNetオブジェクトカテゴリに対して実数値のスコアベクトルを生成する。

画像上で動作するKerasモデルのUDFを次のように登録することができます。

from keras.applications import InceptionV3
from sparkdl.udf.keras_image_model import registerKerasImageUDF

registerKerasImageUDF("inceptionV3_udf", InceptionV3(weights="imagenet"))

あるいは、モデルファイルからUDFを登録することもできます。

registerKerasImageUDF("my_custom_keras_model_udf", "/tmp/model-full-tmp.h5")

画像を扱うKerasワークフローでは、モデルが画像に適用される前に前処理ステップを持つのが一般的です。 私たちのワークフローで前処理が必要な場合は、オプションでUDF登録に前処理機能を提供することができます。 プリプロセッサはファイルパスを受け取り、イメージ配列を返す必要があります。 以下は簡単な例です。

from keras.applications import InceptionV3
from sparkdl.udf.keras_image_model import registerKerasImageUDF

def keras_load_img(fpath):
    from keras.preprocessing.image import load_img, img_to_array
    import numpy as np
    img = load_img(fpath, target_size=(299, 299))
    return img_to_array(img).astype(np.uint8)

registerKerasImageUDF("inceptionV3_udf_with_preprocessing", InceptionV3(weights="imagenet"), keras_load_img)

一度UDFが登録されると、それはSQLクエリで使用することができます。

from pyspark.ml.image import ImageSchema

image_df = ImageSchema.readImages(sample_img_dir)
image_df.registerTempTable("sample_images")
SELECT my_custom_keras_model_udf(image) as predictions from sample_images

ライセンス

  • Deep Learning Pipelinesのソースコードは、Apache License 2.0(LICENSEファイルを参照)の下で公開されています。
  • DeepImageFeaturizer によって提供されたモデル( DeepImageFeaturizerおよびDeepImagePredictor使用されているモデル)は、 https://github.com/fchollet/keras/blob/master/LICENSEにあるMITライセンスのもとで提供されます .MITライセンスの対象となる追加の著作権およびライセンスコードまたはドキュメンテーション。 個々のモデルのライセンス情報については、 Kerasアプリケーションのページも参照してください。







-databricks

執筆者: