#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Contains functions related to the experimental design of the listening test
"""
import copy
import json
import logging
import random
import datetime
import itertools
from collections import defaultdict
from sqlalchemy import func
import caqe.utilities as utilities
from .models import Condition, Participant, Trial, Test, Group
from caqe import db
from caqe import app
logger = logging.getLogger(__name__)
# Configure and insert conditions
def insert_tests_and_conditions(config=None):
    """
    Insert the configured tests, condition groups and conditions into the database.

    This is where you configure and define the listening test. If you need to change HTML content based on
    the testing condition, you configure it here as well, overriding the default values in `CONFIGURATION`.

    For every entry of ``config['TESTS']`` one `Test` row is stored whose data is the JSON-serialized application
    configuration (without ``TESTS`` and ``PERMANENT_SESSION_LIFETIME``) updated with that entry's
    ``test_config_variables``. For every entry of its ``condition_groups`` one `Group` row is stored (all keys of
    the group except ``conditions``), followed by one `Condition` row per entry of the group's ``conditions``.

    The passed configuration is only read, never modified, so the function can be called repeatedly with the same
    configuration object.

    Running this doctest initializes the development database.

    Parameters
    ----------
    config : flask.Config
        The application configuration. Defaults to ``app.config``.

    Returns
    -------
    None

    Examples
    --------
    To call this you need the application context, e.g.:

    >>> import os
    >>> os.environ['APP_MODE'] = 'DEVELOPMENT' # for testing the Development configuration
    >>> from caqe import db
    APP_MODE=DEVELOPMENT
    >>> db.drop_all()
    >>> db.create_all()
    >>> import caqe
    >>> import caqe.experiment as experiment
    >>> with caqe.app.app_context():
    ...     experiment.insert_tests_and_conditions()
    """
    if config is None:
        config = app.config

    # Store the app config variables with each test for reference. The test definitions themselves and
    # PERMANENT_SESSION_LIFETIME (a flask variable) are left out. The values are only serialized below, never
    # modified, so a shallow copy built once is sufficient.
    excluded_keys = ('TESTS', 'PERMANENT_SESSION_LIFETIME')
    base_config = {key: value for key, value in config.items() if key not in excluded_keys}

    for test_dict in config['TESTS']:
        test_config = dict(base_config)
        test_config.update(test_dict['test_config_variables'])
        test = Test(json.dumps(test_config))
        db.session.add(test)
        db.session.commit()  # test.id is read below

        for condition_group in test_dict['condition_groups']:
            # Build the group's data without its conditions instead of deleting the key, so that the
            # configuration object is left intact.
            group_data = {key: value for key, value in condition_group.items() if key != 'conditions'}
            group = Group(data=json.dumps(group_data))
            db.session.add(group)
            db.session.commit()  # group.id is read below

            for condition_dict in condition_group['conditions']:
                db.session.add(Condition(test_id=test.id, group_id=group.id, data=json.dumps(condition_dict)))
            db.session.commit()
def get_available_conditions(limit_to_condition_ids=None):
    """
    Build the query for the conditions that still need trials, without regard to any participant.

    A condition is excluded once the number of its trials whose participant passed the hearing test has reached
    ``app.config['TRIALS_PER_CONDITION']``.

    Parameters
    ----------
    limit_to_condition_ids: list of int
        List of condition ids to limit to. If None, no such restriction is applied.

    Returns
    -------
    conditions: query of Condition
        The un-executed query for the available conditions, ordered by condition id. It can be filtered further
        or executed by the caller.
    """
    required_trials = app.config['TRIALS_PER_CONDITION']

    # ids of the conditions which already have enough trials by participants that passed the hearing test
    passed_trials = db.session.query(Trial.condition_id).filter(
        Trial.participant_passed_hearing_test == True)  # noqa: E712 (builds a SQL expression)
    saturated_ids = passed_trials.group_by(Trial.condition_id). \
        having(func.count('*') >= required_trials).subquery()

    query = db.session.query(Condition).filter(Condition.id.notin_(saturated_ids))
    if limit_to_condition_ids is not None:
        query = query.filter(Condition.id.in_(limit_to_condition_ids))
    return query.order_by(Condition.id)
def assign_conditions(participant, limit_to_condition_ids=None):
    """
    Assign experimental conditions for a participant's trial.

    Ideal assignment in our scenario: assign the participant the first conditions (in order of index) that have

    A) not been assigned to them before, and
    B) not yet received the required number of ratings by people that have passed the listening test.

    The same rule is used whether or not the participant passed the listening test. This may give us a bit more
    ratings from lower condition indices for people that have not passed the listening test, but that is ok.

    All assigned conditions come from a single condition group: the one with the most conditions remaining for this
    participant.

    Parameters
    ----------
    participant : caqe.models.Participant
    limit_to_condition_ids : list, optional
        List of integer ids.

    Returns
    -------
    None or tuple
        None if there is nothing left to assign to this participant. Otherwise the tuple
        ``(condition_ids, condition_group_ids)``, where ``condition_ids`` is a list of int and
        ``condition_group_ids`` is a one-element list holding the id of the selected group.
    """
    # conditions which still need trials by participants that passed the hearing test
    available = get_available_conditions(limit_to_condition_ids)

    # the conditions the participant has already done
    participant_conditions = db.session.query(Trial.condition_id).join(Participant). \
        filter(Participant.id == participant.id).subquery()

    # get_available_conditions() orders by condition id, so the candidates are in order of index
    candidates = available.filter(Condition.id.notin_(participant_conditions)).all()

    # This check has to come before the group selection: with no candidates there is no group to pick.
    if not candidates:
        logger.info('No hits left for %r', participant)
        return None

    # find which group has the most conditions for this participant
    group_sizes = {}
    for candidate in candidates:
        group_sizes[candidate.group_id] = group_sizes.get(candidate.group_id, 0) + 1
    group_id = max(group_sizes, key=group_sizes.get)
    condition_group_ids = [group_id]

    # limit to one group
    conditions = [c for c in candidates if c.group_id == group_id]

    if app.config['LIMIT_SUBJECT_TO_ONE_TASK_TYPE']:
        previous_trial = participant.trials.filter(Trial.datetime_completed > datetime.datetime(2015, 5, 25)).first()
        # no previous trials -> nothing to be limited to
        previous_condition = getattr(previous_trial, 'condition', None)
        if previous_condition is not None and previous_condition.test_id != conditions[0].test_id:
            # If the participant is supposed to be limited to one task type, and we are out of all task of that type
            logger.info('Subject limited to ont task type. No hits left for %r', participant)
            return None

    conditions_per_evaluation = app.config['CONDITIONS_PER_EVALUATION']
    if app.config['TEST_CONDITION_ORDER_RANDOMIZED']:  # i.e. randomize the condition order within a test
        # determine what test we are on
        current_test_id = conditions[0].test_id

        # randomize the order of the conditions within that test
        condition_ids = [c.id for c in conditions if c.test_id == current_test_id]
        random.shuffle(condition_ids)
        condition_ids = condition_ids[:conditions_per_evaluation]

        # if there are not enough conditions left from this test, add more from the next.
        if len(condition_ids) < conditions_per_evaluation:
            more_cids = [c.id for c in conditions if c.test_id == current_test_id + 1]
            random.shuffle(more_cids)
            condition_ids += more_cids[:(conditions_per_evaluation - len(condition_ids))]
    else:
        condition_ids = [c.id for c in conditions[:conditions_per_evaluation]]

    logger.info('Participant %r assigned conditions: %r in groups: %r',
                participant, condition_ids, condition_group_ids)
    return condition_ids, condition_group_ids
def get_test_configurations(condition_ids, participant_id):
    """
    Generate template configuration variables from the list of experimental conditions.

    Consecutive conditions that belong to the same test are collected into one test configuration; a new
    configuration is started whenever the test id changes.

    Parameters
    ----------
    condition_ids : list
        Ids of the conditions, in presentation order.
    participant_id : int
        Stored in the encrypted stimulus URLs when ``ENCRYPT_AUDIO_STIMULI_URLS`` is set.

    Returns
    -------
    test_configurations : list of dict
        One dictionary per test with the keys ``'test'`` (the test's configuration variables), ``'conditions'``
        (a list with the variables of each condition, including its ``id`` and ``group_id``) and
        ``'condition_groups'`` (a map from group id to the variables of that group). Empty if `condition_ids` is
        empty.
    """
    test_configurations = []
    current_test_id = None
    test_config = None
    encoding_maps = {}  # group id -> map from unencoded to encoded stimulus keys, for the current test_config

    for c_id in condition_ids:
        condition = Condition.query.filter_by(id=c_id).first()

        if test_config is None or condition.test_id != current_test_id:
            if test_config is not None:
                test_configurations.append(test_config)
            current_test_id = condition.test_id
            test_config = {'test': json.loads(condition.test.data),
                           'conditions': [],
                           'condition_groups': {}}
            encoding_maps = {}

        condition_data = json.loads(condition.data)
        group_id = condition.group_id

        # The entry in 'condition_groups' is shared by all conditions of the group, so it is prepared only once
        # per test configuration. Preparing it again for a later condition would replace it with a differently
        # shuffled/encoded version, and the stimulus keys already encoded for earlier conditions would no longer
        # match it.
        if group_id not in test_config['condition_groups']:
            condition_group_data = json.loads(condition.group.data)
            if app.config['STIMULUS_ORDER_RANDOMIZED']:
                random.shuffle(condition_group_data['stimulus_files'])
            if app.config['ENCRYPT_AUDIO_STIMULI_URLS']:
                condition_group_data['reference_files'] = encrypt_audio_stimuli(
                    condition_group_data['reference_files'], participant_id, group_id)
                condition_group_data['stimulus_files'] = encrypt_audio_stimuli(
                    condition_group_data['stimulus_files'], participant_id, group_id)
                encoding_maps[group_id], _, _ = get_encoding_maps(condition_group_data['stimulus_files'])
            test_config['condition_groups'][group_id] = condition_group_data

        if app.config['STIMULUS_ORDER_RANDOMIZED']:
            random.shuffle(condition_data['stimulus_keys'])
        if app.config['ENCRYPT_AUDIO_STIMULI_URLS']:
            encoding_map = encoding_maps[group_id]
            condition_data['stimulus_keys'] = [encoding_map[key] for key in condition_data['stimulus_keys']]

        # make sure that condition_id is added to the conditions dict
        test_config['conditions'].append(dict({'id': condition.id, 'group_id': group_id}, **condition_data))

    # without any conditions there is no configuration to append
    if test_config is not None:
        test_configurations.append(test_config)
    return test_configurations
def get_encoding_maps(encrypted_audio_stimuli):
    """
    Build the stimulus key translation maps from the `encrypted_audio_stimuli`.

    Every encrypted URL is decoded and the ``'s_id'``, ``'e_id'`` and ``'URL'`` entries of the decoded data are used
    to fill the maps. The key stored next to each URL is not used.

    Parameters
    ----------
    encrypted_audio_stimuli: list of tuple
        The first element of each duple is a key, the second is the encrypted audio_file_path
        For all non-references, the key should be of the form E[0-9+]. The order of the stimuli will be random (except
        for the references)

    Returns
    -------
    encoding_map : dict
        A map from unencoded to encoded stimulus keys
    decoding_map : dict
        A map from encoded to unencoded stimulus keys
    decrypted_filenames : dict
        A map from (unencoded) stimulus key to filename
    """
    encoding_map, decoding_map, decrypted_filenames = {}, {}, {}

    for _key, encrypted_url in encrypted_audio_stimuli:
        # the decrypted URL tells us the real filename and how s_id and e_id correspond
        payload = _decode_url(encrypted_url)
        filename = payload['URL']
        stimulus_id = payload['s_id']
        encoded_id = payload['e_id']

        decrypted_filenames[stimulus_id] = filename
        encoding_map[stimulus_id] = encoded_id
        decoding_map[encoded_id] = stimulus_id

    return encoding_map, decoding_map, decrypted_filenames
def generate_comparison_pairs(condition_datas):
    """
    Add all stimulus comparison pairs, in random order, to each condition for a paired comparison test.

    Every unordered pair of stimulus keys appears exactly once. Both the order of the two stimuli within a pair and
    the order of the pairs are randomized.

    Parameters
    ----------
    condition_datas: list of dict
        List of dictionary of condition data as returned in the test_configuration defined by
        get_test_configurations(). Each dictionary must have a ``'stimulus_files'`` entry whose elements start with
        the stimulus key.

    Returns
    -------
    condition_datas: list of dict
        The same list; each dictionary was updated in place with a new field, `comparison_pairs`, which is a list
        of stimulus pairs, e.g. [('E1','E2'),('E5','E8'),...]
    """
    for entry in condition_datas:
        keys = [stimulus[0] for stimulus in entry['stimulus_files']]
        # one coin flip per pair decides which of its two stimuli comes first
        oriented_pairs = [combo if random.randint(0, 1) else combo[::-1]
                          for combo in itertools.combinations(keys, 2)]
        random.shuffle(oriented_pairs)
        entry['comparison_pairs'] = oriented_pairs
    return condition_datas
def encrypt_audio_stimuli(audio_stimuli, participant_id, condition_group_id):
    """
    Reorder and encrypt the condition files.

    Each file is encoded as a special URL which contains an encrypted, serialized dictionary. The dictionary
    contains the participant_id (p_id), the condition_group_id (g_id), the stimulus id (s_id), an encoded stimulus
    id (e_id) and the original file path (URL).

    Stimuli whose key starts with ``'S'`` are renamed to ``E1``, ``E2``, ... in the order in which they are passed.
    All other stimuli (the references) keep their key and are returned first.

    `audio_stimuli` is not modified; the entries may be tuples or lists.

    Parameters
    ----------
    audio_stimuli: list of tuple
        The first element of each duple is a key, the second is the audio_file_path
        For all non-references, the key should be of the form S[0-9+]
    participant_id: int
    condition_group_id: int

    Returns
    -------
    encrypted_audio_stimuli: list
        The references followed by the non-references. The first element of each duple is a key, the second is the
        encrypted audio_file_path. For all non-references, the key is of the form E[0-9+]. References are returned
        as tuples, non-references as lists.
    """
    def encode_url(url, _s_id, _e_id):
        adict = {'s_id': _s_id,
                 'p_id': participant_id,
                 'g_id': condition_group_id,
                 'e_id': _e_id,
                 'URL': url}
        return '/audio/' + utilities.encrypt_data(adict) + '.wav'

    # references keep their key, which is used as both s_id and e_id
    references = [(stimulus[0], encode_url(stimulus[1], stimulus[0], stimulus[0]))
                  for stimulus in audio_stimuli if not stimulus[0].startswith('S')]

    # New entries are built instead of assigning into copies of the input, so that tuple entries work as well.
    non_references = []
    for position, stimulus in enumerate(s for s in audio_stimuli if s[0].startswith('S')):
        e_id = 'E%d' % (position + 1)
        non_references.append([e_id, encode_url(stimulus[1], stimulus[0], e_id)])

    return references + non_references
def _decode_url(encrypted_url):
    """
    Decrypt the data embedded in an encrypted stimulus URL.

    The URL is expected to have the form ``'/audio/' + <encrypted data> + '.wav'``, as produced by
    encrypt_audio_stimuli(). Prefix and suffix are removed by length only; their content is not checked.

    Parameters
    ----------
    encrypted_url: str

    Returns
    -------
    The result of ``utilities.decrypt_data`` for the embedded data. get_encoding_maps() reads the entries
    ``'s_id'``, ``'e_id'`` and ``'URL'`` from it.
    """
    prefix_length = len('/audio/')
    suffix_length = len('.wav')
    encrypted_data = encrypted_url[prefix_length:][:-suffix_length]
    return utilities.decrypt_data(str(encrypted_data))
def decrypt_audio_stimuli(condition_data):
    """
    Decrypt the audio stimuli URLs from submitted trial data.

    The ``'stimulusFiles'`` entry is replaced by a map from stimulus key to decrypted filename. For the test types
    ``'mushra'`` and ``'pairwise'`` the keys of the ``'ratings'`` entry are translated back from encoded to
    unencoded stimulus keys.

    Parameters
    ----------
    condition_data: dict
        The condition data with encrypted audio URLs. It is updated in place.

    Returns
    -------
    trial_data: dict
        The updated `condition_data`
    """
    _, decoding_map, decrypted_filenames = get_encoding_maps(condition_data['stimulusFiles'])
    condition_data['stimulusFiles'] = decrypted_filenames

    test_type = app.config['TEST_TYPE']
    if test_type in ('mushra', 'pairwise'):
        condition_data['ratings'] = {decoding_map[encoded_key]: rating
                                     for encoded_key, rating in condition_data['ratings'].items()}
    ###################################################################################################################
    # ADD NEW TEST TYPES HERE
    ###################################################################################################################

    return condition_data
def is_pre_test_survey_valid(survey, inclusion_criteria):
    """
    Check whether the participant meets all inclusion criteria.

    Parameters
    ----------
    survey: dict
        The survey answers. The criteria can refer to them through the name ``survey``.
    inclusion_criteria: list
        List of expressions as strings which are evaluated in order. Evaluation stops at the first expression
        that is false.

    Returns
    -------
    bool
        False if any of the inclusion criteria is false, True otherwise (also if there are no criteria).
    """
    # NOTE(review): the criteria are passed to eval(), so they must only ever come from trusted configuration,
    # never from participant input.
    # The expressions are evaluated in this function's scope. Keep the plain loop and the local names: inside a
    # generator expression `survey` would not be visible to eval().
    for ic in inclusion_criteria:
        if eval(ic):
            continue
        return False
    return True