269 lines
10 KiB
Python
269 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding:utf-8 -*-
|
|
#############################################################
|
|
# File: data_loader.py
|
|
# Created Date: Saturday April 4th 2020
|
|
# Author: Chen Xuanhong
|
|
# Email: chenxuanhongzju@outlook.com
|
|
# Last Modified: Tuesday, 5th January 2021 2:12:29 pm
|
|
# Modified By: Chen Xuanhong
|
|
# Copyright (c) 2020 Shanghai Jiao Tong University
|
|
#############################################################
|
|
|
|
import os
|
|
import glob
|
|
import torch
|
|
import random
|
|
from PIL import Image
|
|
from pathlib import Path
|
|
from torch.utils import data
|
|
import torchvision.datasets as dsets
|
|
from torchvision import transforms as T
|
|
import torchvision.transforms.functional as F
|
|
|
|
|
|
class StyleResize(object):
|
|
def __call__(self, images):
|
|
th, tw = images.size # target height, width
|
|
if max(th,tw) > 1800:
|
|
alpha = 1800. / float(min(th,tw))
|
|
h = int(th*alpha)
|
|
w = int(tw*alpha)
|
|
images = F.resize(images, (h, w))
|
|
if max(th,tw) < 800:
|
|
# Resize the smallest side of the image to 800px
|
|
alpha = 800. / float(min(th,tw))
|
|
if alpha < 4.:
|
|
h = int(th*alpha)
|
|
w = int(tw*alpha)
|
|
images = F.resize(images, (h, w))
|
|
else:
|
|
images = F.resize(images, (800, 800))
|
|
return images
|
|
|
|
def __repr__(self):
|
|
return self.__class__.__name__ + '()'
|
|
|
|
class DataPrefetcher():
|
|
def __init__(self, loader):
|
|
self.loader = loader
|
|
self.dataiter = iter(loader)
|
|
self.stream = torch.cuda.Stream()
|
|
# self.mean = torch.tensor([0.485 * 255, 0.456 * 255, 0.406 * 255]).cuda().view(1,3,1,1)
|
|
# self.std = torch.tensor([0.229 * 255, 0.224 * 255, 0.225 * 255]).cuda().view(1,3,1,1)
|
|
# With Amp, it isn't necessary to manually convert data to half.
|
|
# if args.fp16:
|
|
# self.mean = self.mean.half()
|
|
# self.std = self.std.half()
|
|
self.__preload__()
|
|
|
|
def __preload__(self):
|
|
try:
|
|
self.content, self.style, self.label = next(self.dataiter)
|
|
except StopIteration:
|
|
self.dataiter = iter(self.loader)
|
|
self.content, self.style, self.label = next(self.dataiter)
|
|
|
|
with torch.cuda.stream(self.stream):
|
|
self.content= self.content.cuda(non_blocking=True)
|
|
self.style = self.style.cuda(non_blocking=True)
|
|
self.label = self.label.cuda(non_blocking=True)
|
|
# With Amp, it isn't necessary to manually convert data to half.
|
|
# if args.fp16:
|
|
# self.next_input = self.next_input.half()
|
|
# else:
|
|
# self.next_input = self.next_input.float()
|
|
# self.next_input = self.next_input.sub_(self.mean).div_(self.std)
|
|
|
|
def next(self):
|
|
torch.cuda.current_stream().wait_stream(self.stream)
|
|
content = self.content
|
|
style = self.style
|
|
label = self.label
|
|
self.__preload__()
|
|
return content, style, label
|
|
|
|
def __len__(self):
|
|
"""Return the number of images."""
|
|
return len(self.loader)
|
|
|
|
class TotalDataset(data.Dataset):
|
|
"""Dataset class for the Artworks dataset and content dataset."""
|
|
def __init__(self, content_image_dir,style_image_dir,
|
|
selectedContent,selectedStyle,
|
|
content_transform,style_transform,
|
|
subffix='jpg', random_seed=1234):
|
|
"""Initialize and preprocess the CelebA dataset."""
|
|
self.content_image_dir= content_image_dir
|
|
self.style_image_dir = style_image_dir
|
|
self.content_transform= content_transform
|
|
self.style_transform = style_transform
|
|
self.selectedContent = selectedContent
|
|
self.selectedStyle = selectedStyle
|
|
self.subffix = subffix
|
|
self.content_dataset = []
|
|
self.art_dataset = []
|
|
self.random_seed= random_seed
|
|
self.__preprocess__()
|
|
self.num_images = len(self.content_dataset)
|
|
self.art_num = len(self.art_dataset)
|
|
|
|
def __preprocess__(self):
|
|
"""Preprocess the Artworks dataset."""
|
|
print("processing content images...")
|
|
for dir_item in self.selectedContent:
|
|
join_path = Path(self.content_image_dir,dir_item)#.replace('/','_'))
|
|
if join_path.exists():
|
|
print("processing %s"%dir_item)
|
|
images = join_path.glob('*.%s'%(self.subffix))
|
|
for item in images:
|
|
self.content_dataset.append(item)
|
|
else:
|
|
print("%s dir does not exist!"%dir_item)
|
|
label_index = 0
|
|
print("processing style images...")
|
|
for class_item in self.selectedStyle:
|
|
images = Path(self.style_image_dir).glob('%s/*.%s'%(class_item, self.subffix))
|
|
for item in images:
|
|
self.art_dataset.append([item, label_index])
|
|
label_index += 1
|
|
random.seed(self.random_seed)
|
|
random.shuffle(self.content_dataset)
|
|
random.shuffle(self.art_dataset)
|
|
# self.dataset = images
|
|
print('Finished preprocessing the Art Works dataset, total image number: %d...'%len(self.art_dataset))
|
|
print('Finished preprocessing the Content dataset, total image number: %d...'%len(self.content_dataset))
|
|
|
|
def __getitem__(self, index):
|
|
"""Return one image and its corresponding attribute label."""
|
|
filename = self.content_dataset[index]
|
|
image = Image.open(filename)
|
|
content = self.content_transform(image)
|
|
art_index = random.randint(0,self.art_num-1)
|
|
filename,label = self.art_dataset[art_index]
|
|
image = Image.open(filename)
|
|
style = self.style_transform(image)
|
|
return content,style,label
|
|
|
|
def __len__(self):
|
|
"""Return the number of images."""
|
|
return self.num_images
|
|
|
|
def denorm(x):
|
|
out = (x + 1) / 2
|
|
return out.clamp_(0, 1)
|
|
|
|
def GetLoader(s_image_dir,c_image_dir,
|
|
style_selected_dir, content_selected_dir,
|
|
crop_size=178, batch_size=16, num_workers=8,
|
|
colorJitterEnable=True, colorConfig={"brightness":0.05,"contrast":0.05,"saturation":0.05,"hue":0.05}):
|
|
"""Build and return a data loader."""
|
|
|
|
s_transforms = []
|
|
c_transforms = []
|
|
|
|
s_transforms.append(T.Resize(768))
|
|
# s_transforms.append(T.Resize(900))
|
|
c_transforms.append(T.Resize(768))
|
|
|
|
s_transforms.append(T.RandomCrop(crop_size,pad_if_needed=True,padding_mode='reflect'))
|
|
c_transforms.append(T.RandomCrop(crop_size))
|
|
|
|
s_transforms.append(T.RandomHorizontalFlip())
|
|
c_transforms.append(T.RandomHorizontalFlip())
|
|
|
|
s_transforms.append(T.RandomVerticalFlip())
|
|
c_transforms.append(T.RandomVerticalFlip())
|
|
|
|
if colorJitterEnable:
|
|
if colorConfig is not None:
|
|
print("Enable color jitter!")
|
|
colorBrightness = colorConfig["brightness"]
|
|
colorContrast = colorConfig["contrast"]
|
|
colorSaturation = colorConfig["saturation"]
|
|
colorHue = (-colorConfig["hue"],colorConfig["hue"])
|
|
s_transforms.append(T.ColorJitter(brightness=colorBrightness,\
|
|
contrast=colorContrast,saturation=colorSaturation, hue=colorHue))
|
|
c_transforms.append(T.ColorJitter(brightness=colorBrightness,\
|
|
contrast=colorContrast,saturation=colorSaturation, hue=colorHue))
|
|
s_transforms.append(T.ToTensor())
|
|
c_transforms.append(T.ToTensor())
|
|
|
|
s_transforms.append(T.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)))
|
|
c_transforms.append(T.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)))
|
|
|
|
s_transforms = T.Compose(s_transforms)
|
|
c_transforms = T.Compose(c_transforms)
|
|
|
|
content_dataset = TotalDataset(c_image_dir,s_image_dir, content_selected_dir, style_selected_dir
|
|
, c_transforms,s_transforms)
|
|
content_data_loader = data.DataLoader(dataset=content_dataset,batch_size=batch_size,
|
|
drop_last=True,shuffle=True,num_workers=num_workers,pin_memory=True)
|
|
prefetcher = DataPrefetcher(content_data_loader)
|
|
return prefetcher
|
|
|
|
def GetValiDataTensors(
|
|
image_dir=None,
|
|
selected_imgs=[],
|
|
crop_size=178,
|
|
mean = (0.5, 0.5, 0.5),
|
|
std=(0.5, 0.5, 0.5)
|
|
):
|
|
|
|
transforms = []
|
|
|
|
transforms.append(T.Resize(768))
|
|
|
|
transforms.append(T.RandomCrop(crop_size,pad_if_needed=True,padding_mode='reflect'))
|
|
|
|
transforms.append(T.ToTensor())
|
|
|
|
transforms.append(T.Normalize(mean=mean, std=std))
|
|
|
|
transforms = T.Compose(transforms)
|
|
|
|
result_img = []
|
|
print("Start to read validation data......")
|
|
if len(selected_imgs) != 0:
|
|
for s_img in selected_imgs:
|
|
if image_dir == None:
|
|
temp_img = s_img
|
|
else:
|
|
temp_img = os.path.join(image_dir, s_img)
|
|
temp_img = Image.open(temp_img)
|
|
temp_img = transforms(temp_img).cuda().unsqueeze(0)
|
|
result_img.append(temp_img)
|
|
else:
|
|
s_imgs = glob.glob(os.path.join(image_dir, '*.jpg'))
|
|
s_imgs = s_imgs + glob.glob(os.path.join(image_dir, '*.png'))
|
|
for s_img in s_imgs:
|
|
temp_img = os.path.join(image_dir, s_img)
|
|
temp_img = Image.open(temp_img)
|
|
temp_img = transforms(temp_img).cuda().unsqueeze(0)
|
|
result_img.append(temp_img)
|
|
print("Finish to read validation data......")
|
|
print("Total validation images: %d"%len(result_img))
|
|
return result_img
|
|
|
|
def ScanAbnormalImg(image_dir, selected_imgs):
|
|
"""Scan the dataset, this function is designed to exclude or remove the non-RGB images."""
|
|
print("processing images...")
|
|
subffix = "jpg"
|
|
for dir_item in selected_imgs:
|
|
join_path = Path(image_dir,dir_item)#.replace('/','_'))
|
|
if join_path.exists():
|
|
print("processing %s"%dir_item)
|
|
images = join_path.glob('*.%s'%(subffix))
|
|
for item in images:
|
|
# print(str(item.name)[0:6])
|
|
# temp = cv2.imread(str(item))
|
|
temp = Image.open(item)
|
|
# exclude the abnormal images
|
|
if temp.mode!="RGB":
|
|
print(temp.mode)
|
|
print("Found one abnormal image!")
|
|
print(item)
|
|
os.remove(str(item))
|
|
|
|
else:
|
|
print("%s dir does not exist!"%dir_item) |