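# Launch this script with Horovod's runner, one process per GPU/worker; for
# example, to train with four workers (the script name is a placeholder):
#
#     horovodrun -np 4 python train.py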
import json

import boto3
import torch
from torch import nn
from torch.nn import MSELoss
from torch.optim import Adam
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.data.distributed import DistributedSampler

import horovod.torch as hvd

from s3_utils import s3_load_pickle, s3_save_model, s3_save_file
# Connect to the S3-compatible object store (credentials redacted).
session = boto3.session.Session()
s3_client = session.client(
    service_name='s3',
    aws_access_key_id='XXXX',
    aws_secret_access_key='XXXX',
    endpoint_url='http://10.105.222.7:24850',
)
# Load the pickled train/test arrays from S3. Every row holds the input
# features followed by the target, so the last column is split off as the label.
train_tensor = torch.tensor(s3_load_pickle(s3_client, 'songwei', 'simple_ml/train.pkl'), dtype=torch.float)
test_tensor = torch.tensor(s3_load_pickle(s3_client, 'songwei', 'simple_ml/test.pkl'), dtype=torch.float)

train_dataset = TensorDataset(train_tensor[:, :-1], train_tensor[:, -1:])
test_dataset = TensorDataset(test_tensor[:, :-1], test_tensor[:, -1:])
# Initialize Horovod and pin each worker to its local GPU, if one is present.
hvd.init()
if torch.cuda.is_available():
    torch.cuda.set_device(hvd.local_rank())
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# One thread per process avoids oversubscribing CPU cores across workers.
torch.set_num_threads(1)
# Shard both datasets across workers so each rank sees a distinct partition.
# Note that DataLoader defaults to batch_size=1, so every sample is its own
# mini-batch here.
train_sampler = DistributedSampler(train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_dl = DataLoader(train_dataset, sampler=train_sampler)

test_sampler = DistributedSampler(test_dataset, num_replicas=hvd.size(), rank=hvd.rank())
test_dl = DataLoader(test_dataset, sampler=test_sampler)
in_dim, hidden_dim, out_dim = 2, 4, 1

# Hyperparameters are read from a local JSON file.
with open('hp.json', 'r') as f:
    hp = json.load(f)
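# hp.json lives next to the script and must provide at least the "dropout"
# and "lr" keys used below. A minimal example (the values are placeholders,
# not taken from the original source):
#
#     {"dropout": 0.1, "lr": 0.001}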
model = nn.Sequential(
    nn.Linear(in_dim, hidden_dim),
    nn.GELU(),
    nn.Dropout(hp["dropout"]),
    nn.Linear(hidden_dim, hidden_dim),
    nn.GELU(),
    nn.Dropout(hp["dropout"]),
    nn.Linear(hidden_dim, out_dim),
)
model.to(device)
# Scale the learning rate by the number of workers, as is conventional for
# synchronous data-parallel training, then wrap the optimizer so gradients
# are averaged across workers on every step.
opt = Adam(model.parameters(), lr=hp["lr"] * hvd.size())
opt = hvd.DistributedOptimizer(opt, named_parameters=model.named_parameters(), op=hvd.Average)

# Start all workers from identical model and optimizer state.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(opt, root_rank=0)
loss_fn = MSELoss()
epochs = 20

model.train()
for e in range(epochs):
    # Reshuffle the sampler's partition each epoch.
    train_sampler.set_epoch(e)
    epoch_losses = []
    for x, y in train_dl:
        x, y = x.to(device), y.to(device)
        opt.zero_grad()
        p = model(x)
        loss = loss_fn(p, y)
        loss.backward()
        opt.step()
        epoch_losses.append(loss.item())
    avg_loss = sum(epoch_losses) / len(epoch_losses)
    print(f'Training. Epoch {e}, MSE loss: {avg_loss}, Worker: {hvd.rank()}')
model.eval()
with torch.no_grad():
    losses = []
    for x, y in test_dl:
        x, y = x.to(device), y.to(device)
        p = model(x)
        loss = loss_fn(p, y)
        losses.append(loss.item())
    avg_loss = sum(losses) / len(losses)
    # Average the per-worker test loss across all workers.
    avg_loss = hvd.allreduce(torch.tensor(avg_loss)).item()
    print(f'Testing. MSE loss: {avg_loss}, Worker: {hvd.rank()}')
# Only rank 0 uploads the trained model and the serving config, so the
# workers do not race to write the same objects.
if hvd.rank() == 0:
    s3_save_model(s3_client, model, 'songwei', 'simple_ml/model_save/1/model.pt')
    s3_save_file(s3_client, 'config.pbtxt', 'songwei', 'simple_ml/model_save/1/config.pbtxt')
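# The bucket layout written above (model_save/<version>/model.pt next to a
# config.pbtxt) matches the model-repository format of NVIDIA Triton Inference
# Server. Assuming that is the serving target, a config.pbtxt for this
# 2-feature-in / 1-value-out model might look like the sketch below; the
# INPUT__0/OUTPUT__0 names follow Triton's TorchScript naming convention, and
# that backend expects model.pt to be a traced or scripted module:
#
#     platform: "pytorch_libtorch"
#     max_batch_size: 0
#     input [ { name: "INPUT__0", data_type: TYPE_FP32, dims: [ -1, 2 ] } ]
#     output [ { name: "OUTPUT__0", data_type: TYPE_FP32, dims: [ -1, 1 ] } ]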
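# --- s3_utils.py (hypothetical sketch) ---------------------------------------
# The helpers imported at the top are not part of this listing. Assuming they
# are thin wrappers over boto3's get_object/put_object/upload_file calls, the
# module might look like this (an illustration, not the original implementation):

import io
import pickle

import torch


def s3_load_pickle(client, bucket, key):
    # Download an object and deserialize it with pickle.
    body = client.get_object(Bucket=bucket, Key=key)['Body'].read()
    return pickle.loads(body)


def s3_save_model(client, model, bucket, key):
    # Serialize the model with torch.save and upload the raw bytes.
    buf = io.BytesIO()
    torch.save(model, buf)
    buf.seek(0)
    client.put_object(Bucket=bucket, Key=key, Body=buf.getvalue())


def s3_save_file(client, local_path, bucket, key):
    # Upload a local file as-is.
    client.upload_file(local_path, bucket, key)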