#!/usr/bin/env python
# coding: utf-8

# In[1]:

import os

import numpy as np
import pandas as pd
import decord
from decord import VideoReader
from PIL import Image
from tqdm import tqdm


# In[2]:

def get_file_stem(path):
    """Return the file name without its directory or extension."""
    base = os.path.basename(path)
    return os.path.splitext(base)[0]


def read_metadata(df_path):
    """Read a space-separated annotation file into a DataFrame."""
    df = pd.read_csv(df_path, sep=" ", header=None)
    df.columns = ["video_path", "label", "frames"]
    return df


def df_to_txt(df, dir_path):
    """Append the DataFrame to a space-separated txt file without header or index."""
    df.to_csv(dir_path, header=None, index=None, sep=" ", mode="a")


# In[3]:

def get_frames_from_split(df, split, dataset_dir, videos_dir, clips_type):
    """Decode every clip listed in the split and dump its frames as JPEGs."""
    clips_base_dir = os.path.join(dataset_dir, split)
    for i in tqdm(range(len(df.index))):
        video_id = get_file_stem(df.loc[i, "video_path"])
        participant = video_id.split("_")[0]
        video_file = f"{video_id}.mp4"
        clip_path = os.path.join(videos_dir, participant, clips_type, video_file)

        # Decode all frames of the clip with Decord
        decord_vr = VideoReader(clip_path)
        n_frames = len(decord_vr)
        frames_list = np.arange(n_frames)
        frames = decord_vr.get_batch(frames_list).asnumpy()

        video_folder = os.path.join(clips_base_dir, video_id)
        if not os.path.isdir(video_folder):
            os.makedirs(video_folder)

        for frame_idx in range(n_frames):
            im = Image.fromarray(frames[frame_idx])
            frame_path = os.path.join(video_folder, f"frame_{frame_idx}.jpg")
            im.save(frame_path)

    # Save the split annotation file as "<video_id> <n_frames> <label>"
    df_path = os.path.join(dataset_dir, f"{split}.txt")
    df["video_path"] = df["video_path"].apply(get_file_stem)
    df = df[["video_path", "frames", "label"]]
    df_to_txt(df, df_path)
    print("Finished")


# In[ ]:

# dataset_dir = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Datasets\HAR_dataset_v1"
# base_dir = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Datasets\HAR_Video\Base_Dataset"
# split = "Val"
# splits = ["Train", "Test"]
# videos_dir = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Participants"

# for split in splits:
#     df_path = os.path.join(dataset_dir, f"{split}.txt")
#     df = read_metadata(df_path)
#     get_frames_from_split(df, split, base_dir, videos_dir, "HARClips")


# In[ ]:

split = "Train"
AL_folder = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Datasets\HAR_AL"
videos_dir = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Participants"
base_dir = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Datasets\HAR_Video\HAR_AL_subsets"

# Extract frames for every active-learning subset found in AL_folder
for idx in range(len(os.listdir(AL_folder))):
    subset = f"HAR_Dataset_AL_v{idx}"
    subset_dir = os.path.join(base_dir, subset)
    dataset_dir = os.path.join(AL_folder, subset)
    df_path = os.path.join(dataset_dir, f"{split}.txt")
    df = read_metadata(df_path)
    get_frames_from_split(df, split, subset_dir, videos_dir, "HARClips")


# # Encode and filter

# In[4]:

def filter_targets(df, encoder):
    """Drop rows whose label is not covered by the fitted encoder.

    Handles the case where newly labelled data contains a class that was not
    present in the dataset the encoder was originally fitted on.
    """
    targets = df.label.values
    try:
        encoder.transform(targets)
        return df
    except Exception:
        original_classes = encoder.classes_
        actual_classes = set(targets)
        unseen_targets = [label for label in actual_classes
                          if label not in original_classes]
        invalid_ids = []
        for unseen_target in unseen_targets:
            invalid_ids.append(np.argwhere(targets == unseen_target))
        invalid_ids = np.concatenate(invalid_ids).ravel()
        valid_ids = [idx for idx in range(len(targets)) if idx not in invalid_ids]
        return df.loc[valid_ids]
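
# A quick sanity check of `filter_targets` on toy data (added for illustration;
# not part of the original pipeline). It assumes the pickled encoder used below
# is a scikit-learn LabelEncoder, which matches the `transform` / `classes_`
# API relied on above: rows labelled with a class the encoder has never seen
# should be dropped, the rest kept unchanged.

# In[ ]:

from sklearn.preprocessing import LabelEncoder

toy_encoder = LabelEncoder().fit(["sit", "walk"])
toy_df = pd.DataFrame({
    "video_path": ["P01_1", "P01_2", "P02_1"],
    "label": ["walk", "jump", "sit"],  # "jump" is unseen by the encoder
    "frames": [120, 90, 150],
})
filtered = filter_targets(toy_df, toy_encoder)
assert list(filtered.label) == ["walk", "sit"]
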
r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Datasets\HAR_Video\Base_Dataset" dataset_AL = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Datasets\HAR_Video\HAR_AL_subsets" # file_list_v1 = [os.path.join(dataset_v1,file) for file in os.listdir(dataset_v1) # if "txt" in file] file_list_al = [os.path.join(dataset_AL,folder,file) for folder in os.listdir(dataset_AL) for file in os.listdir(os.path.join(dataset_AL,folder)) if "txt" in file] # file_list = file_list_v1 + file_list_al file_list = file_list_al encoder = joblib.load(encoder_file) for file in file_list: base_dir = os.path.split(file)[0] split = get_file_stem(file) dir_file = os.path.join(base_dir,f"{split}_encodded.txt") #read df df = read_metadata(file) # filter unseen df = filter_targets(df,encoder) #encode labels = encoder.transform(df["label"]) df["label"] = labels df= df[["video_path","frames","label"]] #save df_to_txt(df,dir_file) # In[ ]: file_list_v1 # In[ ]: len(encoder.classes_) # In[ ]: file_list[0] # In[ ]: def filter_targets(df,encoder): #case where we encounter new labeled class not covered on #original dataset targets = df.label.values try: targets_enc = encoder.transform(targets) return df except: original_classes = encoder.classes_ actual_classes = set(targets) unseen_targets = [label for label in actual_classes if not(label in original_classes)] invalid_ids =[] for unseen_target in unseen_targets: invalid_ids.append(np.argwhere(targets==unseen_target)) invalid_ids = np.concatenate(invalid_ids) valid_ids = [id for id in range(len(targets)) if not(id in invalid_ids)] return df.loc[valid_ids] # In[ ]: