| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | from __future__ import print_function |
| | import torch |
| | import torch.nn as nn |
| | import torch.nn.functional as F |
| | import numpy as np |
| | from collections import OrderedDict |
| |
|
| | |
| | |
| | import os |
| | import random |
| |
|
| | |
| | |
SEED1 = 1337  # RNG seed constant (intended for use with set_seed below)
NEW_LINE = "\n"  # separator used when splitting the dataset index files


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # prefer GPU when available
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
def set_seed(seed):
    """Seed every RNG in use (Python, NumPy, torch CPU/CUDA) for reproducibility.

    Bug fix: the original body only toggled the cudnn flags and never actually
    seeded any generator, so runs were not reproducible despite the name.

    Args:
        seed: integer seed applied to all random number generators.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # no-op on CPU-only machines
    os.environ["PYTHONHASHSEED"] = str(seed)
    # Trade cudnn autotuning speed for deterministic convolution algorithms.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
| | |
| | |
| | |
| | |
| |
|
| | |
def angle_incidence_calculation(b, c, alpha, last_ray=False):
    """Incidence angle of a LiDAR ray from two adjacent range measurements.

    Two consecutive rays are treated as sides ``b`` and ``c`` of a triangle
    whose included angle is the beam separation ``alpha``.  The opposite side
    ``a`` follows from the law of cosines; a second application recovers the
    triangle angle at the hit surface, and the result is its deviation from a
    perpendicular hit (pi/2).

    Args:
        b: range(s) of the first ray (scalar when ``last_ray`` is True,
           otherwise an array).
        c: range(s) of the second ray.
        alpha: included angle(s) between the rays, in radians.
        last_ray: when True, use the angle adjacent to side ``c`` (beta)
            instead of the one adjacent to side ``b`` (gamma).

    Returns:
        ``|pi/2 - angle|``.  NOTE: the list wrapper in ``np.arccos([...])``
        adds a leading axis of size 1, which callers index away (``theta[0]``);
        it is kept deliberately to preserve the output shape.
    """
    # Law of cosines: a^2 = b^2 + c^2 - 2bc*cos(alpha).
    a = np.sqrt(b * b + c * c - 2 * b * c * np.cos(alpha))
    if last_ray:
        # Angle opposite side b (adjacent to c).
        inner = np.arccos([(a * a + c * c - b * b) / (2 * a * c)])
    else:
        # Angle opposite side c (adjacent to b).
        inner = np.arccos([(a * a + b * b - c * c) / (2 * a * b)])
    return np.abs(np.pi / 2 - inner)
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
POINTS = 1081  # samples per LiDAR scan; alpha below spreads a 270-degree FOV over POINTS-1 gaps
class VaeTestDataset(torch.utils.data.Dataset):
    """1-D LiDAR dataset: scan ranges, intensities, incidence angles, labels.

    ``img_path + 'dataset.txt'`` lists data folders (lines containing ``'-'``
    are treated as folder names).  Each folder holds a ``<file_name>.txt``
    index whose ``'.npy'`` lines name samples under ``scans_lidar/``,
    ``intensities_lidar/`` and ``semantic_label/``.

    Fixes over the original: index files are opened with ``with`` so handles
    are closed even on exceptions, and the dead ``np.zeros`` placeholders in
    ``__getitem__`` (immediately overwritten by ``np.load``) were removed.
    """

    def __init__(self, img_path, file_name):
        self.scan_file_names = []
        self.intensity_file_names = []
        self.label_file_names = []

        # Precomputed dataset-wide normalization statistics (mean / std)
        # for scans (s), intensities (i) and incidence angles (a).
        self.s_mu = 4.518406
        self.s_std = 8.2914915
        self.i_mu = 3081.8167
        self.i_std = 1529.4413
        self.a_mu = 0.5959513
        self.a_std = 0.4783924

        # Context managers guarantee the handles are closed on any exit path.
        with open(img_path + 'dataset.txt', 'r') as fp_folder:
            for folder_line in fp_folder.read().split(NEW_LINE):
                if '-' not in folder_line:
                    continue
                folder_path = folder_line
                with open(img_path + folder_path + '/' + file_name + '.txt', 'r') as fp_file:
                    for line in fp_file.read().split(NEW_LINE):
                        if '.npy' in line:
                            self.scan_file_names.append(img_path + folder_path + '/scans_lidar/' + line)
                            self.intensity_file_names.append(img_path + folder_path + '/intensities_lidar/' + line)
                            self.label_file_names.append(img_path + folder_path + '/semantic_label/' + line)

        self.length = len(self.scan_file_names)
        print("dataset length: ", self.length)

    def __len__(self):
        return self.length

    def __getitem__(self, idx):
        """Load one sample and return normalized FloatTensors.

        Returns:
            dict with keys 'scan', 'intensity', 'angle_incidence' (all
            standardized with the stored mean/std) and 'label' (raw values,
            only NaN/inf-cleaned).
        """
        intensity = np.load(self.intensity_file_names[idx])
        scan = np.load(self.scan_file_names[idx])
        label = np.load(self.label_file_names[idx])

        # Incidence angles from consecutive ranges; beams are evenly spaced
        # over a 270-degree field of view.
        b = scan[:-1]
        c = scan[1:]
        alpha = np.ones(POINTS - 1) * ((270 * np.pi / 180) / (POINTS - 1))
        theta = angle_incidence_calculation(b, c, alpha)

        # The last beam has no successor, so it is handled with scalar inputs.
        b_last = scan[-2]
        c_last = scan[-1]
        alpha_last = (270 * np.pi / 180) / (POINTS - 1)
        theta_last = angle_incidence_calculation(b_last, c_last, alpha_last, last_ray=True)
        # theta has a leading size-1 axis from arccos([...]); theta[0] drops it.
        angle_incidence = np.concatenate((theta[0], theta_last), axis=0)

        # Replace NaN/inf (e.g. from invalid returns) with 0 before normalizing.
        scan[np.isnan(scan)] = 0.
        scan[np.isinf(scan)] = 0.
        intensity[np.isnan(intensity)] = 0.
        intensity[np.isinf(intensity)] = 0.
        angle_incidence[np.isnan(angle_incidence)] = 0.
        angle_incidence[np.isinf(angle_incidence)] = 0.
        label[np.isnan(label)] = 0.
        label[np.isinf(label)] = 0.

        # Standardize the three input channels; labels stay unnormalized.
        scan = (scan - self.s_mu) / self.s_std
        intensity = (intensity - self.i_mu) / self.i_std
        angle_incidence = (angle_incidence - self.a_mu) / self.a_std

        return {
            'scan': torch.FloatTensor(scan),
            'intensity': torch.FloatTensor(intensity),
            'angle_incidence': torch.FloatTensor(angle_incidence),
            'label': torch.FloatTensor(label),
        }
| |
|
| | |
| | |
| |
|
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
class Residual(nn.Module):
    """Pre-activation residual unit for 1-D feature maps.

    The skip path is the identity; the transform path is
    ReLU -> Conv1d(k=3) -> BN -> ReLU -> Conv1d(k=1) -> BN, mapping
    ``in_channels`` through ``num_residual_hiddens`` back to ``num_hiddens``.
    """

    def __init__(self, in_channels, num_hiddens, num_residual_hiddens):
        super(Residual, self).__init__()
        # Built as a flat layer list first for readability; the attribute name
        # _block is kept so state_dict keys stay compatible.
        layers = [
            nn.ReLU(True),
            nn.Conv1d(in_channels=in_channels,
                      out_channels=num_residual_hiddens,
                      kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm1d(num_residual_hiddens),
            nn.ReLU(True),
            nn.Conv1d(in_channels=num_residual_hiddens,
                      out_channels=num_hiddens,
                      kernel_size=1, stride=1, bias=False),
            nn.BatchNorm1d(num_hiddens),
        ]
        self._block = nn.Sequential(*layers)

    def forward(self, x):
        transformed = self._block(x)
        return x + transformed
| |
|
class ResidualStack(nn.Module):
    """A chain of ``num_residual_layers`` Residual units ending in a ReLU."""

    def __init__(self, in_channels, num_hiddens, num_residual_layers, num_residual_hiddens):
        super(ResidualStack, self).__init__()
        # _num_residual_layers and _layers keep their original names so the
        # state_dict layout is unchanged.
        self._num_residual_layers = num_residual_layers
        blocks = [Residual(in_channels, num_hiddens, num_residual_hiddens)
                  for _ in range(num_residual_layers)]
        self._layers = nn.ModuleList(blocks)

    def forward(self, x):
        # Apply each residual unit in order, then a final activation.
        for layer in self._layers:
            x = layer(x)
        return F.relu(x)
| |
|
| | |
| | |
class Encoder(nn.Module):
    """Two stride-2 conv stages (4x temporal downsampling) plus a residual stack."""

    def __init__(self, in_channels, num_hiddens, num_residual_layers, num_residual_hiddens):
        super(Encoder, self).__init__()
        half_hiddens = num_hiddens // 2

        # Stage 1: halve the length, expand to num_hiddens // 2 channels.
        self._conv_1 = nn.Sequential(
            nn.Conv1d(in_channels=in_channels,
                      out_channels=half_hiddens,
                      kernel_size=4,
                      stride=2,
                      padding=1),
            nn.BatchNorm1d(half_hiddens),
            nn.ReLU(True),
        )

        # Stage 2: halve again, expand to num_hiddens channels.  No ReLU here:
        # the residual stack's first layer supplies the activation.
        self._conv_2 = nn.Sequential(
            nn.Conv1d(in_channels=half_hiddens,
                      out_channels=num_hiddens,
                      kernel_size=4,
                      stride=2,
                      padding=1),
            nn.BatchNorm1d(num_hiddens),
        )

        self._residual_stack = ResidualStack(in_channels=num_hiddens,
                                             num_hiddens=num_hiddens,
                                             num_residual_layers=num_residual_layers,
                                             num_residual_hiddens=num_residual_hiddens)

    def forward(self, inputs):
        hidden = self._conv_2(self._conv_1(inputs))
        return self._residual_stack(hidden)
| |
|
| | |
class Decoder(nn.Module):
    """Mirror of Encoder: residual stack, then two transposed-conv upsampling
    stages.  The final stage uses output_padding=1, which restores an odd
    sequence length after two stride-2 downsamplings, and ends with a 3-tap
    conv projecting to ``out_channels``.
    """

    def __init__(self, out_channels, num_hiddens, num_residual_layers, num_residual_hiddens):
        super(Decoder, self).__init__()
        half_hiddens = num_hiddens // 2

        self._residual_stack = ResidualStack(in_channels=num_hiddens,
                                             num_hiddens=num_hiddens,
                                             num_residual_layers=num_residual_layers,
                                             num_residual_hiddens=num_residual_hiddens)

        # First upsampling stage: num_hiddens -> num_hiddens // 2 channels.
        self._conv_trans_2 = nn.Sequential(
            nn.ReLU(True),
            nn.ConvTranspose1d(in_channels=num_hiddens,
                               out_channels=half_hiddens,
                               kernel_size=4,
                               stride=2,
                               padding=1),
            nn.BatchNorm1d(half_hiddens),
            nn.ReLU(True),
        )

        # Second upsampling stage plus the output projection.
        self._conv_trans_1 = nn.Sequential(
            nn.ConvTranspose1d(in_channels=half_hiddens,
                               out_channels=half_hiddens,
                               kernel_size=4,
                               stride=2,
                               padding=1,
                               output_padding=1),
            nn.BatchNorm1d(half_hiddens),
            nn.ReLU(True),
            nn.Conv1d(in_channels=half_hiddens,
                      out_channels=out_channels,
                      kernel_size=3,
                      stride=1,
                      padding=1),
        )

    def forward(self, inputs):
        hidden = self._residual_stack(inputs)
        hidden = self._conv_trans_2(hidden)
        return self._conv_trans_1(hidden)
| |
|
class VAE_Encoder(nn.Module):
    """Convolutional VAE encoder emitting per-position Gaussian parameters.

    Runs the shared Encoder backbone, then two parallel 1x1 convolutions
    project its features to ``embedding_dim`` channels: one head for the
    latent mean, one for the latent log standard deviation.
    """

    def __init__(self, input_channel, num_hiddens, num_residual_layers, num_residual_hiddens, embedding_dim):
        super(VAE_Encoder, self).__init__()
        self.input_channels = input_channel

        self._encoder = Encoder(input_channel,
                                num_hiddens,
                                num_residual_layers,
                                num_residual_hiddens)

        # 1x1 conv heads: feature channels -> embedding_dim.
        self._encoder_z_mu = nn.Conv1d(in_channels=num_hiddens,
                                       out_channels=embedding_dim,
                                       kernel_size=1,
                                       stride=1)
        self._encoder_z_log_sd = nn.Conv1d(in_channels=num_hiddens,
                                           out_channels=embedding_dim,
                                           kernel_size=1,
                                           stride=1)

    def forward(self, x):
        # Force the expected (batch, channels, POINTS) layout before encoding.
        features = self._encoder(x.reshape(-1, self.input_channels, POINTS))
        z_mu = self._encoder_z_mu(features)
        z_log_sd = self._encoder_z_log_sd(features)
        return z_mu, z_log_sd
| |
|
| | |
class S3Net(nn.Module):
    """VAE-style semantic segmentation network for 1-D LiDAR scans.

    Three per-point channels (scan range, intensity, incidence angle) are
    encoded to a 270-position latent (POINTS=1081 downsampled 4x by the
    encoder), reparameterized as a diagonal Gaussian, and decoded back to
    per-point class channels.

    Fix over the original: ``forward`` hard-coded ``270`` in the latent
    reshape, duplicating ``self.latent_dim``; it now uses the attribute so
    the two cannot drift apart.
    """

    def __init__(self, input_channels, output_channels):
        super(S3Net, self).__init__()

        self.input_channels = input_channels
        self.latent_dim = 270  # encoder output length: 1081 -> 540 -> 270
        self.output_channels = output_channels

        # Architecture hyperparameters.
        num_hiddens = 64
        num_residual_hiddens = 32
        num_residual_layers = 2
        embedding_dim = 1

        self._encoder = VAE_Encoder(self.input_channels,
                                    num_hiddens,
                                    num_residual_layers,
                                    num_residual_hiddens,
                                    embedding_dim)

        # 1x1 transposed conv lifting the latent back to num_hiddens channels.
        self._decoder_z_mu = nn.ConvTranspose1d(in_channels=embedding_dim,
                                                out_channels=num_hiddens,
                                                kernel_size=1,
                                                stride=1)
        self._decoder = Decoder(self.output_channels,
                                num_hiddens,
                                num_residual_layers,
                                num_residual_hiddens)

        # Softmax over the class-channel dimension.
        self.softmax = nn.Softmax(dim=1)

    def vae_reparameterize(self, z_mu, z_log_sd):
        """Sample a latent with the reparameterization trick.

        Args:
            z_mu: mean from the encoder's latent space.
            z_log_sd: log standard deviation from the encoder's latent space.

        Returns:
            (z, kl_loss): the sampled latent and a Monte Carlo estimate of
            KL(q(z|x) || p(z)) with a standard-normal prior.
        """
        z_mu = z_mu.reshape(-1, self.latent_dim, 1)
        z_log_sd = z_log_sd.reshape(-1, self.latent_dim, 1)

        # Prior p(z) = N(0, I); approximate posterior q(z|x) = N(mu, sd^2).
        pz = torch.distributions.Normal(loc=torch.zeros_like(z_mu), scale=torch.ones_like(z_log_sd))
        qz_x = torch.distributions.Normal(loc=z_mu, scale=torch.exp(z_log_sd))

        # rsample() keeps the sample differentiable w.r.t. mu and log_sd.
        z = qz_x.rsample()

        # Single-sample Monte Carlo KL: E_q[log q(z|x) - log p(z)].
        kl_divergence = (pz.log_prob(z) - qz_x.log_prob(z)).sum(dim=1)
        kl_loss = -kl_divergence.mean()

        return z, kl_loss

    def forward(self, x_s, x_i, x_a):
        """Forward pass over scan (x_s), intensity (x_i), angle (x_a) inputs.

        Returns:
            (semantic_scan, semantic_channels, kl_loss): softmaxed class
            probabilities, raw logits, and the KL regularization term.
        """
        # Stack the three modalities as channels: (batch, 3, POINTS).
        x_s = x_s.reshape(-1, 1, POINTS)
        x_i = x_i.reshape(-1, 1, POINTS)
        x_a = x_a.reshape(-1, 1, POINTS)
        x = torch.cat([x_s, x_i, x_a], dim=1)

        z_mu, z_log_sd = self._encoder(x)

        z, kl_loss = self.vae_reparameterize(z_mu, z_log_sd)

        # Back to (batch, embedding_dim=1, latent_dim) for the 1x1 lift.
        z = z.reshape(-1, 1, self.latent_dim)
        x_d = self._decoder_z_mu(z)
        semantic_channels = self._decoder(x_d)

        semantic_scan = self.softmax(semantic_channels)

        return semantic_scan, semantic_channels, kl_loss
| |
|
| | |
| | |
| |
|
| | |
| | |
| |
|