Computer Vision Methods

Tech/Computer Vision

https://dacon.io/competitions/official/235746/overview/description

Camera Image Quality Improvement AI Competition - DACON (dacon.io)

I analyzed this competition's baseline code, but I was busy with other things, so I only analyzed it and never actually got to use it.

If I leave it alone I'll forget it, and it's too useful for that, so I'm writing it down here.

This will be fairly scattershot, so I may have to split it into separate posts later.

1. Augmenting the label image as well with the albumentations library

https://dacon.io/competitions/official/235746/codeshare/2909?page=2&dtype=recent 

 

import numpy as np
import torch
from torch.utils.data import Dataset
from PIL import Image

class CustomTensorDataset(Dataset):
    """Dataset that applies one albumentations transform to the input image
    and, in train/val mode, the identical transform to the paired label image.
    """
    def __init__(self, imagex, imagey=None, transform=None, mode="test"):
        self.mode = mode
        self.imagex = imagex
        self.transform = transform
        if mode in ("train", "val"):
            self.imagey = imagey

    def __getitem__(self, index):
        x = np.array(Image.open(self.imagex[index]))
        if self.mode in ("train", "val"):
            y = np.array(Image.open(self.imagey[index]))
            # 'target_image' is registered via additional_targets below, so the
            # label receives exactly the same random rotation/flips as the input
            transformed = self.transform(image=x, target_image=y)
            return (transformed['image'].type(torch.FloatTensor) / 255,
                    transformed['target_image'].type(torch.FloatTensor) / 255)
        transformed = self.transform(image=x)
        return transformed['image'].type(torch.FloatTensor) / 255

    def __len__(self):
        return len(self.imagex)
import albumentations as A
import albumentations.pytorch

train_transforms = A.Compose(
    [
        A.Rotate(limit=(-30, 30), p=1.0),
        A.HorizontalFlip(),
        A.VerticalFlip(),
        # A.Normalize(mean=(0.548, 0.504, 0.479), std=(0.237, 0.247, 0.246)),
        A.pytorch.ToTensorV2(),
    ],
    # treat 'target_image' like 'image': the label gets the same randomly
    # sampled spatial transforms as the input
    additional_targets={'target_image': 'image'},
)

val_transforms = A.Compose(
    [A.pytorch.ToTensorV2()],
    additional_targets={'target_image': 'image'},
)
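
A minimal usage sketch, assuming train_input_files and train_label_files are parallel lists of input/label image paths (the same names the patch-cutting code in section 2 uses):

# minimal sketch: the same random rotation/flips land on both images
train_dataset = CustomTensorDataset(
    imagex=train_input_files,
    imagey=train_label_files,
    transform=train_transforms,
    mode="train",
)
x, y = train_dataset[0]
print(x.shape, y.shape)  # both CHW float tensors in [0, 1], identically augmented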

 

2. Speeding up cutout augmentation by cutting and saving the images in advance

https://dacon.io/competitions/official/235746/codeshare/2913?page=2&dtype=recent 

 

import concurrent.futures
import functools
import os

import cv2
import numpy as np
from tqdm import tqdm

IMG_SIZE = img_size  # patch side length; img_size is defined elsewhere in the notebook
STRIDE = 128         # patches overlap whenever STRIDE < IMG_SIZE
SAVE_TRAIN_PATH = './data/train_input_img_'
SAVE_LABEL_PATH = './data/train_label_img_'

def cut_img(save_path, img_path):
    """Slide an IMG_SIZE window over the image with step STRIDE and save
    every (zero-padded) patch to '{save_path}{IMG_SIZE}/'."""
    os.makedirs(f'{save_path}{IMG_SIZE}', exist_ok=True)
    img = cv2.imread(img_path)
    img_name = os.path.basename(img_path)
    num = 0
    for top in range(0, img.shape[0], STRIDE):
        for left in range(0, img.shape[1], STRIDE):
            # zero-pad patches that run past the right/bottom edge
            piece = np.zeros([IMG_SIZE, IMG_SIZE, 3], np.uint8)
            temp = img[top:top+IMG_SIZE, left:left+IMG_SIZE, :]
            piece[:temp.shape[0], :temp.shape[1], :] = temp
            cv2.imwrite(f'{save_path}{IMG_SIZE}/{num}_{img_name}', piece)
            # np.save(f'{save_path}{IMG_SIZE}/{num}.npy', piece)
            num += 1

# on a local Windows machine, pass max_workers=os.cpu_count()//2
# (and run this under an `if __name__ == "__main__":` guard)
with concurrent.futures.ProcessPoolExecutor() as executor:
    list(tqdm(
        executor.map(functools.partial(cut_img, './data/train_input_img_'), train_input_files),
        desc='train_input_files cut',
        total=len(train_input_files)
    ))
    list(tqdm(
        executor.map(functools.partial(cut_img, './data/train_label_img_'), train_label_files),
        desc='train_label_files cut',
        total=len(train_label_files)
    ))
    list(tqdm(
        executor.map(functools.partial(cut_img, './data/val_input_img_'), val_input_files),
        desc='val_input_files cut',
        total=len(val_input_files)
    ))
    list(tqdm(
        executor.map(functools.partial(cut_img, './data/val_label_img_'), val_label_files),
        desc='val_label_files cut',
        total=len(val_label_files)
    ))
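
Once the patches are on disk, they can be fed straight back into the dataset from section 1. A hedged sketch, assuming the source images are .png files and that sorting the two directories pairs inputs with labels (both sides are saved as '{num}_{original name}'):

from glob import glob

# assumption: .png sources, and sorted order pairs inputs with labels
cut_inputs = sorted(glob(f'./data/train_input_img_{IMG_SIZE}/*.png'))
cut_labels = sorted(glob(f'./data/train_label_img_{IMG_SIZE}/*.png'))
assert len(cut_inputs) == len(cut_labels)

train_dataset = CustomTensorDataset(cut_inputs, cut_labels,
                                    transform=train_transforms, mode="train")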

 

3. Unet

The baseline code for this competition uses ResNetV2-101 with TensorFlow, so I looked at the U-Net paper and some external code and converted it to PyTorch. I think this was probably my first attempt at implementing something directly from a paper.

The results came out strange, though, so I'm not sure it even belongs here...

 

The baseline code:

https://dacon.io/competitions/official/235746/codeshare/2874?page=2&dtype=recent 

import tensorflow as tf

def convolution_block(x, filters, size, strides=(1,1), padding='same', activation=True):
    x = tf.keras.layers.Conv2D(filters, size, strides=strides, padding=padding)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    if activation:
        x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)
    return x

def residual_block(blockInput, num_filters=16):
    x = tf.keras.layers.LeakyReLU(alpha=0.1)(blockInput)
    x = tf.keras.layers.BatchNormalization()(x)
    blockInput = tf.keras.layers.BatchNormalization()(blockInput)
    x = convolution_block(x, num_filters, (3,3) )
    x = convolution_block(x, num_filters, (3,3), activation=False)
    x = tf.keras.layers.Add()([x, blockInput])
    return x

def ResUNet101V2(input_shape=(None, None, 3), dropout_rate=0.1, start_neurons=16):
    # 'weights' comes from the notebook config ('imagenet' loads pretrained weights)
    backbone = tf.keras.applications.ResNet101V2(weights=weights, include_top=False, input_shape=input_shape)
    input_layer = backbone.input
    
    conv4 = backbone.layers[122].output
    conv4 = tf.keras.layers.LeakyReLU(alpha=0.1)(conv4)
    pool4 = tf.keras.layers.MaxPooling2D((2, 2))(conv4)
    pool4 = tf.keras.layers.Dropout(dropout_rate)(pool4)
    
    convm = tf.keras.layers.Conv2D(start_neurons * 32, (3, 3), activation=None, padding="same")(pool4)
    convm = residual_block(convm,start_neurons * 32)
    convm = residual_block(convm,start_neurons * 32)
    convm = tf.keras.layers.LeakyReLU(alpha=0.1)(convm)
    
    deconv4 = tf.keras.layers.Conv2DTranspose(start_neurons * 16, (3, 3), strides=(2, 2), padding="same")(convm)
    uconv4 = tf.keras.layers.concatenate([deconv4, conv4])
    uconv4 = tf.keras.layers.Dropout(dropout_rate)(uconv4)
    
    uconv4 = tf.keras.layers.Conv2D(start_neurons * 16, (3, 3), activation=None, padding="same")(uconv4)
    uconv4 = residual_block(uconv4,start_neurons * 16)
    uconv4 = residual_block(uconv4,start_neurons * 16)
    uconv4 = tf.keras.layers.LeakyReLU(alpha=0.1)(uconv4)
    
    deconv3 = tf.keras.layers.Conv2DTranspose(start_neurons * 8, (3, 3), strides=(2, 2), padding="same")(uconv4)
    conv3 = backbone.layers[76].output
    uconv3 = tf.keras.layers.concatenate([deconv3, conv3])    
    uconv3 = tf.keras.layers.Dropout(dropout_rate)(uconv3)
    
    uconv3 = tf.keras.layers.Conv2D(start_neurons * 8, (3, 3), activation=None, padding="same")(uconv3)
    uconv3 = residual_block(uconv3,start_neurons * 8)
    uconv3 = residual_block(uconv3,start_neurons * 8)
    uconv3 = tf.keras.layers.LeakyReLU(alpha=0.1)(uconv3)

    deconv2 = tf.keras.layers.Conv2DTranspose(start_neurons * 4, (3, 3), strides=(2, 2), padding="same")(uconv3)
    conv2 = backbone.layers[30].output
    uconv2 = tf.keras.layers.concatenate([deconv2, conv2])
        
    uconv2 = tf.keras.layers.Dropout(0.1)(uconv2)
    uconv2 = tf.keras.layers.Conv2D(start_neurons * 4, (3, 3), activation=None, padding="same")(uconv2)
    uconv2 = residual_block(uconv2,start_neurons * 4)
    uconv2 = residual_block(uconv2,start_neurons * 4)
    uconv2 = tf.keras.layers.LeakyReLU(alpha=0.1)(uconv2)
    
    deconv1 = tf.keras.layers.Conv2DTranspose(start_neurons * 2, (3, 3), strides=(2, 2), padding="same")(uconv2)
    conv1 = backbone.layers[2].output
    uconv1 = tf.keras.layers.concatenate([deconv1, conv1])
    
    uconv1 = tf.keras.layers.Dropout(0.1)(uconv1)
    uconv1 = tf.keras.layers.Conv2D(start_neurons * 2, (3, 3), activation=None, padding="same")(uconv1)
    uconv1 = residual_block(uconv1,start_neurons * 2)
    uconv1 = residual_block(uconv1,start_neurons * 2)
    uconv1 = tf.keras.layers.LeakyReLU(alpha=0.1)(uconv1)
    
    uconv0 = tf.keras.layers.Conv2DTranspose(start_neurons * 1, (3, 3), strides=(2, 2), padding="same")(uconv1)   
    uconv0 = tf.keras.layers.Dropout(0.1)(uconv0)
    uconv0 = tf.keras.layers.Conv2D(start_neurons * 1, (3, 3), activation=None, padding="same")(uconv0)
    uconv0 = residual_block(uconv0,start_neurons * 1)
    uconv0 = residual_block(uconv0,start_neurons * 1)
    uconv0 = tf.keras.layers.LeakyReLU(alpha=0.1)(uconv0)
    
    uconv0 = tf.keras.layers.Dropout(dropout_rate/2)(uconv0)
    output_layer = tf.keras.layers.Conv2D(3, (1,1), padding="same", activation="sigmoid")(uconv0)    
    
    model = tf.keras.models.Model(input_layer, output_layer)

    return model
# learning_rate, img_size and dropout_rate come from the notebook's config cell
optimizer = tf.keras.optimizers.Adam(learning_rate)
model = ResUNet101V2(input_shape=(img_size, img_size, 3), dropout_rate=dropout_rate)
model.compile(loss='mae', optimizer=optimizer)  # note: the baseline trains with MAE, not MSE
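
The skip connections above are wired to hard-coded backbone layer indices (2, 30, 76, 122). A small inspection snippet (my addition, not part of the baseline) that prints what those indices correspond to, which is handy when porting the architecture to another framework:

# inspection helper: name and output shape of the skip-connection layers
backbone = tf.keras.applications.ResNet101V2(weights=None, include_top=False,
                                             input_shape=(256, 256, 3))
for i in [2, 30, 76, 122]:
    layer = backbone.layers[i]
    print(i, layer.name, layer.output.shape)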

 

 

And here's my PyTorch conversion:

import torch
import torch.nn as nn
import torchvision.models

class Convolution_block(nn.Module):
    def __init__(self, in_channels, out_channels, activation=True):
        super().__init__()
        if activation:
            self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=(3,3), padding=(1,1)),
            nn.BatchNorm2d(out_channels),
            nn.LeakyReLU(),
            )
        else: self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=(3,3), padding=(1,1)),
            nn.BatchNorm2d(out_channels),
        )

    def forward(self, x):
        return self.conv(x)


class Residual_block(nn.Module):
    def __init__(self, out_channels):
        super().__init__()

        self.relu = nn.LeakyReLU()
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.cb1 = Convolution_block(out_channels,out_channels)
        self.cb2 = Convolution_block(out_channels,out_channels, False)

    def forward(self, blockinput):
        x = self.relu(blockinput)
        x = self.bn1(x)
        blockinput = self.bn2(blockinput)
        x = self.cb1(x)
        x = self.cb2(x)
        x = torch.add(x, blockinput)
        return x

# adapted from https://github.com/zhoudaxia233/PyTorch-Unet/blob/master/resnet_unet.py
class DUnetResnet101(nn.Module):
    def __init__(self):
        super().__init__()
        # split a pretrained ResNet-101 into the five encoder stages whose
        # outputs feed the U-Net skip connections
        tchres101 = torchvision.models.resnet101(pretrained=True)
        encoder_layer = list(tchres101.children())

        self.encoder_block1 = nn.Sequential(*encoder_layer[:3])   # conv1+bn+relu: stride 2, 64 ch
        self.encoder_block2 = nn.Sequential(*encoder_layer[3:5])  # maxpool+layer1: stride 4, 256 ch
        self.encoder_block3 = encoder_layer[5]                    # layer2: stride 8, 512 ch
        self.encoder_block4 = encoder_layer[6]                    # layer3: stride 16, 1024 ch
        self.encoder_block5 = encoder_layer[7]                    # layer4: stride 32, 2048 ch

        self.convm = nn.Conv2d(2048, 1024, kernel_size=(3,3), padding=(1,1))
        self.residualm_1 = Residual_block(1024)
        self.residualm_2 = Residual_block(1024)
        self.relum = nn.LeakyReLU()

        self.upconv6 = nn.ConvTranspose2d(1024, 512, kernel_size=(3,3), stride=(2,2), padding=(1,1), output_padding=(1,1))
        self.dropout6 = nn.Dropout2d(0.1)

        self.conv6 = nn.Conv2d(1024 + 512, 512, kernel_size=(3,3), padding=(1,1))
        self.residual6_1 = Residual_block(512)
        self.residual6_2 = Residual_block(512)
        self.relu6 = nn.LeakyReLU()

        self.upconv7 = nn.ConvTranspose2d(512,256, kernel_size=(3,3), stride=(2,2), padding=(1,1), output_padding=(1,1))
        self.dropout7 = nn.Dropout2d(0.1)

        self.conv7 = nn.Conv2d(512+256,256, kernel_size=(3,3), padding=(1,1))
        self.residual7_1 = Residual_block(256)
        self.residual7_2 = Residual_block(256)
        self.relu7 = nn.LeakyReLU()

        self.upconv8 = nn.ConvTranspose2d(256,128, kernel_size=(3,3), stride=(2,2), padding=(1,1), output_padding=(1,1))
        self.dropout8 = nn.Dropout2d(0.1)

        self.conv8 = nn.Conv2d(256+128,128, kernel_size=(3,3), padding=(1,1))
        self.residual8_1 = Residual_block(128)
        self.residual8_2 = Residual_block(128)
        self.relu8 = nn.LeakyReLU()

        self.upconv9 = nn.ConvTranspose2d(128,64, kernel_size=(3,3), stride=(2,2), padding=(1,1), output_padding=(1,1))
        self.dropout9 = nn.Dropout2d(0.1)

        self.conv9 = nn.Conv2d(64+64,64, kernel_size=(3,3), padding=(1,1))
        self.residual9_1 = Residual_block(64)
        self.residual9_2 = Residual_block(64)
        self.relu9 = nn.LeakyReLU()

        self.upconv10 = nn.ConvTranspose2d(64,32, kernel_size=(3,3), stride=(2,2), padding=(1,1), output_padding=(1,1))
        self.dropout10 = nn.Dropout2d(0.1)

        self.conv10 = nn.Conv2d(32,32, kernel_size=(3,3), padding=(1,1))
        self.residual10_1 = Residual_block(32)
        self.residual10_2 = Residual_block(32)
        self.relu10 = nn.LeakyReLU()

        self.dropout11 = nn.Dropout2d(0.05)
        self.conv11 = nn.Conv2d(32,3, kernel_size=(1,1))

    def forward(self,x):
        block1 = self.encoder_block1(x)
        block2 = self.encoder_block2(block1)
        block3 = self.encoder_block3(block2)
        block4 = self.encoder_block4(block3)
        block5 = self.encoder_block5(block4)

        x = self.convm(block5)
        x = self.residualm_1(x)
        x = self.residualm_2(x)
        x = self.relum(x)

        x = self.upconv6(x)
        x = torch.cat([x, block4], dim=1)
        x = self.dropout6(x)
        x = self.conv6(x)
        x = self.residual6_1(x)
        x = self.residual6_2(x)
        x = self.relu6(x)

        x = self.upconv7(x)
        x = torch.cat([x, block3], dim=1)
        x = self.dropout7(x)

        x = self.conv7(x)
        x = self.residual7_1(x)
        x = self.residual7_2(x)
        x = self.relu7(x)

        x = self.upconv8(x)
        x = torch.cat([x, block2], dim=1)
        x = self.dropout8(x)

        x = self.conv8(x)
        x = self.residual8_1(x)
        x = self.residual8_2(x)
        x = self.relu8(x)

        x = self.upconv9(x)
        x = torch.cat([x, block1], dim=1)
        x = self.dropout9(x)

        x = self.conv9(x)
        x = self.residual9_1(x)
        x = self.residual9_2(x)
        x = self.relu9(x)

        x = self.upconv10(x)
        x = self.dropout10(x)

        x = self.conv10(x)
        x = self.residual10_1(x)
        x = self.residual10_2(x)
        x = self.relu10(x)

        x = self.dropout11(x)
        x = self.conv11(x)

        return x
model = DUnetResnet101()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001)
# OneCycleLR expects one scheduler.step() per batch; train_loader is assumed
# to be a DataLoader over the patch dataset
scheduler = torch.optim.lr_scheduler.OneCycleLR(
    optimizer=optimizer,
    max_lr=0.1,
    steps_per_epoch=len(train_loader),
    epochs=200,
    pct_start=0.05,
)
criterion = nn.MSELoss()
fp16 = False
scaler = torch.cuda.amp.GradScaler() if fp16 and device != torch.device("cpu") else None
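
For completeness, a minimal training-step sketch that matches the setup above; train_loader is assumed to yield (input, label) patch pairs like the dataset from section 1:

model.to(device)
model.train()
for x, y in train_loader:
    x, y = x.to(device), y.to(device)
    optimizer.zero_grad()
    if scaler is not None:  # mixed-precision path
        with torch.cuda.amp.autocast():
            loss = criterion(model(x), y)
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
    else:
        loss = criterion(model(x), y)
        loss.backward()
        optimizer.step()
    scheduler.step()  # OneCycleLR steps once per batch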

But the results come out strange. I thought I'd used the same loss, but looking at the code again, the baseline actually compiles with MAE while my version uses MSE; the TF model also ends with a sigmoid on its output while my conv11 has no activation, and OneCycleLR's max_lr of 0.1 is far above the AdamW learning rate of 0.001. Any of those differences could be the cause. I'll probably update this post once I've analyzed it properly.

Alternatively, look at what someone else did: they appear to have loaded just the backbone with timm and rebuilt it into a U-Net.

https://dacon.io/competitions/official/235746/codeshare/3006 

Model Sharing - Camera Image Quality Improvement AI Competition (dacon.io)
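
I haven't reproduced their model here, but the timm pattern described boils down to something like this (a sketch; the ResNet-101 backbone choice is my assumption):

import timm
import torch

# features_only=True returns one feature map per encoder stage, which is
# what a U-Net decoder attaches its skip connections to
backbone = timm.create_model('resnet101', pretrained=False, features_only=True)
feature_maps = backbone(torch.randn(1, 3, 256, 256))
for f in feature_maps:
    print(f.shape)  # strides 2, 4, 8, 16, 32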

