import numpy as np
import pandas as pd
import os.path
import random
import collections
from bisect import bisect_right
from bisect import bisect_left
from .. import multimatch_gaze as mp
# Field layout (name, NumPy format string) of one eye-movement event record.
# It is used below when reading REMoDNaV output and when rebuilding event
# arrays. All fields are 8-byte floats except the event label, which is a
# unicode string of at most 10 characters.
_FLOAT_FIELDS_AFTER_LABEL = (
    "start_x",
    "start_y",
    "end_x",
    "end_y",
    "amp",
    "peak_vel",
    "med_vel",
    "avg_vel",
)
dtype = [("onset", "<f8"), ("duration", "<f8"), ("label", "<U10")] + [
    (field_name, "<f8") for field_name in _FLOAT_FIELDS_AFTER_LABEL
]
def same_sample(run=1, subj=1):
    """Load one test recording twice to force exactly similar scanpaths.

    :param: run: int, run number that is inserted into the file names
        (the original documentation states an integer between 1 and 8)
    :param: subj: int, 1 selects "sub-30" (documented as a lab subject),
        any other value selects "sub-10" (documented as an mri subject)
    :return: data, data2: recarray, the eye-movement events of the chosen
        recording. Both names refer to the very same array object.
    :return: shots: dataframe, location annotation of the chosen run

    Note: the file paths are relative, so the function has to be called from
    the directory that contains the ``multimatch_gaze`` package directory.
    """
    sub = "sub-30" if subj == 1 else "sub-10"
    testdata = "multimatch_gaze/tests/testdata"
    path = os.path.join(
        testdata, "{}_task-movie_run-{}_events.tsv".format(sub, run)
    )
    loc = os.path.join(testdata, "locations_run-{}_events.tsv".format(run))
    field_names = (
        "onset",
        "duration",
        "label",
        "start_x",
        "start_y",
        "end_x",
        "end_y",
        "amp",
        "peak_vel",
        "med_vel",
        "avg_vel",
    )
    field_formats = ("f8", "f8", "U10") + ("f8",) * 8
    # np.recfromcsv is deprecated since NumPy 2.0. It was a thin wrapper that
    # called genfromtxt with names=True and case_sensitive="lower" and viewed
    # the result as a recarray, which is spelled out here instead.
    data = np.genfromtxt(
        path,
        delimiter="\t",
        names=True,
        case_sensitive="lower",
        dtype={"names": field_names, "formats": field_formats},
    ).view(np.recarray)
    # the "second" sample is deliberately the same object, not a copy
    data2 = data
    shots = pd.read_csv(loc, sep="\t")
    return data, data2, shots
def short_shots(run=3):
    """Return the first 20 rows of a run's location annotation.

    The shortened annotation is meant for testing longshots().

    :param: run: int, run number that is inserted into the file name
    :return: shortshots: dataframe, rows 0-19 of the annotation file
    """
    filename = "locations_run-{}_events.tsv".format(run)
    annotation_file = os.path.join("multimatch_gaze/tests/testdata", filename)
    return pd.read_csv(annotation_file, sep="\t")[0:20]
def mk_fix_vector(length=5):
    """Create a random fixation vector with ``length`` entries.

    Every entry has the fields start_x and start_y (drawn uniformly from
    [1, 720)) and duration (drawn uniformly from [0.01, 5)).

    :param: length: int, number of fixations to generate
    :return: structured array with fields (start_x, start_y, duration)
    """
    field_names = ("start_x", "start_y", "duration")
    vector = np.recarray((0,), dtype=[(name, "<f8") for name in field_names])
    for _ in range(length):
        # draw in the order x, y, duration
        x_pos = np.random.uniform(1, 720)
        y_pos = np.random.uniform(1, 720)
        duration = np.random.uniform(0.01, 5)
        sample = np.array(
            (x_pos, y_pos, duration),
            dtype=[(name, float) for name in field_names],
        )
        vector = np.append(vector, sample)
    return vector
def mk_strucarray(length=5):
    """Create two random scanpaths as nested dicts with 'fix' and 'sac' keys.

    The first scanpath holds ``length`` fixation values and ``length - 1``
    saccade values per key, sampled without replacement. The second scanpath
    is derived from the first: every list is reversed and then repeated
    twice (list repetition, so each list is twice as long).

    :param: length: int, number of fixations. The fixation durations are
        sampled from range(5) and the saccade angles from range(4), so
        random.sample raises a ValueError for length > 5.
    :return: eyedata, eyedata2: dict, the two scanpaths
    """
    n_saccades = length - 1

    def draw(upper, count):
        return random.sample(range(upper), count)

    # keep this draw order, it determines the values for a given random seed
    fix_x = draw(700, length)
    fix_y = draw(700, length)
    fix_dur = draw(5, length)
    sac_x = draw(700, n_saccades)
    sac_y = draw(700, n_saccades)
    sac_lenx = draw(700, n_saccades)
    sac_leny = draw(700, n_saccades)
    sac_rho = draw(700, n_saccades)
    sac_theta = draw(4, n_saccades)

    eyedata = {
        "fix": {"x": fix_x, "y": fix_y, "dur": fix_dur},
        "sac": {
            "x": sac_x,
            "y": sac_y,
            "lenx": sac_lenx,
            "leny": sac_leny,
            "theta": sac_theta,
            "rho": sac_rho,
        },
    }
    # reversed and repeated variant of every list, same key order
    eyedata2 = {
        kind: {key: values[::-1] * 2 for key, values in parts.items()}
        for kind, parts in eyedata.items()
    }
    return eyedata, eyedata2
def mk_angles():
    """Create saccade angle vectors with predefined angular relations.

    angles2 holds the negated values of angles1, so the pairwise differences
    between the two are 0 and roughly 60, 90, 120 and 180 degrees (maximally
    dissimilar). angles4 holds the negated values of angles3; there the same
    differences occur in reversed order.

    :return: M_assignment: array, 5x5 matrix holding the numbers 0-24
    :return: path: list, the indices of the diagonal of M_assignment
    :return: angles1, angles2, angles3, angles4: dict, each with the angles
        (in radians) stored under ['sac']['theta']
    """
    thetas_a = [0, 0.523, 0.785, 1.04, 1.57]
    thetas_b = [1.57, 2.093, 2.356, 2.617, 3.14]

    def as_scanpath(thetas):
        return {"sac": {"theta": thetas}}

    angles1 = as_scanpath(thetas_a)
    angles2 = as_scanpath([-theta for theta in thetas_a])
    angles3 = as_scanpath(thetas_b)
    angles4 = as_scanpath([-theta for theta in thetas_b])
    size = 5
    M_assignment = np.arange(size * size).reshape(size, size)
    path = [0, 6, 12, 18, 24]
    return M_assignment, path, angles1, angles2, angles3, angles4
def mk_durs():
    """Create some example durations for test_durationsim().

    :return: M_assignment: array, 5x5 matrix holding the numbers 0-24
    :return: path: list, the indices of the diagonal of M_assignment
    :return: durations1, durations2: dict, each with five fixation durations
        stored under ['fix']['dur']
    """
    # The former OrderedDict initialisations were overwritten right away by
    # the plain dicts below and have been removed as dead code.
    durations1 = dict(fix=dict(dur=[0.001, 20.0, 7, -18, -2.0]))
    durations2 = dict(fix=dict(dur=[0.008, 18.0, 7, -11, 3.0]))
    path = [0, 6, 12, 18, 24]
    M_assignment = np.arange(5 * 5).reshape(5, 5)
    return M_assignment, path, durations1, durations2
def mk_supershort_shots():
    """Create an artificial shot annotation of 20 shots of duration 1.

    :return: shots: dataframe with the columns onset (0-19), duration
        (always 1) and locale (always "somewhere")
    """
    n_shots = 20
    return pd.DataFrame(
        {
            "onset": np.arange(0, n_shots),
            "duration": np.repeat(1, n_shots),
            "locale": np.repeat("somewhere", n_shots),
        }
    )
def mk_longershots():
    """Create an artificial shot annotation of 20 shots of duration 5.

    :return: shots: dataframe with the columns onset (0-19), duration
        (always 5) and locale (always "somewhere")
    """
    n_shots = 20
    return pd.DataFrame(
        {
            "onset": np.arange(0, n_shots),
            "duration": np.repeat(5, n_shots),
            "locale": np.repeat("somewhere", n_shots),
        }
    )
# The following functions work specifically with the studyforrest
# eye-tracking data.
def takeclosestright(mylist, mynumber):
    """Return the element closest to the right of 'mynumber' in a sorted list.

    :param: mylist: array-like, sorted values
    :param: mynumber: number to locate within mylist
    :return: the first element of mylist that is greater than mynumber; the
        last element of mylist if no element is greater
    """
    insert_at = bisect_right(mylist, mynumber)
    # nothing lies to the right of mynumber: fall back to the last element
    if 0 < insert_at == len(mylist):
        return mylist[-1]
    return mylist[insert_at]
def takeclosestleft(mylist, mynumber):
    """Return the element closest to the left of 'mynumber' in a sorted list.

    :param: mylist: array-like, sorted values
    :param: mynumber: number to locate within mylist
    :return: the last element of mylist that is smaller than mynumber; the
        first element of mylist if no element is smaller
    """
    insert_at = bisect_left(mylist, mynumber)
    # clamp at 0 so that the first element is returned if nothing is smaller
    return mylist[max(insert_at - 1, 0)]
def create_onsets(data, dur):
    """Create shot onsets from studyforrests location annotation.

    Collect the onset times of all shots that last at least 'dur' seconds.

    :param: data: dataframe, location annotation from studyforrest with the
        columns 'onset' and 'duration'
    :param: dur: float, time in seconds a shot should at least be long
    :return: onsets: list of shot onset times
    """
    return [
        shot["onset"] for _, shot in data.iterrows() if shot["duration"] >= dur
    ]
def create_offsets(data, dur):
    """Create shot offsets from studyforrests location annotation.

    Collect the offset times of all shots that last at least 'dur' seconds.

    :param: data: dataframe, location annotation from studyforrest with the
        columns 'onset' and 'duration'
    :param: dur: float, time in seconds a shot should at least be long
    :return: offsets: list of shot offset times
    """
    # The end of a shot is onset + duration. A small epsilon (0.03) is
    # subtracted to be really sure not to get into a cut.
    return [
        shot["onset"] + shot["duration"] - 0.03
        for _, shot in data.iterrows()
        if shot["duration"] >= dur
    ]
def create_chunks(onsets, fixations, dur):
    """Chunk eyetracking data into scanpaths.

    For every shot onset, look up the indices of the fixations that mark the
    start (first fixation onset after the shot onset) and the end (first
    fixation onset after shot onset + dur) of a segment.

    :param: onsets: array-like, onset times of movie shots
    :param: fixations: record array, nx4 fixation vector
        (onset, x, y, duration), output of preprocess() function
    :param: dur: float, desired duration of segment length
    :return: startidx, endidx: list, start and end ids of eyemovement data
        to chunk into segments
    """
    fixation_onsets = fixations["onset"]
    startidx, endidx = [], []
    for shotonset in onsets:
        first = takeclosestright(fixation_onsets, shotonset)
        # extend() directly yields the flat list of matching indices
        startidx.extend(np.where(fixation_onsets == first)[0].tolist())
        last = takeclosestright(fixation_onsets, shotonset + dur)
        endidx.extend(np.where(fixation_onsets == last)[0].tolist())
    return startidx, endidx
def create_offsetchunks(offsets, fixations, dur):
    """Chunk eyetracking data into scanpaths.

    For every shot offset, look up the indices of the fixations that mark
    the start (first fixation onset after shot offset - dur) and the end
    (last fixation onset before the shot offset) of a segment.

    :param: offsets: array-like, offset times of movie shots
    :param: fixations: record array, nx4 fixation vector
        (onset, x, y, duration), output of preprocess()
    :param: dur: float, desired duration of segment length
    :return: startidx, endidx: list, start and end ids of eyemovement data
        to chunk into segments
    """
    fixation_onsets = fixations["onset"]
    startidx, endidx = [], []
    for shotoffset in offsets:
        first = takeclosestright(fixation_onsets, shotoffset - dur)
        # extend() directly yields the flat list of matching indices
        startidx.extend(np.where(fixation_onsets == first)[0].tolist())
        last = takeclosestleft(fixation_onsets, shotoffset)
        endidx.extend(np.where(fixation_onsets == last)[0].tolist())
    return startidx, endidx
def fixations_chunks(fixations, startid, endid):
    """Chunk eyemovement data into scanpaths.

    :param: fixations: record array, nx4 fixation vector
        (onset, x, y, duration), output of preprocess()
    :param: startid, endid: array, start- and end-ids of the scanpaths,
        output from either create_chunks() or create_offsetchunks().
        The end id itself is not part of the chunk (slice semantics).
    :return: fixation_vector: list with one nx3 fixation vector
        (start_x, start_y, duration) per pair of ids
    """
    wanted_fields = ["start_x", "start_y", "duration"]
    return [
        fixations[first : endid[position]][wanted_fields]
        for position, first in enumerate(startid)
    ]
def pursuits_to_fixations(remodnav_data):
    """Transform start and endpoints of pursuits to fixations.

    Uses a record array created by the remodnav algorithm for eye-movement
    classification and relabels pursuits as fixations: every event labeled
    "PURS" is replaced by two "FIXA" events. The first keeps the onset and
    start coordinates of the pursuit; the second starts half the pursuit
    duration later at the end coordinates of the pursuit. Each of the two
    gets half of the pursuit duration. All other events are kept unchanged.
    This is useful for example if the underlying stimulus material is a
    moving image - visual intake of a moving object would then resemble
    a pursuit.

    :param: remodnav_data: recordarray, remodnav output of eyemovement data
    :return: newdata: array of events with the module-level dtype
    """
    from copy import deepcopy

    # start from an empty record array with the event dtype and grow it
    # event by event
    newdata = np.recarray((0,), dtype=dtype)
    # work on a deep copy so that the in-place edits below do not alter the
    # caller's array
    events = deepcopy(remodnav_data)
    for event in events:
        if event["label"] != "PURS":
            newdata = np.append(newdata, np.array(event, dtype=dtype))
            continue
        # start and end point of the pursuit each get half the total duration
        event["duration"] = event["duration"] / 2
        event["label"] = "FIXA"
        second_half = deepcopy(event)
        # the end point of the pursuit is the start of the new fixation
        second_half["onset"] += second_half["duration"]
        second_half["start_x"] = second_half["end_x"]
        second_half["start_y"] = second_half["end_y"]
        newdata = np.append(newdata, np.array(event, dtype=dtype))
        newdata = np.append(newdata, np.array(second_half, dtype=dtype))
    return newdata
def preprocess_remodnav(data, screensize):
    """Preprocess record array of eye-events.

    A record array from REMoDNaV data is preprocessed in the following way:
    subset to only get fixation data (label "FIXA"), disregard fixations
    whose start coordinates lie outside of the screen, and keep only onset,
    start coordinates and duration.

    :param: data: recordarray, REMoDNaV output of eye events from movie
        data
    :param: screensize: list of float, screen measurements in px
        (width, height)
    :return: fixations: array-like nx4 fixation vector (onset, start_x,
        start_y, duration)
    """
    width, height = screensize[0], screensize[1]
    # only fixation labels
    is_fixation = data["label"] == "FIXA"
    # Both bounds of both axes are checked on the start coordinates, which
    # are the coordinates that are returned. The upper y bound used to be
    # checked on end_y, which let fixations starting below the screen pass
    # and dropped on-screen fixations whose end point was out of frame.
    start_x, start_y = data["start_x"], data["start_y"]
    within_x = np.logical_and(start_x >= 0, start_x <= width)
    within_y = np.logical_and(start_y >= 0, start_y <= height)
    keep = is_fixation & within_x & within_y
    # give me onset times, start_x, start_y and duration
    fixations = data[keep][["onset", "start_x", "start_y", "duration"]]
    return fixations
def read_remodnav(data):
    """Helper to read input data produced by the REMoDNaV algorithm.

    Further information on the REMoDNaV algorithm can be found here:
    https://github.com/psychoinformatics-de/remodnav

    :param: data: file name or file-like object of a tab-separated file
        with a header line
    :return: d: recarray of eye-movement events with the module-level dtype
    """
    # np.recfromcsv is deprecated since NumPy 2.0. It was a thin wrapper that
    # called genfromtxt with names=True and case_sensitive="lower" and viewed
    # the result as a recarray, which is spelled out here instead.
    d = np.genfromtxt(
        data, delimiter="\t", names=True, case_sensitive="lower", dtype=dtype
    ).view(np.recarray)
    return d
def longshot(shots, group_shots, ldur=4.92):
    """Group movie shots without a cut together to obtain longer segments.

    Note: This way, fewer but longer scanpaths are obtained. Example: use
    median shotlength of 4.92s.

    Two neighbouring shots are merged if both are shorter than ldur and
    share the same locale. The merged shot keeps the onset of the first shot
    and gets the summed duration; it is then compared to its next neighbour
    again.

    :param: shots: dataframe, contains movie location annotation with the
        columns 'onset' and 'duration' (and 'locale' if group_shots is True)
    :param: group_shots: boolean, if True, grouping of movie shots is performed
    :param: ldur: float, length in seconds for movie shot. An attempt is made
        to group short shots without a cut together to form longer shots of
        ldur length
    :return: aggregated, dataframe of aggregated movie shots with the
        columns 'onset' and 'duration'
    """
    # turn pandas dataframe shots into record array (a copy, so the input
    # dataframe is not modified)
    structshots = shots.to_records()
    if group_shots:
        i = 0
        # Stop at the second to last row, as every step looks at row i + 1.
        # The bound is checked on the index; comparing the row's values to
        # the last row would stop too early whenever an earlier row happens
        # to be equal to the last one.
        while i < len(structshots) - 1:
            if (
                structshots[i]["duration"] < ldur
                and structshots[i + 1]["duration"] < ldur
                and structshots[i]["locale"] == structshots[i + 1]["locale"]
            ):
                # add durations together and delete second row; i is not
                # advanced so that the merged shot can grow further
                structshots[i]["duration"] += structshots[i + 1]["duration"]
                structshots = np.delete(structshots, i + 1, 0)
            else:
                i += 1
    aggregated = pd.DataFrame(
        {
            "onset": structshots["onset"].tolist(),
            "duration": structshots["duration"].tolist(),
        },
        columns=["onset", "duration"],
    )
    return aggregated
def docomparison_forrest(
    shots,
    data1,
    data2,
    screensize=None,
    dur=4.92,
    ldur=0,
    offset=False,
    TDur=0,
    TDir=0,
    TAmp=0,
    grouping=False,
):
    """Compare two scanpaths on five similarity dimensions.

    :param: shots: dataframe, movie location annotation
    :param: data1, data2: recarray, eyemovement information of forrest gump
        studyforrest dataset
    :param: screensize: list, screen dimensions in px. Defaults to
        [1280, 720] if None is given.
    :param: dur: float, duration in seconds. Only shots of at least this
        length are used, and scanpaths are cut to this length.
    :param: ldur: float, duration in seconds. An attempt is made to group
        short shots together to form shots of ldur length. 0 disables the
        grouping of shots.
    :param: offset: boolean, if True, scanpaths are extracted relative to
        the end of a shot instead of its start
    :param: grouping: boolean, if True, simplification is performed based on
        thresholds TAmp, TDir, and TDur
    :param: TDir: float, Direction threshold, angle in degrees.
    :param: TDur: float, Duration threshold, duration in seconds.
    :param: TAmp: float, Amplitude threshold, length in px.
    :return: scanpathcomparisons: list
        one array of 5 scanpath similarity measures per scanpath pair:
        Vector (Shape), Direction (Angle), Length, Position, and Duration.
        1 = absolute similarity, 0 = lowest similarity possible. Five nan
        values if one of the scanpaths has fewer than 3 fixations.
    :return: onset_times: array-like
        onset times of the scanpaths (taken from data1)
    :return: exact_durations: array-like
        durations of extracted scanpaths (taken from data1), nan where no
        positive duration could be calculated
    """
    # avoid a mutable default argument
    if screensize is None:
        screensize = [1280, 720]
    # determine whether short shots should be grouped together
    group_shots = ldur != 0
    scanpathcomparisons = []
    # transform pursuits into fixations
    newdata1 = pursuits_to_fixations(data1)
    newdata2 = pursuits_to_fixations(data2)
    print("Loaded data.")
    # preprocess input files
    fixations1 = preprocess_remodnav(newdata1, screensize)
    fixations2 = preprocess_remodnav(newdata2, screensize)
    shots = longshot(shots, group_shots, ldur)
    # get shots and scanpath on- and offsets
    if offset:
        onset = create_offsets(shots, dur)
        startid1, endid1 = create_offsetchunks(onset, fixations1, dur)
        startid2, endid2 = create_offsetchunks(onset, fixations2, dur)
    else:
        onset = create_onsets(shots, dur)
        startid1, endid1 = create_chunks(onset, fixations1, dur)
        startid2, endid2 = create_chunks(onset, fixations2, dur)
    fixation_vectors1 = fixations_chunks(fixations1, startid1, endid1)
    fixation_vectors2 = fixations_chunks(fixations2, startid2, endid2)
    print("Split fixation data into {} scanpaths.".format(len(startid1)))
    # save onset and duration times, if valid ones can be calculated
    onset_times = []
    exact_durations = []
    for i in range(0, len(startid1)):
        onset_time = fixations1[startid1[i]]["onset"]
        onset_times.append(onset_time)
        exact_duration = (
            fixations1[endid1[i]]["onset"] - fixations1[startid1[i]]["onset"]
        )
        # capture negative durations for invalid scanpaths
        if exact_duration > 0:
            exact_durations.append(exact_duration)
        else:
            exact_durations.append(np.nan)
    # This message used to sit inside the loop behind a condition that could
    # never be true (i == len(startid1)), so it was never shown.
    print("Captured onsets and duration" " times of all scanpath pairs.")
    # loop over all fixation vectors/scanpaths and calculate similarity
    for i in range(0, len(onset)):
        length1 = len(fixation_vectors1[i])
        length2 = len(fixation_vectors2[i])
        # check if fixation vectors/scanpaths are long enough
        if length1 >= 3 and length2 >= 3:
            subj1 = mp.gen_scanpath_structure(fixation_vectors1[i])
            subj2 = mp.gen_scanpath_structure(fixation_vectors2[i])
            if grouping:
                subj1 = mp.simplify_scanpath(subj1, TAmp, TDir, TDur)
                subj2 = mp.simplify_scanpath(subj2, TAmp, TDir, TDur)
            M = mp.cal_vectordifferences(subj1, subj2)
            scanpath_dim = np.shape(M)
            M_assignment = np.arange(scanpath_dim[0] * scanpath_dim[1]).reshape(
                scanpath_dim[0], scanpath_dim[1]
            )
            numVert, rows, cols, weight = mp.createdirectedgraph(
                scanpath_dim, M, M_assignment
            )
            # only the path is needed, the distance is discarded
            path, _ = mp.dijkstra(
                numVert, rows, cols, weight, 0, scanpath_dim[0] * scanpath_dim[1] - 1
            )
            unnormalised = mp.getunnormalised(subj1, subj2, path, M_assignment)
            normal = mp.normaliseresults(unnormalised, screensize)
            scanpathcomparisons.append(normal)
        # return nan as result if at least one scanpath it too short
        else:
            scanpathcomparisons.append(np.repeat(np.nan, 5))
            # report the length of the shorter scanpath of this pair (the
            # number of scanpaths was reported here before)
            print(
                "Scanpath {} had a length of {}, however, a minimal "
                "length of 3 is required. Appending nan.".format(
                    i, min(length1, length2)
                )
            )
    return scanpathcomparisons, onset_times, exact_durations