import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from transformers import PreTrainedModel

from .ModelConfiguration import VesselSegmentConfig


class ImageDownSampling(nn.Module):
    """Downsamples an image tensor by an integer scale factor."""

    def __init__(self, height, width, scale):
        super().__init__()
        self.resize = transforms.Resize(size=(height // scale, width // scale))

    def forward(self, x):
        return self.resize(x)


class ImageSharp(nn.Module):
    """Applies a fixed 3x3 sharpening kernel to each channel independently."""

    def forward(self, x):
        C = x.shape[1]

        # Classic 3x3 sharpening kernel, replicated per channel and applied
        # as a depthwise convolution (groups=C).
        kernel = torch.tensor([[[[ 0, -1,  0],
                                 [-1,  5, -1],
                                 [ 0, -1,  0]]]], dtype=torch.float32, device=x.device)
        kernel = kernel.repeat(C, 1, 1, 1)

        sharpened = F.conv2d(x, kernel, padding=1, groups=C)

        # Clamp back to [0, 1]; this assumes inputs are normalized to that range.
        return torch.clamp(sharpened, 0, 1)


class ImagePatching(nn.Module):
    """Sharpens the input and splits it into non-overlapping square patches."""

    def __init__(self, patch_size: int):
        super().__init__()
        self.patch_size = patch_size
        self.image_patch = nn.Unfold(kernel_size=patch_size, stride=patch_size)
        self.image_sharp = ImageSharp()

    def forward(self, x):
        batch_size, channels, height, width = x.shape
        x = self.image_sharp(x)
        # (B, C, H, W) -> (B, C * p * p, L) with L = (H // p) * (W // p)
        x = self.image_patch(x)
        x = x.transpose(1, 2).contiguous()
        # Reshape so that every patch becomes its own batch element.
        x = x.view(-1, height // self.patch_size, width // self.patch_size,
                   channels, self.patch_size, self.patch_size)
        x = x.view(-1, channels, self.patch_size, self.patch_size)
        return x
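

# Shape sketch (illustrative, not part of the model): assuming H and W are
# divisible by patch_size p, ImagePatching maps (B, C, H, W) to
# (B * (H // p) * (W // p), C, p, p):
#
#   patches = ImagePatching(patch_size=16)(torch.rand(2, 3, 64, 64))
#   assert patches.shape == (2 * 4 * 4, 3, 16, 16)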


class DoubleConvLayer(nn.Module):
    """Two 3x3 convolutions, each followed by instance norm and LeakyReLU."""

    def __init__(self, in_feature: int, out_feature: int):
        super().__init__()
        self.double_conv_layer = nn.Sequential(
            nn.Conv2d(in_channels=in_feature, out_channels=out_feature, kernel_size=3, padding=1),
            nn.InstanceNorm2d(num_features=out_feature),
            nn.LeakyReLU(inplace=True),
            nn.Conv2d(in_channels=out_feature, out_channels=out_feature, kernel_size=3, padding=1),
            nn.InstanceNorm2d(num_features=out_feature),
            nn.LeakyReLU(inplace=True)
        )

    def forward(self, x):
        return self.double_conv_layer(x)


class EncoderFeatureExtraction(nn.Module):
    """Per-pixel attention over encoder features: a 1x1 convolution collapses
    the channels to a single sigmoid map that rescales the input."""

    def __init__(self, feature: int):
        super().__init__()
        self.feature_extraction = nn.Sequential(
            nn.Conv2d(in_channels=feature, out_channels=1, kernel_size=1, stride=1),
            nn.InstanceNorm2d(num_features=1),
            nn.LeakyReLU(inplace=True),
            nn.Sigmoid()
        )

    def forward(self, x):
        attention = self.feature_extraction(x)
        return x * attention


class BottleNeck(nn.Module):
    """Single conv + instance norm + LeakyReLU block at the bottom of the U."""

    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.bottleneck = nn.Sequential(
            nn.Conv2d(in_channels=in_ch, out_channels=out_ch, kernel_size=3, padding=1),
            nn.InstanceNorm2d(num_features=out_ch),
            nn.LeakyReLU(inplace=True)
        )

    def forward(self, x):
        return self.bottleneck(x)


class AttentionGate(nn.Module):
    """Additive attention gate in the style of Attention U-Net: the coarser
    decoder signal is upsampled to the encoder resolution, both are projected
    to a common dimension, and a sigmoid map gates the encoder features."""

    def __init__(self, dim_g, dim_x, dim_l):
        super().__init__()
        self.Wg = nn.Sequential(
            nn.Conv2d(in_channels=dim_g, out_channels=dim_l, kernel_size=1, stride=1),
            nn.BatchNorm2d(num_features=dim_l))

        self.Wx = nn.Sequential(
            nn.Conv2d(in_channels=dim_x, out_channels=dim_l, kernel_size=1, stride=1),
            nn.BatchNorm2d(num_features=dim_l))

        self.alpha_conv = nn.Sequential(
            nn.Conv2d(in_channels=dim_l, out_channels=1, kernel_size=1, stride=1),
            nn.BatchNorm2d(num_features=1),
            nn.Sigmoid())

        self.up_conv = nn.ConvTranspose2d(in_channels=dim_g, out_channels=dim_g,
                                          kernel_size=2, stride=2)

        self.relu = nn.ReLU()

    def forward(self, encoder_tensor, decoder_tensor):
        # Bring the coarser decoder tensor up to the encoder's spatial size.
        g = self.up_conv(decoder_tensor)
        w_x = self.Wx(encoder_tensor)
        w_g = self.Wg(g)

        # Per-pixel attention coefficients in (0, 1).
        alpha = self.alpha_conv(self.relu(w_x + w_g))

        return encoder_tensor * alpha
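

# Illustrative check (hypothetical sizes): with a 64-channel encoder map at
# 32x32 and a 128-channel decoder map at 16x16, the gate returns the encoder
# map rescaled in place:
#
#   gate = AttentionGate(dim_g=128, dim_x=64, dim_l=32)
#   out = gate(torch.rand(1, 64, 32, 32), torch.rand(1, 128, 16, 16))
#   assert out.shape == (1, 64, 32, 32)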


class ImageFolding(nn.Module):
    """Inverse of ImagePatching: reassembles a batch of per-patch tensors
    into full-size images with nn.Fold."""

    def __init__(self, image_size: int, patch_size: int, batch_size: int):
        super().__init__()
        self.num_patches = image_size // patch_size
        self.batch_size = batch_size
        self.folding = nn.Fold(output_size=(image_size, image_size),
                               kernel_size=(patch_size, patch_size),
                               stride=(patch_size, patch_size))

    def forward(self, x):
        # (B * L, C, p, p) -> (B, L, C * p * p) -> (B, C * p * p, L)
        x1 = x.view(self.batch_size, self.num_patches * self.num_patches, -1)
        x1 = x1.transpose(1, 2).contiguous()
        x1 = self.folding(x1)
        return x1
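

# Round-trip sketch (illustrative): folding restores the full image layout
# when the patch grid matches the one ImagePatching produced:
#
#   fold = ImageFolding(image_size=64, patch_size=16, batch_size=2)
#   patches = torch.rand(2 * 4 * 4, 3, 16, 16)
#   assert fold(patches).shape == (2, 3, 64, 64)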


class Encoder(nn.Module):
    """One encoder stage: double conv, optional concatenation with the
    previous stage's output, channel attention for the skip connection,
    then max pooling."""

    def __init__(self, in_channel, out_channel, enc_fet_ch, max_pool_size, is_concat=False):
        super().__init__()
        self.double_conv = DoubleConvLayer(in_feature=in_channel, out_feature=out_channel)
        self.enc_feature_extraction = EncoderFeatureExtraction(feature=enc_fet_ch)
        self.pooling_layer = nn.MaxPool2d(kernel_size=max_pool_size, stride=max_pool_size)
        self.concat = is_concat

    def forward(self, x, concat_tensor=None):
        x = self.double_conv(x)
        if self.concat:
            x = torch.cat([concat_tensor, x], dim=1)
        skip_connection = self.enc_feature_extraction(x)
        x = self.pooling_layer(x)
        return x, skip_connection
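

# Illustrative shapes (hypothetical channel counts): with is_concat=True the
# stage concatenates the previous stage's pooled output before attention and
# pooling, so enc_fet_ch must equal out_channel plus the concatenated channels:
#
#   enc = Encoder(in_channel=3, out_channel=16, enc_fet_ch=32, max_pool_size=2, is_concat=True)
#   x, skip = enc(torch.rand(8, 3, 32, 32), torch.rand(8, 16, 32, 32))
#   assert x.shape == (8, 32, 16, 16) and skip.shape == (8, 32, 32, 32)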


class Decoder(nn.Module):
    """One decoder stage: attention-gate the encoder skip connection with the
    coarser decoder signal, upsample the decoder signal, optionally
    concatenate the two, and refine with a double convolution."""

    def __init__(self, tensor_dim_encoder, tensor_dim_decoder, tensor_dim_mid,
                 up_conv_in_ch, up_conv_out_ch, up_conv_scale,
                 dconv_in_feature, dconv_out_feature, is_concat=False):
        super().__init__()
        self.soft_attention = AttentionGate(dim_g=tensor_dim_decoder, dim_x=tensor_dim_encoder, dim_l=tensor_dim_mid)
        self.up_conv = nn.ConvTranspose2d(in_channels=up_conv_in_ch, out_channels=up_conv_out_ch,
                                          kernel_size=up_conv_scale, stride=up_conv_scale)
        self.double_conv = DoubleConvLayer(in_feature=dconv_in_feature, out_feature=dconv_out_feature)
        self.concat = is_concat

    def forward(self, encoder_tensor, decoder_tensor):
        x = self.soft_attention(encoder_tensor, decoder_tensor)
        y = self.up_conv(decoder_tensor)
        if self.concat:
            x = torch.cat([x, y], dim=1)
        x = self.double_conv(x)
        return x


class SegmentationHead(nn.Module):
    """Projects decoder features to per-patch class logits, then folds the
    patches back into full-resolution images."""

    def __init__(self, feature_dim, num_classes, config: VesselSegmentConfig = VesselSegmentConfig()):
        super().__init__()
        self.config = config
        self.conv = nn.Conv2d(in_channels=feature_dim, out_channels=num_classes,
                              kernel_size=1, stride=1, padding=0)

    def forward(self, x, batch_size):
        x1 = self.conv(x)
        # nn.Fold has no learnable parameters, so constructing the folding
        # module per call is safe and lets the batch size vary at run time.
        x1 = ImageFolding(image_size=self.config.image_size[0],
                          patch_size=self.config.patch_size,
                          batch_size=batch_size)(x1)
        return x1


class VesselSegmentModel(PreTrainedModel):
    """Attention U-Net style vessel segmentation model that operates on
    sharpened, non-overlapping image patches at three resolutions."""

    config_class = VesselSegmentConfig

    def __init__(self, config: VesselSegmentConfig = VesselSegmentConfig()):
        super().__init__(config)

        self.img_patch = ImagePatching(patch_size=config.patch_size)

        # Half- and quarter-resolution views of each patch.
        self.img_down_sampling_1 = ImageDownSampling(height=config.patch_size, width=config.patch_size, scale=2)
        self.img_down_sampling_2 = ImageDownSampling(height=config.patch_size, width=config.patch_size, scale=4)

        # Encoder path; stages 2 and 3 also concatenate the pooled output of
        # the previous stage with their own double-conv features.
        self.encoder_layer_1 = Encoder(config.input_channels, config.features[0], enc_fet_ch=config.features[0], max_pool_size=2, is_concat=False)
        self.encoder_layer_2 = Encoder(config.input_channels, config.features[1], enc_fet_ch=config.features[0] * 2, max_pool_size=2, is_concat=True)
        self.encoder_layer_3 = Encoder(config.input_channels, config.features[2], enc_fet_ch=config.features[0] * 4, max_pool_size=2, is_concat=True)

        self.bottleneck = BottleNeck(in_ch=config.features[2] * 2, out_ch=config.features[2] * 4)

        # Decoder path with attention-gated skip connections.
        self.decoder_layer_1 = Decoder(tensor_dim_decoder=config.features[-1] * 4, tensor_dim_encoder=config.features[-1] * 2,
                                       tensor_dim_mid=config.features[0], up_conv_in_ch=config.features[-1] * 4,
                                       up_conv_out_ch=config.features[-1] * 2, up_conv_scale=2,
                                       dconv_in_feature=config.features[-1] * 4, dconv_out_feature=config.features[-1] * 2,
                                       is_concat=True)
        self.decoder_layer_2 = Decoder(tensor_dim_decoder=config.features[-1] * 2, tensor_dim_encoder=config.features[-1],
                                       tensor_dim_mid=config.features[1], up_conv_in_ch=config.features[-1] * 2,
                                       up_conv_out_ch=config.features[-1], up_conv_scale=2,
                                       dconv_in_feature=config.features[-1] * 2, dconv_out_feature=config.features[-1],
                                       is_concat=True)
        self.decoder_layer_3 = Decoder(tensor_dim_decoder=config.features[-1], tensor_dim_encoder=config.features[-2],
                                       tensor_dim_mid=config.features[2], up_conv_in_ch=config.features[-1],
                                       up_conv_out_ch=config.features[-2], up_conv_scale=2,
                                       dconv_in_feature=config.features[-1], dconv_out_feature=config.features[-2],
                                       is_concat=True)

        self.segmentation_head = SegmentationHead(feature_dim=config.features[-3], num_classes=config.num_classes)

    def forward(self, x):
        B = x.shape[0]  # only the batch size is needed for refolding

        # Patch the image, then build the lower-resolution views.
        IMG_1 = self.img_patch(x)
        IMG_2 = self.img_down_sampling_1(IMG_1)
        IMG_3 = self.img_down_sampling_2(IMG_2)

        # Encoder: each stage returns its pooled features and an
        # attention-weighted skip connection.
        e1, sk1 = self.encoder_layer_1(IMG_1, None)
        e2, sk2 = self.encoder_layer_2(IMG_2, e1)
        e3, sk3 = self.encoder_layer_3(IMG_3, e2)

        b = self.bottleneck(e3)

        # Decoder: gate each skip connection with the coarser decoder signal.
        d1 = self.decoder_layer_1(sk3, b)
        d2 = self.decoder_layer_2(sk2, d1)
        d3 = self.decoder_layer_3(sk1, d2)

        # Fold per-patch logits back into full-resolution segmentation maps.
        head = self.segmentation_head(d3, B)

        return head
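

# Minimal usage sketch (illustrative; assumes VesselSegmentConfig provides
# image_size as an (H, W) pair plus patch_size, input_channels, features, and
# num_classes with mutually consistent values, as relied on throughout this file):
#
#   config = VesselSegmentConfig()
#   model = VesselSegmentModel(config)
#   images = torch.rand(1, config.input_channels, *config.image_size)
#   logits = model(images)   # expected: (1, config.num_classes, *config.image_size)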