import os
import pandas as pd
import decord
from decord import VideoReader
import numpy as np
from PIL import Image
from tqdm import tqdm
def get_file_stem(path):
    """Return the file name of *path* without its directory or extension."""
    stem, _ext = os.path.splitext(os.path.basename(path))
    return stem
def read_metadata(df_path):
    """Load a split metadata file.

    The file is space-separated with no header; each row is
    ``video_path label frames``.
    """
    column_names = ["video_path", "label", "frames"]
    return pd.read_csv(df_path, sep=" ", header=None, names=column_names)
def df_to_txt(df, dir_path):
    """Append *df* to *dir_path* as space-separated text.

    No header row or index column is written; repeated calls append
    (``mode='a'``) rather than overwrite.
    """
    csv_options = {"header": None, "index": None, "sep": " ", "mode": "a"}
    df.to_csv(dir_path, **csv_options)
def get_frames_from_split(df, split, dataset_dir, videos_dir, clips_type):
    """Decode every clip listed in *df* into per-frame JPEGs and save the split metadata.

    For each row, the clip is read from
    ``videos_dir/<participant>/<clips_type>/<video_id>.mp4`` (the participant id
    is the first ``_``-separated token of the clip's file stem), all frames are
    written to ``dataset_dir/<split>/<video_id>/frame_<k>.jpg``, and finally a
    ``dataset_dir/<split>.txt`` metadata file is appended with
    ``video_id frames label`` rows.

    NOTE: mutates *df* in place (``video_path`` is reduced to the file stem),
    matching the original behavior.
    """
    clips_base_dir = os.path.join(dataset_dir, split)
    for row_pos in tqdm(range(len(df.index))):
        # Positional access: robust even if df has a non-default index.
        video_id = get_file_stem(df.iloc[row_pos]["video_path"])
        participant = video_id.split("_")[0]
        clip_path = os.path.join(videos_dir, participant, clips_type, f"{video_id}.mp4")
        decord_vr = VideoReader(clip_path)
        n_frames = len(decord_vr)
        # Decode all frames in one batch (indices 0..n_frames-1).
        frames = decord_vr.get_batch(np.arange(n_frames)).asnumpy()
        video_folder = os.path.join(clips_base_dir, video_id)
        os.makedirs(video_folder, exist_ok=True)
        # Fixed: the original inner loop reused `i`, shadowing the outer row index.
        for frame_idx in range(n_frames):
            frame_path = os.path.join(video_folder, f"frame_{frame_idx}.jpg")
            Image.fromarray(frames[frame_idx]).save(frame_path)
    # Persist the split metadata as "<stem> <frames> <label>" rows.
    df_path = os.path.join(dataset_dir, f"{split}.txt")
    df["video_path"] = df["video_path"].apply(get_file_stem)
    df = df[["video_path", "frames", "label"]]
    df_to_txt(df, df_path)
    print("Finished")
# dataset_dir = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Datasets\HAR_dataset_v1"
# base_dir = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Datasets\HAR_Video\Base_Dataset"
# split = "Val"
# splits =["Train","Test"]
# videos_dir = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Participants"
# for split in splits:
# df_path = os.path.join(dataset_dir,f"{split}.txt")
# df = read_metadata(df_path)
# get_frames_from_split(df,split,base_dir,videos_dir,"HARClips")
split = "Train"
AL_folder = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Datasets\HAR_AL"
videos_dir = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Participants"
base_dir = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Datasets\HAR_Video\HAR_AL_subsets"
# Extract frames for every active-learning subset (HAR_Dataset_AL_v0, v1, ...).
for version in range(len(os.listdir(AL_folder))):
    subset = f"HAR_Dataset_AL_v{version}"
    subset_dir = os.path.join(base_dir, subset)
    dataset_dir = os.path.join(AL_folder, subset)
    df = read_metadata(os.path.join(dataset_dir, f"{split}.txt"))
    get_frames_from_split(df, split, subset_dir, videos_dir, "HARClips")
def filter_targets(df, encoder):
    """Drop rows whose label was not seen when *encoder* was fitted.

    Parameters
    ----------
    df : pandas.DataFrame
        Must have a ``label`` column.
    encoder : fitted sklearn-style LabelEncoder
        Exposes ``classes_`` and ``transform``.

    Returns
    -------
    pandas.DataFrame
        *df* unchanged when every label is encodable, otherwise only the
        rows whose label appears in ``encoder.classes_``.
    """
    targets = df.label.values
    try:
        # LabelEncoder.transform raises ValueError on unseen labels.
        encoder.transform(targets)
        return df
    except ValueError:
        # Fixed: original used a bare except, an O(n^2) list-membership scan,
        # and label-based df.loc with *positional* ids (wrong for a
        # non-default index). A boolean mask is both correct and O(n).
        known_mask = np.isin(targets, encoder.classes_)
        return df.loc[known_mask]
import joblib
encoder_file = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Datasets\HAR_dataset_v1\encoder_train.pkl"
dataset_v1 = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Datasets\HAR_Video\Base_Dataset"
dataset_AL = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Datasets\HAR_Video\HAR_AL_subsets"
# file_list_v1 = [os.path.join(dataset_v1, file) for file in os.listdir(dataset_v1)
#                 if "txt" in file]
# Collect every split metadata file from each AL-subset folder.
file_list_al = []
for folder in os.listdir(dataset_AL):
    subset_path = os.path.join(dataset_AL, folder)
    for file in os.listdir(subset_path):
        if "txt" in file:
            file_list_al.append(os.path.join(subset_path, file))
# file_list = file_list_v1 + file_list_al
file_list = file_list_al
encoder = joblib.load(encoder_file)
for file in file_list:
    base_dir = os.path.split(file)[0]
    split = get_file_stem(file)
    # Output path keeps the original (misspelled) "_encodded" suffix.
    dir_file = os.path.join(base_dir, f"{split}_encodded.txt")
    df = read_metadata(file)            # read the split metadata
    df = filter_targets(df, encoder)    # drop labels the encoder never saw
    labels = encoder.transform(df["label"])  # string labels -> integer codes
    df["label"] = labels
    df = df[["video_path", "frames", "label"]]
    df_to_txt(df, dir_file)             # append encoded rows to disk
# NOTE(review): the lines below are pasted notebook output and bare inspection
# cells, not executable code. The sklearn warning line is a SyntaxError and
# `file_list_v1` is undefined (its definition is commented out above), so they
# are preserved here as comments to keep the module importable.
# C:\Users\jeuux\Anaconda2\envs\ts_env\lib\site-packages\sklearn\base.py:318: UserWarning: Trying to unpickle estimator LabelEncoder from version 0.22.2.post1 when using version 0.22.1. This might lead to breaking code or invalid results. Use at your own risk. UserWarning)
# file_list_v1
# len(encoder.classes_)
# file_list[0]
# NOTE(review): this is a verbatim re-definition of filter_targets (a pasted
# notebook cell); it shadows the earlier definition and can likely be removed.
def filter_targets(df, encoder):
    """Drop rows whose label was not seen when *encoder* was fitted.

    Parameters
    ----------
    df : pandas.DataFrame
        Must have a ``label`` column.
    encoder : fitted sklearn-style LabelEncoder
        Exposes ``classes_`` and ``transform``.

    Returns
    -------
    pandas.DataFrame
        *df* unchanged when every label is encodable, otherwise only the
        rows whose label appears in ``encoder.classes_``.
    """
    targets = df.label.values
    try:
        # LabelEncoder.transform raises ValueError on unseen labels.
        encoder.transform(targets)
        return df
    except ValueError:
        # Fixed: original used a bare except, an O(n^2) list-membership scan,
        # and label-based df.loc with *positional* ids (wrong for a
        # non-default index). A boolean mask is both correct and O(n).
        known_mask = np.isin(targets, encoder.classes_)
        return df.loc[known_mask]