今まではTensorFlowバックエンドのKerasでディープラーニングをやっていましたが,そろそろKerasを卒業したいなと思いまして,PyTorchを少し触ってみました。
PyTorchにした理由は,
- 実行速度が速い
- ユーザが多い
- Define by Run
- 論文の実装がPyTorchの場合が多い(重要)
といったようなところです。
まずはMNISTでやってみる
Kerasでは2種類ほど過去にディープラーニングの実装はやってみました。
このときに自前(といってもオープンデータですが)を用意して実装したので,この2つをPyTorchに書き換えてもいいかなーとも思ったのですが,とりあえずはサンプル通りにということでMINISTのデータを使ってやってみることにします。
サンプルの理解
ただサンプルを実行してみるだけだとコピペで一瞬で終わるので,自分の理解も兼ねて少し細かくコメントを入れていくことで勉強してみたいと思います。
ここでは処理ごとにソースを分割していきますが,ソースの全文は最後にまとめて記載しておきます。
ネットワーク構成とForward計算の定義
まずはネットワーク構成とForward計算を定義します。
import torch import torch.nn as nn import torch.nn.functional as F class CNN(nn.Module): def __init__(self, num_classes): """ Convolutional Neural Network ネットワーク構成: input - CONV - CONV - MaxPool - CONV - CONV - MaxPool - FC - output ※MaxPoolの直後にバッチ正規化を実施 引数: num_classes: 分類するクラス数(=出力層のユニット数) """ super(CNN, self).__init__() # nn.Moduleを継承する self.block1 = nn.Sequential( nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.Conv2d(in_channels=16, out_channels=16, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2, stride=1), # 出力サイズ: チャネル=16, 高さ=27, 幅=27 nn.BatchNorm2d(16) ) self.block2 = nn.Sequential( nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2, stride=1), # 出力サイズ: チャネル=32, 高さ=26, 幅=26 nn.BatchNorm2d(32) ) self.full_connection = nn.Sequential( nn.Linear(in_features=32*26*26, out_features=512), # in_featuresは直前の出力ユニット数 nn.ReLU(), nn.Dropout(), nn.Linear(in_features=512, out_features=num_classes) ) # Forward計算の定義 # 参考:Define by Runの特徴(入力に合わせてForward計算を変更可) def forward(self, x): x = self.block1(x) x = self.block2(x) # 直前のMaxPoolの出力が2次元(×チャネル数)なので,全結合の入力形式に変換 # 参考:KerasのFlatten()と同じような処理 x = x.view(x.size(0), 32 * 26 * 26) y = self.full_connection(x) return y
- ポイント①:torch.nn.Moduleを継承してネットワーク構成を定義していく
- ポイント②:forward関数の中で入力xに応じて動的に処理を変更できる
- ポイント③:ユニット数の計算が面倒くさい
ポイント①はPyTorchのお作法として大切な点,ポイント②がPyTorchno特徴のひとつであるDefine by Runらしい部分です。
エラーが発生してデバッグをするときも,中で何が起きているのかが完全にトレースできるので結構楽です。
ポイント③については頑張って計算しましょう。特に畳み込み層から全結合層に変わる部分は,全結合側のユニット数(in_features)を何個に設定すればいいのか結構ハマったりします。
適当に検索したコードをコピペせずに,ちゃんと自分でネットワークを設計すれば基本的にハマることはないはずですが(笑)
学習用の関数の定義
学習するための関数を定義します。
# 学習用関数 def train(loader_train, model_obj, optimizer, loss_fn, device, total_epoch, epoch): model_obj.train() # モデルを学習モードに変更 # ミニバッチごとに学習 for data, targets in loader_train: data = data.to(device) # GPUを使用するため,to()で明示的に指定 targets = targets.to(device) # 同上 optimizer.zero_grad() # 勾配を初期化 outputs = model_obj(data) # 順伝播の計算 loss = loss_fn(outputs, targets) # 誤差を計算 loss.backward() # 誤差を逆伝播させる optimizer.step() # 重みを更新する print ('Epoch [%d/%d], Loss: %.4f' % (epoch, total_epoch, loss.item()))
- ポイント①:学習モードの設定を行う
- ポイント②:GPUを使用する場合は明示的に指定する
PyTorchの場合は,学習時と推論時で各モードを設定する必要があります。
また,知らないと結構ハマってしまうポイントだったりしますが,PyTorchではGPUを使用する場合は明示的に指定しなければいけません。
テスト用の関数の定義
テスト用の関数を定義します。
内部的には,まず推論を行ってその後に評価の計算を行っています。
# テスト用関数 def test(loader_test, trained_model, device): trained_model.eval() # モデルを推論モードに変更 correct = 0 # 正解率計算用の変数を宣言 # ミニバッチごとに推論 with torch.no_grad(): # 推論時には勾配は不要 for data, targets in loader_test: data = data.to(device) # GPUを使用するため,to()で明示的に指定 targets = targets.to(device) # 同上 outputs = trained_model(data) # 順伝播の計算 # 推論結果の取得と正誤判定 _, predicted = torch.max(outputs.data, 1) # 確率が最大のラベルを取得 correct += predicted.eq(targets.data.view_as(predicted)).sum() # 正解ならば正解数をカウントアップ # 正解率を計算 data_num = len(loader_test.dataset) # テストデータの総数 print('\nAccuracy: {}/{} ({:.0f}%)\n'.format(correct, data_num, 100. * correct / data_num))
- ポイント①:推論モードに変更する
- ポイント②:GPUを使用する場合は明示的に指定する
- ポイント③:推論と評価の処理は個別に記述する
ポイント①と②は学習時と同様です。
ポイント③については,テストは「推論をまず行って,その結果を使って評価する」という構成で定義します。
main処理の定義
def main(): # 1. GPUの設定(PyTorchでは明示的に指定する必要がある) device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') print(device) # 2. ハイパーパラメータの設定(最低限の設定) batch_size = 100 num_classes = 10 epochs = 3 # 3. MNISTのデータセットを取得 from sklearn.datasets import fetch_openml mnist = fetch_openml('mnist_784', data_home='./') # 4. データの設定(入力データは閉区間[0, 1]に正規化する) import numpy as np x = mnist.data / 255 y = np.array([*map(int, mnist.target)]) # 5. DataLoaderの作成 from torch.utils.data import TensorDataset, DataLoader from sklearn.model_selection import train_test_split # 5-1. データを学習用とテスト用に分割 x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=1/7, random_state=0) # 5-2. データのフォーマットを変換:PyTorchでの形式 = [画像数,チャネル数,高さ,幅] x_train = x_train.reshape(60000, 1, 28, 28) x_test = x_test.reshape(10000, 1, 28 ,28) # 5-3. PyTorchのテンソルに変換 x_train = torch.Tensor(x_train) x_test = torch.Tensor(x_test) y_train = torch.LongTensor(y_train) y_test = torch.LongTensor(y_test) # 5-4. 入力(x)とラベル(y)を組み合わせて最終的なデータを作成 ds_train = TensorDataset(x_train, y_train) ds_test = TensorDataset(x_test, y_test) # 5-5. DataLoaderを作成 loader_train = DataLoader(ds_train, batch_size=batch_size, shuffle=True) loader_test = DataLoader(ds_test, batch_size=batch_size, shuffle=False) # 6. モデル作成 model = CNN(num_classes=num_classes).to(device) print(model) # ネットワークの詳細を確認用に表示 # 7. 損失関数を定義 loss_fn = nn.CrossEntropyLoss() # 8. 最適化手法を定義(ここでは例としてAdamを選択) from torch import optim optimizer = optim.Adam(model.parameters(), lr=0.01) # 9. 学習(エポック終了時点ごとにテスト用データで評価) print('Begin train') for epoch in range(1, epochs+1): train(loader_train, model, optimizer, loss_fn, device, epochs, epoch) test(loader_test, model, device)
- ポイント①:入力のnumpyオブジェクトのサイズは[画像数,チャネル数,縦,幅]
- ポイント②:DataLoaderを作成する
TensorFlowなどでは入力となるnumpyオブジェクトは[画像数,高さ,幅,チャネル数]というサイズになると思います。
しかし,PyTorchでは[画像数,チャネル数,縦,幅]というサイズになるため,データを作成する際には注意が必要です。
また,入力データと教師データをセットにしてDataLoaderを作成する点も,Kerasなどとは少し異なるお作法です。
ソース全文と出力結果例
以上のソースの全文と,実行した場合の出力例を記載しておきます。
※実行時には環境によってWarningが出るかもしれませんが,実行結果には影響はないと思います。
#!/usr/bin/env python3 # -*- encoding: UTF-8 -*- import torch import torch.nn as nn import torch.nn.functional as F class CNN(nn.Module): def __init__(self, num_classes): """ Convolutional Neural Network ネットワーク構成: input - CONV - CONV - MaxPool - CONV - CONV - MaxPool - FC - output ※MaxPoolの直後にバッチ正規化を実施 引数: num_classes: 分類するクラス数(=出力層のユニット数) """ super(CNN, self).__init__() # nn.Moduleを継承する self.block1 = nn.Sequential( nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.Conv2d(in_channels=16, out_channels=16, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2, stride=1), # 出力サイズ: チャネル=16, 高さ=27, 幅=27 nn.BatchNorm2d(16) ) self.block2 = nn.Sequential( nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2, stride=1), # 出力サイズ: チャネル=32, 高さ=26, 幅=26 nn.BatchNorm2d(32) ) self.full_connection = nn.Sequential( nn.Linear(in_features=32*26*26, out_features=512), # in_featuresは直前の出力ユニット数 nn.ReLU(), nn.Dropout(), nn.Linear(in_features=512, out_features=num_classes) ) # Forward計算の定義 # 参考:Define by Runの特徴(入力に合わせてForward計算を変更可) def forward(self, x): x = self.block1(x) x = self.block2(x) # 直前のMaxPoolの出力が2次元(×チャネル数)なので,全結合の入力形式に変換 # 参考:KerasのFlatten()と同じような処理 x = x.view(x.size(0), 32 * 26 * 26) y = self.full_connection(x) return y # 学習用関数 def train(loader_train, model_obj, optimizer, loss_fn, device, total_epoch, epoch): model_obj.train() # モデルを学習モードに変更 # ミニバッチごとに学習 for data, targets in loader_train: data = data.to(device) # GPUを使用するため,to()で明示的に指定 targets = targets.to(device) # 同上 optimizer.zero_grad() # 勾配を初期化 outputs = model_obj(data) # 順伝播の計算 loss = loss_fn(outputs, targets) # 誤差を計算 loss.backward() # 誤差を逆伝播させる optimizer.step() # 重みを更新する print ('Epoch [%d/%d], Loss: %.4f' % (epoch, total_epoch, loss.item())) # テスト用関数 def test(loader_test, trained_model, device): trained_model.eval() # モデルを推論モードに変更 correct = 0 # 正解率計算用の変数を宣言 # ミニバッチごとに推論 with torch.no_grad(): # 推論時には勾配は不要 for data, targets in loader_test: data = data.to(device) # GPUを使用するため,to()で明示的に指定 targets = targets.to(device) # 同上 outputs = trained_model(data) # 順伝播の計算 # 推論結果の取得と正誤判定 _, predicted = torch.max(outputs.data, 1) # 確率が最大のラベルを取得 correct += predicted.eq(targets.data.view_as(predicted)).sum() # 正解ならば正解数をカウントアップ # 正解率を計算 data_num = len(loader_test.dataset) # テストデータの総数 print('\nAccuracy: {}/{} ({:.0f}%)\n'.format(correct, data_num, 100. * correct / data_num)) def main(): # 1. GPUの設定(PyTorchでは明示的に指定する必要がある) device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') print(device) # 2. ハイパーパラメータの設定(最低限の設定) batch_size = 100 num_classes = 10 epochs = 3 # 3. MNISTのデータセットを取得 from sklearn.datasets import fetch_mldata mnist = fetch_mldata('MNIST original', data_home='./') # 4. データの設定(入力データは閉区間[0, 1]に正規化する) x = mnist.data / 255 y = mnist.target # 5. DataLoaderの作成 from torch.utils.data import TensorDataset, DataLoader from sklearn.model_selection import train_test_split # 5-1. データを学習用とテスト用に分割 x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=1/7, random_state=0) # 5-2. データのフォーマットを変換:PyTorchでの形式 = [画像数,チャネル数,高さ,幅] x_train = x_train.reshape(60000, 1, 28, 28) x_test = x_test.reshape(10000, 1, 28 ,28) # 5-3. PyTorchのテンソルに変換 x_train = torch.Tensor(x_train) x_test = torch.Tensor(x_test) y_train = torch.LongTensor(y_train) y_test = torch.LongTensor(y_test) # 5-4. 入力(x)とラベル(y)を組み合わせて最終的なデータを作成 ds_train = TensorDataset(x_train, y_train) ds_test = TensorDataset(x_test, y_test) # 5-5. DataLoaderを作成 loader_train = DataLoader(ds_train, batch_size=batch_size, shuffle=True) loader_test = DataLoader(ds_test, batch_size=batch_size, shuffle=False) # 6. モデル作成 model = CNN(num_classes=num_classes).to(device) print(model) # ネットワークの詳細を確認用に表示 # 7. 損失関数を定義 loss_fn = nn.CrossEntropyLoss() # 8. 最適化手法を定義(ここでは例としてAdamを選択) from torch import optim optimizer = optim.Adam(model.parameters(), lr=0.01) # 9. 学習(エポック終了時点ごとにテスト用データで評価) print('Begin train') for epoch in range(1, epochs+1): train(loader_train, model, optimizer, loss_fn, device, epochs, epoch) test(loader_test, model, device) if __name__ == '__main__': main()
$ python pytorch_tutorial.py cuda CNN( (block1): Sequential( (0): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (1): ReLU(inplace) (2): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (3): ReLU(inplace) (4): MaxPool2d(kernel_size=2, stride=1, padding=0, dilation=1, ceil_mode=False) (5): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) ) (block2): Sequential( (0): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (1): ReLU(inplace) (2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (3): ReLU(inplace) (4): MaxPool2d(kernel_size=2, stride=1, padding=0, dilation=1, ceil_mode=False) (5): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) ) (full_connection): Sequential( (0): Linear(in_features=21632, out_features=512, bias=True) (1): ReLU() (2): Dropout(p=0.5) (3): Linear(in_features=512, out_features=10, bias=True) ) ) Begin train Epoch [1/3], Loss: 0.2579 Accuracy: 9638/10000 (96%) Epoch [2/3], Loss: 0.1171 Accuracy: 9732/10000 (97%) Epoch [3/3], Loss: 0.1684 Accuracy: 9770/10000 (97%)