import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import transforms
from transformers import PreTrainedModel

from ModelConfiguration import VesselSegmentConfig


######################################################################
# IMAGE DOWN SAMPLING
######################################################################
class ImageDownSampling(nn.Module):
    """Resize a batch of images to a fixed, scaled-down spatial size.

    The target size is fixed at construction time as
    ``(height // scale, width // scale)``; it does not depend on the size
    of the tensor passed to :meth:`forward`.
    """

    def __init__(self, height, width, scale):
        """Build the resize transform.

        Args:
            height: Reference height the scale factor is applied to.
            width: Reference width the scale factor is applied to.
            scale: Integer divisor applied to ``height`` and ``width``.
        """
        super().__init__()
        self.resize = transforms.Resize(size=(height // scale, width // scale))

    def forward(self, x):
        """Return ``x`` resized to the size chosen at construction."""
        return self.resize(x)


######################################################################
# IMAGE SHARPENING
######################################################################
class ImageSharp(nn.Module):
    """Sharpen each channel with a fixed 3x3 kernel and clamp to [0, 1]."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Apply the sharpening kernel independently to every channel.

        Args:
            x: Tensor unpacked as ``(B, C, H, W)``.
                NOTE(review): presumably a floating-point image scaled to
                [0, 1], given the clamp below -- confirm against caller.

        Returns:
            Tensor with the same spatial size as ``x`` (``padding=1`` with
            a 3x3 kernel), with values clamped to ``[0, 1]``.
        """
        _, channels, _, _ = x.shape

        # Basic 3x3 sharpening kernel, shape (1, 1, 3, 3). It is created
        # with the input's dtype and device so that conv2d does not fail
        # with a dtype mismatch on half / bfloat16 / float64 inputs.
        kernel = torch.tensor(
            [[[[0, -1, 0],
               [-1, 5, -1],
               [0, -1, 0]]]],
            dtype=x.dtype,
            device=x.device,
        )

        # One copy of the kernel per channel -> (C, 1, 3, 3); together with
        # groups=C each channel is filtered on its own.
        kernel = kernel.repeat(channels, 1, 1, 1)

        sharpened = F.conv2d(x, kernel, padding=1, groups=channels)

        # Keep the result inside the [0, 1] range.
        return torch.clamp(sharpened, 0, 1)


######################################################################
# IMAGE PATCHING
######################################################################
class ImagePatching(nn.Module):
    """Sharpen an image batch and cut it into non-overlapping patches."""

    def __init__(self, patch_size: int):
        """Build the unfold and sharpening submodules.

        Args:
            patch_size: Side length of the square patches; also used as the
                stride, so patches do not overlap.
        """
        super().__init__()
        self.patch_size = patch_size
        self.image_patch = nn.Unfold(kernel_size=patch_size, stride=patch_size)
        self.image_sharp = ImageSharp()

    def forward(self, x):
        """Return the patches of ``x`` stacked along the batch dimension.

        Args:
            x: Tensor unpacked as ``(B, C, H, W)``.

        Returns:
            Tensor of shape ``(B * L, C, patch_size, patch_size)`` where
            ``L`` is the number of patches ``nn.Unfold`` extracts per image.
        """
        _, channels, _, _ = x.shape

        x = self.image_sharp(x)
        patches = self.image_patch(x)                  # (B, C*p*p, L)
        patches = patches.transpose(1, 2).contiguous()  # (B, L, C*p*p)

        # Each row holds one patch in channel-major order, so it can be
        # reshaped directly to (C, p, p); no intermediate grid view needed.
        return patches.view(-1, channels, self.patch_size, self.patch_size)


######################################################################
# DOUBLE CONVOLUTION LAYER
######################################################################
class DoubleConvLayer(nn.Module):
    """Two stacked 3x3 conv -> InstanceNorm -> LeakyReLU stages.

    Both convolutions use ``padding=1``, so the spatial size is preserved.
    """

    def __init__(self, in_feature: int, out_feature: int):
        """Build the two convolution stages.

        Args:
            in_feature: Number of input channels of the first convolution.
            out_feature: Number of output channels of both convolutions.
        """
        super().__init__()
        self.double_conv_layer = nn.Sequential(
            nn.Conv2d(in_channels=in_feature, out_channels=out_feature,
                      kernel_size=3, padding=1),
            nn.InstanceNorm2d(num_features=out_feature),
            nn.LeakyReLU(inplace=True),
            nn.Conv2d(in_channels=out_feature, out_channels=out_feature,
                      kernel_size=3, padding=1),
            nn.InstanceNorm2d(num_features=out_feature),
            nn.LeakyReLU(inplace=True),
        )

    def forward(self, x):
        """Run ``x`` through both convolution stages."""
        return self.double_conv_layer(x)


######################################################################
# FEATURE EXTRACTION FROM ENCODER PART
######################################################################
class EncoderFetureExtraction(nn.Module):
    """Scale encoder features by a learned single-channel gate.

    A 1x1 convolution reduces the input to one channel, which is passed
    through InstanceNorm, LeakyReLU and a sigmoid; the input is then
    multiplied by that one-channel map (broadcast over channels).
    """

    def __init__(self, feature: int):
        """Build the gating branch.

        Args:
            feature: Number of channels of the tensor passed to ``forward``.
        """
        super().__init__()
        self.feature_extraction = nn.Sequential(
            nn.Conv2d(in_channels=feature, out_channels=1,
                      kernel_size=1, stride=1),
            nn.InstanceNorm2d(num_features=1),
            nn.LeakyReLU(inplace=True),
            nn.Sigmoid(),
        )
        # Not used in forward(); kept so the module's attributes stay the
        # same for any external code that references it.
        self.relu = nn.LeakyReLU()

    def forward(self, x):
        """Return ``x`` multiplied by its single-channel gate."""
        gate = self.feature_extraction(x)
        return x * gate


######################################################################
# BOTTLENECK LAYER OF THE MODEL
######################################################################
class BottleNeck(nn.Module):
    """Single 3x3 conv -> InstanceNorm -> LeakyReLU stage."""

    def __init__(self, in_ch, out_ch):
        """Build the bottleneck stage.

        Args:
            in_ch: Number of input channels.
            out_ch: Number of output channels.
        """
        super().__init__()
        self.bottleneck = nn.Sequential(
            nn.Conv2d(in_channels=in_ch, out_channels=out_ch,
                      kernel_size=3, padding=1),
            nn.InstanceNorm2d(num_features=out_ch),
            nn.LeakyReLU(inplace=True),
        )

    def forward(self, x):
        """Run ``x`` through the bottleneck stage."""
        return self.bottleneck(x)
######################################################################
# SOFT-ATTENTION IN DECODER LAYER
######################################################################
class AttentionGate(nn.Module):
    """Soft attention gate that re-weights encoder features.

    The decoder tensor is upsampled by a stride-2 transposed convolution,
    both tensors are projected to ``dim_l`` channels with 1x1 convolutions,
    summed, passed through ReLU and reduced to a single-channel sigmoid
    map that multiplies the encoder tensor.
    """

    def __init__(self, dim_g, dim_x, dim_l):
        """Build the projection, attention and upsampling layers.

        Args:
            dim_g: Channel count of the decoder (gating) tensor.
            dim_x: Channel count of the encoder tensor.
            dim_l: Channel count of the intermediate projections.
        """
        super().__init__()
        self.Wg = nn.Sequential(
            nn.Conv2d(in_channels=dim_g, out_channels=dim_l,
                      kernel_size=1, stride=1),
            nn.BatchNorm2d(num_features=dim_l),
        )
        self.Wx = nn.Sequential(
            nn.Conv2d(in_channels=dim_x, out_channels=dim_l,
                      kernel_size=1, stride=1),
            nn.BatchNorm2d(num_features=dim_l),
        )
        self.alpha_conv = nn.Sequential(
            nn.Conv2d(in_channels=dim_l, out_channels=1,
                      kernel_size=1, stride=1),
            nn.BatchNorm2d(num_features=1),
            nn.Sigmoid(),
        )
        self.up_conv = nn.ConvTranspose2d(in_channels=dim_g,
                                          out_channels=dim_g,
                                          kernel_size=2, stride=2)
        self.relu = nn.ReLU()

    def forward(self, encoder_tensor, decoder_tensor):
        """Return ``encoder_tensor`` scaled by the attention map.

        Args:
            encoder_tensor: Skip-connection tensor with ``dim_x`` channels.
            decoder_tensor: Decoder tensor with ``dim_g`` channels.
                NOTE(review): it must have half the spatial size of
                ``encoder_tensor`` for the sum below to be valid.

        Returns:
            Tensor with the same shape as ``encoder_tensor``.
        """
        # g is the decoder signal, x is the encoder signal.
        g = self.up_conv(decoder_tensor)   # upsampled to the encoder's size
        w_x = self.Wx(encoder_tensor)
        w_g = self.Wg(g)
        alpha = self.alpha_conv(self.relu(w_x + w_g))
        return encoder_tensor * alpha


######################################################################
# IMAGE RECONSTRUCTION FROM PATCH
######################################################################
class ImageFolding(nn.Module):
    """Reassemble non-overlapping square patches into full images."""

    def __init__(self, image_size: int, patch_size: int, batch_size: int):
        """Build the fold operation.

        Args:
            image_size: Side length of the (square) reconstructed image.
            patch_size: Side length of the square patches.
            batch_size: Nominal batch size. Stored for reference only; the
                actual batch size is derived from the input in ``forward``.
        """
        super().__init__()
        self.num_patches = image_size // patch_size
        self.batch_size = batch_size
        self.folding = nn.Fold(output_size=(image_size, image_size),
                               kernel_size=(patch_size, patch_size),
                               stride=(patch_size, patch_size))

    def forward(self, x):
        """Fold a stack of patches back into images.

        Args:
            x: Tensor whose first dimension enumerates patches, with
                ``num_patches ** 2`` consecutive patches per image.

        Returns:
            The output of ``nn.Fold`` over the regrouped patches.

        Raises:
            ValueError: If the number of patches is not a multiple of
                ``num_patches ** 2``.
        """
        patches_per_image = self.num_patches * self.num_patches
        total_patches = x.shape[0]
        if total_patches % patches_per_image != 0:
            raise ValueError(
                f"Expected a multiple of {patches_per_image} patches, "
                f"got {total_patches}."
            )

        # Derive the batch size from the data rather than the configured
        # value, so partial batches and inference-time batch sizes fold
        # correctly instead of crashing or mixing images together.
        batch = total_patches // patches_per_image

        x1 = x.reshape(batch, patches_per_image, -1)
        x1 = x1.transpose(1, 2).contiguous()
        return self.folding(x1)


######################################################################
# ENCODER LAYERS
######################################################################
class Encoder(nn.Module):
    """Encoder stage: double conv, optional concat, gated skip, max-pool."""

    def __init__(self, in_channel, out_channel, enc_fet_ch, max_pool_size,
                 is_concate=False):
        """Build the encoder stage.

        Args:
            in_channel: Input channels of the double convolution.
            out_channel: Output channels of the double convolution.
            enc_fet_ch: Channel count seen by the skip-connection gate,
                i.e. after the optional concatenation.
            max_pool_size: Kernel size and stride of the max-pool.
            is_concate: If true, ``forward`` concatenates ``concat_tensor``
                in front of the convolved features along the channel axis.
        """
        super().__init__()
        self.double_conv = DoubleConvLayer(in_feature=in_channel,
                                           out_feature=out_channel)
        self.enc_feature_extraction = EncoderFetureExtraction(
            feature=enc_fet_ch)
        self.pooling_layer = nn.MaxPool2d(kernel_size=max_pool_size,
                                          stride=max_pool_size)
        self.concat = is_concate

    def forward(self, x, concat_tensor=None):
        """Run one encoder stage.

        Args:
            x: Input tensor for the double convolution.
            concat_tensor: Tensor concatenated with the convolved features
                when the stage was built with ``is_concate=True``; ignored
                otherwise.

        Returns:
            Tuple ``(pooled, skip_connection)``: the max-pooled features
            and the gated, un-pooled features.
        """
        x = self.double_conv(x)
        if self.concat:
            x = torch.cat([concat_tensor, x], dim=1)
        skip_connection = self.enc_feature_extraction(x)
        x = self.pooling_layer(x)
        return x, skip_connection


######################################################################
# Decoder LAYERS
######################################################################
class Decoder(nn.Module):
    """Decoder stage: attention-gated skip, upsampling, double conv."""

    def __init__(self, tensor_dim_encoder, tensor_dim_decoder, tensor_dim_mid,
                 up_conv_in_ch, up_conv_out_ch, up_conv_scale,
                 dconv_in_feature, dconv_out_feature, is_concat=False):
        """Build the decoder stage.

        Args:
            tensor_dim_encoder: Channels of the encoder skip tensor.
            tensor_dim_decoder: Channels of the incoming decoder tensor.
            tensor_dim_mid: Intermediate channels of the attention gate.
            up_conv_in_ch: Input channels of the upsampling convolution.
            up_conv_out_ch: Output channels of the upsampling convolution.
            up_conv_scale: Kernel size and stride of the upsampling
                transposed convolution.
            dconv_in_feature: Input channels of the double convolution.
            dconv_out_feature: Output channels of the double convolution.
            is_concat: If true, the gated skip tensor and the upsampled
                decoder tensor are concatenated before the double conv.
        """
        super().__init__()
        self.soft_attention = AttentionGate(dim_g=tensor_dim_decoder,
                                            dim_x=tensor_dim_encoder,
                                            dim_l=tensor_dim_mid)
        self.up_conv = nn.ConvTranspose2d(in_channels=up_conv_in_ch,
                                          out_channels=up_conv_out_ch,
                                          kernel_size=up_conv_scale,
                                          stride=up_conv_scale)
        self.double_conv = DoubleConvLayer(in_feature=dconv_in_feature,
                                           out_feature=dconv_out_feature)
        self.concat = is_concat

    def forward(self, encoder_tensor, decoder_tensor):
        """Run one decoder stage.

        Args:
            encoder_tensor: Skip-connection tensor from the encoder.
            decoder_tensor: Tensor from the previous (deeper) stage.

        Returns:
            Output of the double convolution.
        """
        x = self.soft_attention(encoder_tensor, decoder_tensor)
        # The upsampled tensor is only consumed when concatenation is on.
        y = self.up_conv(decoder_tensor)
        if self.concat:
            x = torch.cat([x, y], dim=1)
        x = self.double_conv(x)
        return x


class VesselSegmentModel(PreTrainedModel):
    """Patch-based encoder/decoder segmentation model.

    The input is sharpened and cut into patches; three encoder stages
    consume the patches at full, 1/2 and 1/4 resolution, followed by a
    bottleneck and three attention-gated decoder stages. A 1x1 convolution
    maps to ``config.num_classes`` channels and the patches are folded
    back into full images.

    NOTE(review): the channel arithmetic below only lines up when
    ``config.features`` has the form ``[f, f, 2f]`` -- confirm against
    ``VesselSegmentConfig``.
    """

    config_class = VesselSegmentConfig

    def __init__(self, config: "VesselSegmentConfig | None" = None):
        """Build the model from ``config``.

        Args:
            config: Model configuration. When omitted, a fresh
                ``VesselSegmentConfig()`` is created for this instance, so
                default-constructed models do not share one config object.
        """
        if config is None:
            config = VesselSegmentConfig()
        super().__init__(config)

        features = config.features
        patch = config.patch_size

        # image patch
        self.img_patch = ImagePatching(patch_size=patch)

        # image downsampling (fixed target sizes: patch // 2 and patch // 4)
        self.img_down_sampling_1 = ImageDownSampling(height=patch,
                                                     width=patch, scale=2)
        self.img_down_sampling_2 = ImageDownSampling(height=patch,
                                                     width=patch, scale=4)

        # encoder layers
        self.encoder_layer_1 = Encoder(config.input_channels, features[0],
                                       enc_fet_ch=features[0],
                                       max_pool_size=2, is_concate=False)
        self.encoder_layer_2 = Encoder(config.input_channels, features[1],
                                       enc_fet_ch=features[0] * 2,
                                       max_pool_size=2, is_concate=True)
        self.encoder_layer_3 = Encoder(config.input_channels, features[2],
                                       enc_fet_ch=features[0] * 4,
                                       max_pool_size=2, is_concate=True)

        # bottle-neck layer
        self.bottleneck = BottleNeck(in_ch=features[2] * 2,
                                     out_ch=features[2] * 4)

        # decoder layers
        self.decoder_layer_1 = Decoder(
            tensor_dim_decoder=features[-1] * 4,
            tensor_dim_encoder=features[-1] * 2,
            tensor_dim_mid=features[0],
            up_conv_in_ch=features[-1] * 4,
            up_conv_out_ch=features[-1] * 2,
            up_conv_scale=2,
            dconv_in_feature=features[-1] * 4,
            dconv_out_feature=features[-1] * 2,
            is_concat=True,
        )
        self.decoder_layer_2 = Decoder(
            tensor_dim_decoder=features[-1] * 2,
            tensor_dim_encoder=features[-1],
            tensor_dim_mid=features[1],
            up_conv_in_ch=features[-1] * 2,
            up_conv_out_ch=features[-1],
            up_conv_scale=2,
            dconv_in_feature=features[-1] * 2,
            dconv_out_feature=features[-1],
            is_concat=True,
        )
        self.decoder_layer_3 = Decoder(
            tensor_dim_decoder=features[-1],
            tensor_dim_encoder=features[-2],
            tensor_dim_mid=features[2],
            up_conv_in_ch=features[-1],
            up_conv_out_ch=features[-2],
            up_conv_scale=2,
            dconv_in_feature=features[-1],
            dconv_out_feature=features[-2],
            is_concat=True,
        )

        # Segmentation Head
        # The attribute name keeps its original spelling on purpose:
        # renaming it would change the parameter keys of saved checkpoints.
        self.segmenation_head = nn.Sequential(
            nn.Conv2d(in_channels=features[-3],
                      out_channels=config.num_classes,
                      kernel_size=1, padding=0, stride=1),
            ImageFolding(image_size=config.image_size[0],
                         patch_size=patch,
                         batch_size=config.batch_size),
        )

    def forward(self, x):
        """Segment a batch of images.

        Args:
            x: Image batch passed to ``ImagePatching`` (a 4-D tensor).
                NOTE(review): presumably square images of side
                ``config.image_size[0]`` -- confirm against the data loader.

        Returns:
            Output of the segmentation head: per-patch class maps folded
            back into full-size images.
        """
        # Patches at full, 1/2 and 1/4 of the patch resolution.
        img_1 = self.img_patch(x)
        img_2 = self.img_down_sampling_1(img_1)
        img_3 = self.img_down_sampling_2(img_2)

        # encoder
        e1, sk1 = self.encoder_layer_1(img_1, None)
        e2, sk2 = self.encoder_layer_2(img_2, e1)
        e3, sk3 = self.encoder_layer_3(img_3, e2)

        # bottleneck
        b = self.bottleneck(e3)

        # decoder
        d1 = self.decoder_layer_1(sk3, b)
        d2 = self.decoder_layer_2(sk2, d1)
        d3 = self.decoder_layer_3(sk1, d2)

        # head
        return self.segmenation_head(d3)