import pandas as pd
import os
import numpy as np
from moviepy.editor import VideoFileClip, concatenate_videoclips
# Concatenate the two part files 1005b_part1.mp4 and 1005b_part2.mp4 into one clip.
base_dir = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\Videos_raw"
clip1 = VideoFileClip(os.path.join(base_dir,"1005b_part1.mp4"))
clip2 = VideoFileClip(os.path.join(base_dir,"1005b_part2.mp4"))
final_clip = concatenate_videoclips([clip1,clip2])
# The output path is relative, so the merged file lands in the current working directory.
final_clip.write_videofile("1005b.mp4")
# Load the pickled dataset (the path points at participant folder 1105e).
# NOTE(review): unpickling can execute arbitrary code; only load files from a trusted source.
data_file = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Participants\1105e\FullDataset\Final_data_1105e.pkl"
data = pd.read_pickle(data_file)
# Bare expression: presumably a notebook cell display; it has no effect when run as a script.
data
def windows(data, size, factor=2):
    """Yield ``(start, end)`` integer bounds of sliding windows over ``data``.

    Only ``len(data)`` is consulted. Consecutive windows start
    ``size / factor`` positions apart, so ``factor=2`` overlaps windows by
    half and ``factor=1`` makes them back to back. ``end`` is always
    ``start + size`` and is not clamped, so with ``factor > 1`` the trailing
    windows may end past ``len(data)``.

    Args:
        data: Any sized object.
        size: Window length; must be positive.
        factor: Divisor applied to ``size`` to obtain the step; must be
            positive.

    Yields:
        Tuples ``(int(start), int(start + size))``.

    Raises:
        ValueError: On first iteration, if ``size`` or ``factor`` is not
            positive (the step would not advance and the loop would never
            terminate).
    """
    if size <= 0 or factor <= 0:
        raise ValueError("size and factor must both be positive")
    step = size / factor
    start = 0
    while start + step < len(data):
        yield int(start), int(start + size)
        start += step
def to_seconds(time, factor=1000):
    """Return ``time / factor`` rounded to two decimal places.

    With the default ``factor`` of 1000 a millisecond value is converted to
    seconds.
    """
    return round(time / factor, 2)
def get_pictures_dataset(data, user, window_size, factor=1):
    """Summarise ``data`` into one labelled row per sliding window.

    For every window produced by ``windows(data.index, window_size, factor)``
    a row is created holding the most frequent ``"picture"`` value inside the
    window and the window's start/end ``"Recording timestamp"`` converted with
    ``to_seconds``.

    Args:
        data: DataFrame with ``"picture"`` and ``"Recording timestamp"``
            columns.
        user: Prefix used to build each row id as ``f"{user}_{idx}"``.
        window_size: Window length passed to ``windows``.
        factor: Step divisor passed to ``windows``.

    Returns:
        DataFrame indexed by ``"id"`` with columns ``target``, ``start`` and
        ``end``.
    """
    # NOTE(review): ``windows`` yields positions derived from len(data.index),
    # but they are used below as *labels* with ``.loc``. The two only coincide
    # for a default 0..n-1 integer index - confirm against the caller.
    df_pictures = {"id": [], "target": [], "start": [], "end": []}
    for idx, (start, end) in enumerate(windows(data.index, window_size, factor)):
        df_pictures["id"].append(f"{user}_{idx}")
        # Label-based slice, so the row labelled ``end`` is included.
        # On ties ``mode()`` returns several values; the first one is kept.
        df_pictures["target"].append(data.loc[start:end, "picture"].mode()[0])
        df_pictures["start"].append(to_seconds(data.loc[start, "Recording timestamp"]))
        df_pictures["end"].append(to_seconds(data.loc[end, "Recording timestamp"]))
    return pd.DataFrame.from_dict(df_pictures).set_index("id")
# Build the windowed dataset for this participant with a window size of 500
# and the function's default factor of 1.
user = "1105e"
test = get_pictures_dataset(data,user,500)
def filter_nulls(df):
    """Randomly undersample rows whose ``"target"`` is ``"Null"``.

    If there are more ``"Null"`` rows than rows of the most frequent other
    label, a random subset of the ``"Null"`` rows (without replacement, drawn
    with ``np.random.choice``) is kept so that both counts match. Otherwise
    ``df`` is returned untouched.

    Args:
        df: DataFrame with a ``"target"`` column.

    Returns:
        The filtered DataFrame (rows ordered by the union of the kept index
        labels), or ``df`` itself when no undersampling is needed or when
        there is no non-``"Null"`` label to balance against.
    """
    is_null = df["target"] == "Null"

    # Size of the majority class among the non-Null labels. This is computed
    # from the ``df`` argument, not from a module-level variable.
    label_counts = df.loc[~is_null, "target"].value_counts()
    if label_counts.empty:
        return df
    samples_max = int(label_counts.max())

    # Index labels of the Null rows.
    null_idx = df[is_null].index
    samples_null = len(null_idx)

    if samples_null > samples_max:
        # Random undersampling: keep only ``samples_max`` Null rows.
        rest_idx = df[~is_null].index
        selected_idx = np.random.choice(samples_null, size=samples_max, replace=False)
        null_idx = null_idx[selected_idx]
        # ``Index.union`` is the explicit set union; the ``|`` operator no
        # longer performs a set operation on Index objects in pandas 2.
        df = df.loc[null_idx.union(rest_idx)]
    return df
# Undersample the "Null" windows of the dataset built above.
df= filter_nulls(test)
# Bare expression: presumably a notebook cell display; it has no effect when run as a script.
df
from moviepy.editor import *
# Cut the participant video into one clip per dataset row and record, for each
# clip, its relative path, label and frame count.
path_video = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\Videos_raw\1105e.mp4"
video = VideoFileClip(path_video).resize((224,224))
# Turn the "id" index back into a column so it can be read per row below.
df = df.reset_index()
video_dataset = {
    "video_path": [],
    "label": [],
    "frames": []
}
root_path = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Participants"
video_folder = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Participants\1105e\Clips"
participant ="1105e"
# Create the output folder if it does not exist yet.
os.makedirs(video_folder, exist_ok=True)
# NOTE(review): ``tqdm`` is not imported explicitly anywhere in the visible
# file - confirm it is in scope before running this loop.
for idx in tqdm(df.index):
    #get clip
    clip = video.subclip(df.loc[idx,"start"],df.loc[idx,"end"])
    # duration * fps is a float; it is stored as-is, not rounded.
    n_frames = clip.duration * clip.fps
    label = df.loc[idx,"target"]
    # Write the result to a file
    clip_id = df.loc[idx,"id"]
    video_rel_path = os.path.join(participant,"Clips",f"{clip_id}.mp4")
    clip_file= os.path.join(root_path,video_rel_path)
    clip.write_videofile(clip_file,audio=False,logger = None)
    #update dataset
    video_dataset["video_path"].append(video_rel_path)
    video_dataset["label"].append(label)
    video_dataset["frames"].append(n_frames)
# Bare expression: presumably a notebook cell display.
video_dataset
# Folders of the project data that the key-mapping code below reads:
# media XML files, participant XML files and recording (.rec) files.
media_path = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\ProjectData\Media"
participants_path = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\ProjectData\Participants"
recordings_path = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\ProjectData\Recordings"
import xml.etree.ElementTree as ET
def get_media_keys(xml_file):
    """Read the media key and target file name from a media XML file.

    Args:
        xml_file: Path (or file object) accepted by ``ET.parse``.

    Returns:
        Tuple ``(key, filename)`` holding the text of the root's direct
        ``<Key>`` and ``<TargetFileName>`` children.

    Raises:
        AttributeError: If either child element is missing.
    """
    root = ET.parse(xml_file).getroot()
    key = root.find("Key").text
    filename = root.find("TargetFileName").text
    return key, filename
def get_participant_keys(xml_file):
    """Read the participant key and name from a participant XML file.

    Args:
        xml_file: Path (or file object) accepted by ``ET.parse``.

    Returns:
        Tuple ``(key, participant)`` holding the text of the root's direct
        ``<Key>`` and ``<Name>`` children.

    Raises:
        AttributeError: If either child element is missing.
    """
    root = ET.parse(xml_file).getroot()
    key = root.find("Key").text
    participant = root.find("Name").text
    return key, participant
def get_recording_keys(xml_file):
    """Read the participant id and media id from a recording file.

    The whole tree is scanned in document order; the first element whose tag
    contains ``"ParticipantId"`` and the first whose tag contains ``"guid"``
    supply the two values. Substring matching is used, so namespaced tags
    match as well.

    Args:
        xml_file: Path (or file object) accepted by ``ET.parse``.

    Returns:
        Tuple ``(participant_id, media_id)`` with the text of those elements.

    Raises:
        StopIteration: If no element matches one of the two tag names.
    """
    tree = ET.parse(xml_file)
    participant_id = next(elem.text for elem in tree.iter() if "ParticipantId" in elem.tag)
    media_id = next(elem.text for elem in tree.iter() if "guid" in elem.tag)
    return participant_id, media_id
# Scratch check: print the text of every element whose tag contains "guid"
# in one recording file. Note that this rebinds the name ``test``.
test = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\ProjectData\Recordings\BqQksdKnyUK4EYy9bga5Yg.rec"
tree = ET.parse(test)
for el in tree.iter():
    if "guid" in el.tag:
        print(el.text)
# Recorded output:
# dcd22fbf-4ea2-49cf-90da-463d9e49e9ae
#define key mappings
# One entry per recording file: {participant_id: "", media_id: ""}. The empty
# strings are placeholders, keyed by the position of the file in the listing.
recording_files = [os.path.join(recordings_path, name) for name in os.listdir(recordings_path) if "rec" in name]
recordings = {}
for idx, file in enumerate(recording_files):
    participant_id, media_id = get_recording_keys(file)
    recordings[idx] = {participant_id: "", media_id: ""}
# Scratch inspection of the second entry (requires at least two recordings).
x = recordings[1].keys()
x
# Recorded output:
# dict_keys(['2872c273-20e0-4ed3-a3bf-1908a208a760', 'd0dcbb3f-3360-4233-b1e8-9da4a85c3257'])
'2872c273-20e0-4ed3-a3bf-1908a208a760' in x
# Recorded output:
# True
# Replace the placeholder stored under each participant key with the
# participant's name.
for file in os.listdir(participants_path):
    file = os.path.join(participants_path, file)
    # get participant key
    participant_key, participant_name = get_participant_keys(file)
    for key in recordings:
        if participant_key in recordings[key]:
            recordings[key][participant_key] = participant_name
# Replace the placeholder stored under each media key with the media's
# target file name.
media_files = [os.path.join(media_path, name) for name in os.listdir(media_path)
               if "xml" in name]
for file in media_files:
    # get media key
    media_key, recording_filename = get_media_keys(file)
    for key in recordings:
        if media_key in recordings[key]:
            recordings[key][media_key] = recording_filename
import json
# Persist the recording -> {participant key: name, media key: file} mapping.
# JSON object keys are strings, so the integer keys of ``recordings`` are
# written as "0", "1", ...
recordings_metadata_file = r"C:\Users\jeuux\Desktop\Carrera\MoAI\TFM\AnnotatedData\FinalDatasets\Datasets\Video_Dataset_meta\recordings_meta.json"
with open(recordings_metadata_file, "w") as f:
    json.dump(recordings, f)
# NOTE(review): ``get_hash_root``, ``get_key_name`` and ``get_participant_id``
# are not defined in the visible part of this file - confirm where they come
# from before running this section.
# participant_keys = [get_hash_root(os.path.join(xml_dir,xml_file)) for xml_file in os.listdir(participants_path)]
# Hash roots of every media XML file (a list).
media_keys = [get_hash_root(os.path.join(media_path, xml_file))
              for xml_file in os.listdir(media_path)
              if "xml" in xml_file]
# Map of participant key -> participant name.
participant_keys = {}
for xml_file in os.listdir(participants_path):
    if "rec" in xml_file:
        key, participant = get_key_name(os.path.join(participants_path, xml_file))
        participant_keys[key] = participant
# Participant ids read from every recording file (a list).
participant_ids = [get_participant_id(os.path.join(recordings_path, xml_file))
                   for xml_file in os.listdir(recordings_path)
                   if "rec" in xml_file]
# Scratch check: is one known media id present in ``media_keys``?
ext_rec_id = "b27628c4-2e7f-4e8e-8dd3-d933d165f04e"
key = "b124a406-a7d2-42c9-b811-8cbd6e06b962"
media_dp1 = "dcd22fbf-4ea2-49cf-90da-463d9e49e9ae"
for keys in media_keys:
    if media_dp1 == keys:
        print("Hi")
# Recorded output:
# Hi
# Scratch inspection of one participant XML file.
# NOTE(review): ``media_keys`` is assigned a list earlier in this file, and
# lists have no ``.values()`` - this line raises AttributeError in that case.
media_keys.values()
# ``xml_file`` here is whatever value the name was last bound to earlier in
# the file.
xml_file = os.path.join(participants_path, xml_file)
tree = ET.parse(xml_file)
root = tree.getroot()
key = root.find("Key").text
participant = root.find("Name").text
key
# NOTE(review): ``elem`` is not defined at module level in the visible file.
for e in elem.iter():
    print(e)
tree.getroot()
# Scratch check: report, once per participant id, whether any media key
# equals ``key``.
# NOTE(review): the loop variable ``keys`` is never used; the comparison is
# against the module-level ``key``, so every iteration prints the same
# result. ``keys`` may have been intended - confirm.
for keys in participant_ids:
    try:
        next(media_key for media_key in media_keys if media_key == key)
        print("Match!")
    except StopIteration:
        # ``next`` raises StopIteration when no media key matched.
        print("there is not any match")
len(participant_keys) == len(media_keys)
# NOTE(review): ``test_key`` is not defined in the visible file.
print(test_key in participant_keys)
def find_recording(recordings_path, hash_root):
    """Report whether a recording video for ``hash_root`` exists.

    Every entry of ``recordings_path`` whose name contains ``"mp4"`` is
    checked: the part of the name before the first ``"=="`` must equal
    ``hash_root``. On the first match the pair is printed.

    Args:
        recordings_path: Directory to list.
        hash_root: Expected file-name prefix.

    Returns:
        ``True`` on the first match, ``False`` when nothing matches.
    """
    for file in os.listdir(recordings_path):
        if "mp4" not in file:
            continue
        root = file.split("==")[0]
        if root == hash_root:
            print(root, hash_root)
            return True
    return False
# Scratch checks against the files in ``xml_dir``.
# NOTE(review): ``xml_dir`` and ``get_hash_root`` are not defined in the
# visible file, and ``get_file_root`` is defined only further down, so run
# top to bottom this section fails with NameError - confirm the intended
# execution order.
for xml_file in os.listdir(xml_dir):
    hash_root = get_hash_root(os.path.join(xml_dir, xml_file))
    find_recording(recordings_path, hash_root)
test_hash = "4KpMREZBURFy2YS6VpeYYw"
for file in os.listdir(xml_dir):
    # Substring test: is the file's base name contained in ``test_hash``?
    print(get_file_root(file) in test_hash)
def get_file_root(path):
    """Return the file name of ``path`` without directory or last extension.

    Only the final extension is removed, so ``"x.tar.gz"`` gives ``"x.tar"``.
    """
    base = os.path.basename(path)
    return os.path.splitext(base)[0]