read_data 🗿

Read & prepare data files.

This module can be run from the command line interface (CLI) to read and prepare data for further analysis.

CLI Usage

python -m facesim3d.read_data --help

The CLI can also be used to delete already processed data from the remote database (DynamoDB).
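
For example (hypothetical invocations; the flag spellings below are assumed from the FLAGS attributes used in main() — run --help for the authoritative names):

python -m facesim3d.read_data --load
python -m facesim3d.read_data --delete
python -m facesim3d.read_data --triplets 2D --plot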

Functions:

Name Description
archive_former_tables

Move former tables to archive.

convert_date_time

Convert date-time string to format YYYY-MM-DD HH:MM:SS:MS.

convert_dynamodb_json_in_row_of_df

Convert a row in a pandas DataFrame (df), which is in the DynamoDB JSON format, into a normal df format.

create_all_triple_combinations

Create all triplet combinations.

delete_all_items_in_table_on_dynamodb

Delete all items in table on DynamoDB (e.g., 'UXFData.FaceSim.TrialResults').

finalized_triplets

Provide an overview of the finalized triplets.

finalized_triplets_multi_sub_sample

Provide an overview of finalized triplets.

get_current_state_of_triplets

Get the current state of triplets (e.g., which triplet is currently in the experiment).

get_list_of_acquired_sets

Get the list of all sets that have been acquired.

get_participant_session

Get session (2D, 3D) of the given participant ID (ppid).

get_participant_set_numbers

Get the Set-number(s) of a given participant.

get_triplet_ids_and_heads

Get all triplet_id's and heads corresponding to UXFData.FaceSim.TripletsIDB.*D[.Pilot].

load_local_table

Load a UXFData.FaceSim.* table from the local storage system.

load_table_from_dynamodb

Load a UXFData.FaceSim.* table from DynamoDB.

load_trial_results_from_dynamodb

Load all trial-results from DynamoDB.

main

Run the main function of read_data.py.

merge_tables

Merge a given table (df) with an existing table of the given table name.

plot_triplet_matrix

Plot matrix of triplets.

read_and_convert_s3_results_json_data

Get the full trial table of the main study from local storage.

read_logs_of_set

Read all log tables of a given Set number.

read_participant_data

Get the full participant table of the main study.

read_pilot_data

Get the full trial table of the pilot study (version 2).

read_pilot_participant_data

Get the full participant table of the pilot study (version 2).

read_prolific_participant_data

Read the participant table of a given Set downloaded from Prolific.

read_trial_results_of_participant

Read all trial results of a given participant.

read_trial_results_of_session

Read all trial results of a given session.

read_trial_results_of_set

Read all trial results of a given Set number.

remove_invalid_trials

Remove invalid trials from a given trial results table.

save_merged_tables_of_set

Merge all tables of a given type ("TrialResults", "SessionLog") in a given Set.

set_infix

Generate the Set infix (e.g., 's004' OR 's011') from a set number.

update_triplet_table_on_dynamodb

Update the triplet table on DynamoDB.

update_triplet_table_on_dynamodb_multi_sub_sample

Update triplet table on DynamoDB for the given session of the multi-sampled-sub-sample.

where_to_find_trial_and_log_data

Get information about which files contain the trial results and log data for a given Set number.

archive_former_tables 🗿

archive_former_tables(
    path_to_save: str | Path, table_name: str
) -> None

Move former tables to archive.

Parameters:

Name Type Description Default
path_to_save str | Path

Path where new table will be saved

required
table_name str

name of table

required
Source code in code/facesim3d/read_data.py
def archive_former_tables(path_to_save: str | Path, table_name: str) -> None:
    """
    Move former tables to archive.

    :param path_to_save: Path where new table will be saved
    :param table_name: name of table
    """
    list_of_other_tables = Path(str(path_to_save)).parent.glob(f"*{table_name}.csv")
    list_of_other_tables = [p for p in list_of_other_tables if str(p) != path_to_save]

    for p in list_of_other_tables:
        p.rename(str(p).replace(paths.data.MAIN, paths.data.main.archive))

convert_date_time 🗿

convert_date_time(date_time: str) -> str

Convert date-time string to format YYYY-MM-DD HH:MM:SS:MS.

Source code in code/facesim3d/read_data.py
def convert_date_time(date_time: str) -> str:
    """Convert date-time string to format `YYYY-MM-DD HH:MM:SS:MS`."""
    if pd.isna(date_time):
        return date_time
    if not date_time.startswith("2022-") and not date_time.startswith("2023-"):
        # Bring in format YYYY-MM-DD HH:MM:SS:MS (e.g., 2022-11-14 17:12:18:358)
        d = date_time[: date_time.find(":") - 2]
        t = date_time[date_time.find(":") - 2 :]
        d = d.replace(" ", "").replace(".", "-").replace("/", "-")  # remove blanks & replace
        date_time = f"{d} {t}"
    if date_time[10] != " ":
        # This solves an issue with dates like this '2022-12-1412-19-28- 880'
        date_time = date_time[:10] + " " + date_time[10:].replace("-", ":").replace(": ", ".")

    return date_time
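
A minimal usage sketch (hypothetical input strings):

convert_date_time("2022/11/14 17:12:18:358")  # -> '2022-11-14 17:12:18:358'
convert_date_time("2022-11-14 17:12:18:358")  # already normalized; returned unchanged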

convert_dynamodb_json_in_row_of_df 🗿

convert_dynamodb_json_in_row_of_df(
    df_row: Series,
) -> Series

Convert a row in a pandas DataFrame (df), which is in the DynamoDB JSON format, into a normal df format.

Rows/cells often come in the following DynamoDB JSON format: '[{"N":"25"}]', or similar. Convert this (example) to: 25.

Source code in code/facesim3d/read_data.py
def convert_dynamodb_json_in_row_of_df(df_row: pd.Series) -> pd.Series:
    """
    Convert a row in a pandas `DataFrame` (`df`), which is in the `DynamoDB` `json` format, into a normal `df` format.

    Rows/cells often come in the following `json` format of `DynamoDB`: `'[{"N":"25"}]'`, or similar.
    Convert this (example) to: `25`.
    """
    _row = df_row.copy()

    # Get the type and key of the type
    dtype_key = next(iter(literal_eval(_row[0])[0].keys()))
    row_dtype = DT_MAP[dtype_key]  # type mapper defined above

    # Convert row
    _row = _row.map(lambda x: literal_eval(x)[0][dtype_key])
    return _row.astype(row_dtype)
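
A minimal sketch of the conversion, assuming DT_MAP maps the DynamoDB type key to a dtype (e.g., 'N' to a numeric type), as defined earlier in the module:

row = pd.Series(['[{"N":"25"}]', '[{"N":"3"}]'])
convert_dynamodb_json_in_row_of_df(df_row=row)  # -> numeric Series: [25, 3]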

create_all_triple_combinations 🗿

create_all_triple_combinations(n_faces: int) -> DataFrame

Create all triplet combinations.

Source code in code/facesim3d/read_data.py
def create_all_triple_combinations(n_faces: int) -> pd.DataFrame:
    """Create all triplet combinations."""
    n_faces_in_triplet: int = 3
    if n_faces < n_faces_in_triplet:
        msg = f"Number of faces must be at least 3, but is {n_faces}!"
        raise ValueError(msg)
    triplet_combinations = list(combinations(range(1, n_faces + 1), r=n_faces_in_triplet))
    return pd.DataFrame(triplet_combinations, columns=["head1", "head2", "head3"])
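
For example, with four faces this yields all comb(4, 3) = 4 triplets:

create_all_triple_combinations(n_faces=4)
#    head1  head2  head3
# 0      1      2      3
# 1      1      2      4
# 2      1      3      4
# 3      2      3      4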

delete_all_items_in_table_on_dynamodb 🗿

delete_all_items_in_table_on_dynamodb(
    table_name: str,
) -> None

Delete all items in table on DynamoDB (e.g., 'UXFData.FaceSim.TrialResults').

Source code in code/facesim3d/read_data.py
def delete_all_items_in_table_on_dynamodb(table_name: str) -> None:
    """Delete all items in table on `DynamoDB` (e.g., `'UXFData.FaceSim.TrialResults'`)."""
    delete = ask_true_false(
        f"\nAre you sure you downloaded and saved all data/items of table '{table_name}' from DynamoDB?", col="r"
    )

    if delete and ask_true_false(
        f"Are you sure you want to delete all items in table '{table_name}' on DynamoDB?", col="r"
    ):
        cprint(string=f"Scanning & deleting all items in '{table_name}' on DynamoDB ...", col="y")

        dynamodb = boto3.resource("dynamodb", region_name="eu-central-1")  # connect to DynamoDB
        table = dynamodb.Table(table_name)

        response = table.scan()
        data = response["Items"]
        # The following is necessary because the response is paginated (limit 1 MB)
        while "LastEvaluatedKey" in response:
            response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
            data.extend(response["Items"])

        key_schema = table.key_schema
        key_names = [k["AttributeName"] for k in key_schema]
        with table.batch_writer() as batch:
            for row in tqdm(data, desc=f"Deleting items in {table_name}"):
                batch.delete_item(
                    Key=dict(zip(key_names, [row[key] for key in key_names], strict=True))
                    # Key={"ppid_session_dataname": row["ppid_session_dataname"],
                    #      "SystemDateTime_BeginTrial": row["SystemDateTime_BeginTrial"]}
                )
        cprint(string=f"All items deleted from {table_name}.", col="g")

    else:
        cprint(string="Nothing will be deleted.", col="g")
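
A hypothetical call; the function prompts twice for confirmation before scanning and batch-deleting:

delete_all_items_in_table_on_dynamodb(table_name="UXFData.FaceSim.TrialResults")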

finalized_triplets 🗿

finalized_triplets(session: str) -> list[int]

Provide an overview of the finalized triplets.

For the given session, provide an overview of the finalized triplets and return a list of the remaining triplets.

Source code in code/facesim3d/read_data.py
def finalized_triplets(session: str) -> list[int]:
    """
    Provide an overview of the finalized triplets.

    For the given session, provide an overview of the finalized triplets and return a list of the
    remaining triplets.
    """
    n_all_triplets = np.math.comb(params.main.n_faces, 3)

    good_sess_tr_table = read_trial_results_of_session(session=session, clean_trials=True, verbose=False)

    sampled_unique_triplets = good_sess_tr_table.triplet_id.astype(int).unique()

    cprint(
        string=f"{len(sampled_unique_triplets) / n_all_triplets:.1%} of all triplets were sampled & approved "
        f"in session {session}.",
        col="g",
    )

    remaining_triplets = sorted(set(range(1, n_all_triplets + 1)) - set(sampled_unique_triplets))

    print(f"Number of remaining triplets: {len(remaining_triplets)}")

    return remaining_triplets

finalized_triplets_multi_sub_sample 🗿

finalized_triplets_multi_sub_sample() -> list[int]

Provide an overview of finalized triplets.

For the given session of the multi-sampled sub-sample, provide an overview and return the list of remaining triplets.

Source code in code/facesim3d/read_data.py
def finalized_triplets_multi_sub_sample() -> list[int]:
    """
    Provide an overview of finalized triplets.

    For the given session of the multi-sampled sub-sample, provide an overview and return the list of remaining triplets.
    """
    n_all_triplets = np.math.comb(params.multisubsample.n_faces, 3)

    # The following includes the trials written in 'UXFData.FaceSim.OtherTrialData' as well
    trial_results_table = load_table_from_dynamodb(table_name="UXFData.FaceSim.TrialResults", save=False, merge=False)

    good_sess_tr_table = remove_invalid_trials(trial_results_table=trial_results_table, verbose=True)

    # Filter for multi-sub-sample
    sampled_triplets_counts = good_sess_tr_table.triplet_id.astype(int).value_counts()

    print("\n", sampled_triplets_counts, "\n")

    for i in range(1, params.multisubsample.n_reps + 1):
        cprint(
            string=f"{(sampled_triplets_counts >= i).sum()}/{n_all_triplets} "
            f"({(sampled_triplets_counts >= i).sum() / n_all_triplets:.1%}) of all triplets were sampled at "
            f"least {i} times!",
            col="g",
        )
    # perc_sampled_n_times = len(sampled_triplets_counts[  # noqa: ERA001, RUF100
    #         sampled_triplets_counts == params.multisubsample.n_reps]) / n_all_triplets

    # cprint(f"{perc_sampled_n_times:.1%} of all triplets were sampled {params.multisubsample.n_reps} times!", "g")  # noqa: ERA001, E501

    remaining_triplets = sorted(
        set(range(1, n_all_triplets + 1))
        - set(sampled_triplets_counts[sampled_triplets_counts == params.multisubsample.n_reps].index)
    )

    print(f"Number of remaining triplet IDs: {len(remaining_triplets)}")

    # done_triplets = sorted(set(  # noqa: ERA001, RUF100
    #     sampled_triplets_counts[sampled_triplets_counts == params.multisubsample.n_reps].index))

    # print(f"Number of done triplets: {len(done_triplets)}")  # noqa: ERA001

    return remaining_triplets

get_current_state_of_triplets 🗿

get_current_state_of_triplets(
    session: str, pilot: bool = PILOT, plot: bool = False
) -> DataFrame

Get the current state of triplets (e.g., which triplet is currently in the experiment).

Source code in code/facesim3d/read_data.py
def get_current_state_of_triplets(session: str, pilot: bool = params.PILOT, plot: bool = False) -> pd.DataFrame:
    """Get the current state of triplets (e.g., which triplet is currently in the experiment)."""
    session = session.upper()
    if session not in params.SESSIONS:
        msg = f"Session '{session}' not in {params.SESSIONS}!"
        raise ValueError(msg)

    # Append table name
    table_name = "UXFData.FaceSim.TripletsIDB." + session
    if pilot:
        table_name += ".Pilot"

    # Load table from DynamoDB
    triplet_table = load_table_from_dynamodb(table_name=table_name, save=False, merge=False)

    # Get how many of each status
    n_complete = len(triplet_table[triplet_table.status == "G"])
    n_unseen = len(triplet_table[triplet_table.status == "U"])
    n_lock = len(triplet_table[triplet_table.status == "L"])
    n_total = len(triplet_table)  # == n_complete + n_lock + n_unseen == np.math.comb(n_faces, 3)

    # Print information
    cprint(string=f"Current state of {session} triplets:", fm="ul", ts=True)
    cprint(string=f"\t> {n_complete / n_total:.1%} triplets are completed", col="g")
    cprint(string=f"\t> {n_unseen / n_total:.1%} triplets are unseen", col="y")
    print(f"\t> {n_lock / n_total:.1%} triplets are locked")
    print(f"Data from {table_name} on DynamoDB")

    if plot:
        fig = plot_triplet_matrix(
            triplet_table=triplet_table[triplet_table.status == "G"],
            n_faces=params.pilot.v2.n_faces if pilot else params.main.n_faces,
        )

        fig.savefig(
            Path(paths.data.MAIN) / f"{datetime.now().date()}_sampled_{session}"
            f"{'-pilot' if pilot else ''}-triplets.png"
        )

    return triplet_table

get_list_of_acquired_sets 🗿

get_list_of_acquired_sets() -> list

Get the list of all sets that have been acquired.

Source code in code/facesim3d/read_data.py
def get_list_of_acquired_sets() -> list:
    """Get the list of all sets that have been acquired."""
    return sorted(
        [str(f).split("-Set")[1].split("_")[0] for f in Path(paths.data.main.prolific).glob("*Participants-Set*.csv")]
    )  # [2.0, 2.1, ..., 3.0, 3.1,  ...]
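
The set number is parsed from the file name; e.g., for a hypothetical file 'Participants-Set2.1_export.csv':

"Participants-Set2.1_export.csv".split("-Set")[1].split("_")[0]  # -> '2.1'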

get_participant_session 🗿

get_participant_session(
    ppid: str, pilot: bool = PILOT
) -> str | None

Get session (2D, 3D) of the given participant ID (ppid).

Note

Participant can only be part of one session (2D or 3D).

Parameters:

Name Type Description Default
ppid str

ID of participant

required
pilot bool

True: use pilot data

PILOT

Returns:

Type Description
str | None

session of participant

Source code in code/facesim3d/read_data.py
def get_participant_session(ppid: str, pilot: bool = params.PILOT) -> str | None:
    """
    Get session (2D, 3D) of the given participant ID (`ppid`).

    !!! note
        Participant can only be part of one session (2D or 3D).

    :param ppid: ID of participant
    :param pilot: True: use pilot data
    :return: session of participant
    """
    # Get participant table
    pid_table = read_pilot_participant_data() if pilot else read_participant_data()
    session = pid_table[pid_table.ppid == ppid].group_exp
    session = session.drop_duplicates()
    if len(session) == 1:
        return session.item()  # extract session
    if len(session) > 1:
        msg = f"Participant '{ppid}' was part of different conditions!\n{session}\nThis must be solved manually!"
        raise ValueError(msg)
    cprint(string=f"Participant '{ppid}' not found!", col="r")
    return None
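
A hypothetical lookup (the ppid is illustrative):

get_participant_session(ppid="some_prolific_id", pilot=False)  # -> '2D', '3D', or None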

get_participant_set_numbers 🗿

get_participant_set_numbers(ppid: str) -> list[str]

Get the Set-number(s) of a given participant.

Note

Participants can be part of up to three sets, however, only of one session (2D or 3D).

Source code in code/facesim3d/read_data.py
def get_participant_set_numbers(ppid: str) -> list[str]:
    """
    Get the Set-number(s) of a given participant.

    !!! note
        Participants can be part of up to three sets, however, only of one session (2D or 3D).

    """
    set_nrs = []
    for set_nr in get_list_of_acquired_sets():
        prolific_tab = read_prolific_participant_data(set_nr=set_nr)
        if ppid in prolific_tab["Participant id"].values:  # noqa: PD011
            set_nrs.append(set_nr)
    return set_nrs

get_triplet_ids_and_heads 🗿

get_triplet_ids_and_heads(pilot: bool = PILOT) -> DataFrame

Get all triplet_id's and heads corresponding to UXFData.FaceSim.TripletsIDB.*D[.Pilot].

Of the form:

triplet_id   triplet
         1  19_26_56
         2  22_68_73
         3  35_39_54
       ...       ...

Parameters:

Name Type Description Default
pilot bool

Whether to load pilot data or not (main).

PILOT

Returns:

Type Description
DataFrame

Table of the triplet_ids and heads [pd.DataFrame].

Source code in code/facesim3d/read_data.py
def get_triplet_ids_and_heads(pilot: bool = params.PILOT) -> pd.DataFrame:
    """
    Get all `triplet_id`'s and heads corresponding to `UXFData.FaceSim.TripletsIDB.*D[.Pilot]`.

    Of the form:

        triplet_id   triplet
                 1  19_26_56
                 2  22_68_73
                 3  35_39_54
               ...       ...

    :param pilot: Whether to load pilot data or not (main).
    :return: Table of the triplet_ids and heads [`pd.DataFrame`].
    """
    p2_table = paths.data.pilot.triplets if pilot else paths.data.main.triplets
    return pd.read_csv(p2_table)

load_local_table 🗿

load_local_table(
    table_name: str | None = None,
) -> DataFrame | None

Load a UXFData.FaceSim.* table from the local storage system.

Source code in code/facesim3d/read_data.py
def load_local_table(table_name: str | None = None) -> pd.DataFrame | None:
    """Load a `UXFData.FaceSim.*` table from the local storage system."""
    if table_name is None:
        cprint(string="Specify the table to load:", col="y", fm="ul")
        p2_table = browse_files(initialdir=paths.data.MAIN, filetypes="*.csv")
    else:
        p2_table = list(Path(paths.data.MAIN).glob(f"*{table_name}*"))
        if len(p2_table) > 1:
            cprint(
                string=f"Found more than one file w.r.t. table '{table_name}'!\n"
                f"Choose the corresponding table file by index:",
                col="b",
            )
            print("", *[f"{i}:\t'{tab.name}'" for i, tab in enumerate(p2_table)], sep="\n\t")
            tab_idx = cinput(string="\nType index of table you want to load: ", col="y")
            p2_table = p2_table[int(tab_idx)]
        elif len(p2_table) == 0:
            cprint(string=f"No table found w.r.t. '{table_name}'!", col="y")
            return None
        else:
            p2_table = p2_table.pop()

    # Load table
    tab = pd.read_csv(p2_table)

    # Convert table rows if necessary: unpack the DynamoDB json format
    for col in tab.columns:
        if isinstance(tab[col][0], str) and tab[col][0].startswith("[{"):
            tab[col] = tab[col].map(literal_eval)

            if isinstance(tab[col][0], list) and len(tab[col][0]) == 1:
                tab[col] = tab[col].map(lambda x: x[0])
                tab[col] = convert_dynamodb_json_in_row_of_df(df_row=tab[col])
            elif isinstance(tab[col][0], list) and len(tab[col][0]) > 1:
                # Primarily a case for 'UXFData.FaceSim.SessionLog'.
                # Most cells in row (i.e., one participant session)
                # are lists of DynamoDB jsons (i.e., dicts)
                # convert [{'S': 'Log'}, {'S': 'Log'}, , ...] -> [Log, Log, ...] per row
                tab[col] = tab[col].map(lambda x: [cell[next(iter(cell.keys()))] for cell in x])
            else:
                cprint(string=f"Unknown format of column '{col}' at index 0!", col="r")

        if isinstance(tab[col][0], str) and tab[col][0].startswith("["):
            # Primarily a case for 'UXFData.FaceSim.SessionLog' after formatting
            tab[col] = tab[col].map(literal_eval)

        if "SystemDateTime" in col:
            tab[col] = tab[col].map(convert_date_time)

    return tab

load_table_from_dynamodb 🗿

load_table_from_dynamodb(
    table_name: str | None = None,
    save: bool = False,
    merge: bool = False,
) -> DataFrame

Load a UXFData.FaceSim.* table from DynamoDB.

Source code in code/facesim3d/read_data.py
def load_table_from_dynamodb(table_name: str | None = None, save: bool = False, merge: bool = False) -> pd.DataFrame:
    """Load a `UXFData.FaceSim.*` table from DynamoDB."""
    dynamodb = boto3.resource("dynamodb", region_name="eu-central-1")  # connect to DynamoDB

    table_list = list(dynamodb.tables.all())  # pull all tables (names) from DynamoDB
    table_list = [t.name for t in table_list]  # extract table names

    if table_name is None:
        cprint(string="Specify table to download:", col="y", fm="ul")
        print("", *[f"{i}:\t'{tab.split('.FaceSim.')[-1]}'" for i, tab in enumerate(table_list)], sep="\n\t")
        tab_idx = cinput(string="\nType index of table you want to download: ", col="y")
        table_name = table_list[int(tab_idx)]
    elif table_name not in table_list:
        msg = (
            f"Given table '{table_name}' was not found on DynamoDB!\n"
            f"\nFollowing tables are available:\n\n{table_list!s}"
        )
        raise ValueError(msg)

    cprint(string=f"Scanning & loading table '{table_name}' from DynamoDB ...", col="b")

    table = dynamodb.Table(table_name)

    response = table.scan()  # -> dict
    data = response["Items"]
    # The following is necessary because the response is paginated (limit 1 MB)
    while "LastEvaluatedKey" in response:
        response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
        data.extend(response["Items"])

    # Convert to pandas DataFrame
    loaded_df = pd.DataFrame(data)

    # Unbox table cells
    # Some tables have list entries in cells, with list-length==1 (see below): Unpack them:
    #     screen_width      SystemDateTime_StartExp  ... screen_height age_years
    # 0         [1600]    [2022/11/14 15:50:05.086]  ...         [900]      [25]
    # 1         [1920]    [2022/11/14 14:51:55.245]  ...        [1080]      [21]
    # 2         [1440]    [2022/11/14 15:49:49.150]  ...         [900]      [22]

    # Filter out triplet unlocker
    ppid_sess_col = [c for c in loaded_df.columns if c.startswith("ppid_session")]
    for i, v in loaded_df[ppid_sess_col].iterrows():  # noqa: B007
        if isinstance(v.values[0], str) and v.values[0].startswith("UnlockTriplets_"):  # noqa: PD011
            break
    else:
        i = None
    if i is not None:
        loaded_df = loaded_df.drop(index=i).reset_index(drop=True)

    # Process columns
    for col in loaded_df.columns:
        rnd_idx = np.random.randint(0, len(loaded_df), 10)
        if all(
            (isinstance(cell, list) and len(cell) == 1)
            for cell in loaded_df[col].iloc[rnd_idx].values  # noqa: PD011
        ):  # noqa: PD011, RUF100
            loaded_df[col] = loaded_df[col].map(lambda x: x[0])
        # Do not unpack those with list length > 1

        # Correct datetime in table
        if "SystemDateTime" in col:
            loaded_df[col] = loaded_df[col].map(convert_date_time)

    # Extract ppid from ppid_session_dataname
    if "ppid_session_dataname" in loaded_df.columns and "ppid" not in loaded_df.columns:
        loaded_df["ppid"] = loaded_df.ppid_session_dataname.map(lambda x: x.split("_s0")[0])

    # Merge with existing tables
    log_or_trial = (
        "UXFData.FaceSim.TrialResults" in table_name
        or "UXFData.FaceSim.OtherTrialData" in table_name
        or "UXFData.FaceSim.SessionLog" in table_name
    )
    merge = merge and not log_or_trial
    # *.TrialResults & *.SessionLog tables are relatively big and should not be merged
    df_split = None  # init / this is due to the updated protocol, how data is written from May 2023
    table_name_split = None  # init
    if merge:
        if table_name == "UXFData.FaceSim.OtherSessionData":
            table_name_split = table_name.replace("OtherSessionData", "ParticipantDetails")
            df_split = loaded_df[loaded_df.ppid_session_dataname.str.contains("_participant_details")].reset_index(
                drop=True
            )
            loaded_df = loaded_df[~loaded_df.ppid_session_dataname.str.contains("_participant_details")].reset_index(
                drop=True
            )
            # Remove empty columns in dfs
            loaded_df = loaded_df.dropna(axis=1, how="all")
            df_split = df_split.dropna(axis=1, how="all")

            df_split, _ = merge_tables(df=df_split, table_name=table_name_split)

        loaded_df, merge = merge_tables(df=loaded_df, table_name=table_name)

    if table_name == "UXFData.FaceSim.TrialResults":
        # Merge 'TrialResults' with 'OtherTrialData' table
        table_name_other = table_name.replace("TrialResults", "OtherTrialData")
        df_other = load_table_from_dynamodb(table_name=table_name_other, merge=False, save=False)
        if set(loaded_df.columns) != set(df_other.columns):
            msg = "No column match of 'TrialResults' & 'OtherTrialData' tables!"
            raise ValueError(msg)
        loaded_df = pd.concat([df_other, loaded_df], ignore_index=True)

        if loaded_df.duplicated().any():
            cprint(f"Dropping {loaded_df.duplicated().sum()} duplicates ...", col="b")
            loaded_df = loaded_df.drop_duplicates(ignore_index=True)

        # Remove column 'trial_results_location_0'
        loaded_df = loaded_df.drop(columns=["trial_results_location_0"])

    if save:
        # Save table
        date_tag = str(datetime.today().date())
        path_to_save = Path(paths.data.MAIN, f"{date_tag}_{table_name}.csv")

        i = 0
        while log_or_trial and path_to_save.exists():
            # Do not overwrite *.TrialResults & *.SessionLog tables
            path_to_save = Path(str(path_to_save).replace(date_tag, f"{date_tag}{ascii_lowercase[i]}"))
            i += 1

        loaded_df.to_csv(path_to_save, index=False)
        path_to_save_split = None  # init
        if df_split is not None:
            path_to_save_split = Path(str(path_to_save).replace(table_name, table_name_split))
            df_split.to_csv(path_to_save_split, index=False)

        if merge:
            # Move former tables to archive
            archive_former_tables(path_to_save=path_to_save, table_name=table_name)
            if path_to_save_split is not None:
                archive_former_tables(path_to_save=path_to_save_split, table_name=table_name_split)

    return loaded_df
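
A hypothetical call, mirroring how main() uses this function:

df = load_table_from_dynamodb(
    table_name="UXFData.FaceSim.ParticipantDetails", save=True, merge=True
)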

load_trial_results_from_dynamodb 🗿

load_trial_results_from_dynamodb(
    bucket_name: str | None = "facesimdb",
    via_s3: bool = False,
    save: bool = True,
    verbose: bool = True,
) -> DataFrame

Load all trial-results from DynamoDB.

Blog posts on getting data from DynamoDB with Python and Boto3

https://www.fernandomc.com/posts/ten-examples-of-getting-data-from-dynamodb-with-python-and-boto3/ https://dashbird.io/blog/aws-s3-python-tricks/

Also, check out this Stack Overflow post:

https://stackoverflow.com/questions/10450962/how-can-i-fetch-all-items-from-a-dynamodb-table-without-specifying-the-primary-k

Returns:

Type Description
DataFrame

Pandas DataFrame with all trial results

Source code in code/facesim3d/read_data.py
def load_trial_results_from_dynamodb(
    bucket_name: str | None = "facesimdb",
    via_s3: bool = False,
    save: bool = True,
    verbose: bool = True,
) -> pd.DataFrame:
    """
    Load all trial-results from `DynamoDB`.

    ??? tip "Blog posts on getting data from DynamoDB with Python and Boto3"

        https://www.fernandomc.com/posts/ten-examples-of-getting-data-from-dynamodb-with-python-and-boto3/
        https://dashbird.io/blog/aws-s3-python-tricks/

        Also, check out this `Stack Overflow` post:

        https://stackoverflow.com/questions/10450962/how-can-i-fetch-all-items-from-a-dynamodb-table-without-specifying-the-primary-k

    :return: Pandas DataFrame with all trial results
    """
    if not via_s3:
        # Get all trial results directly from DynamoDB
        return load_table_from_dynamodb(table_name="UXFData.FaceSim.TrialResults", save=save)

    # Download trial results from S3 (after export of table to S3)
    s3 = boto3.client("s3")

    if bucket_name is None:
        bucket_list = s3.list_buckets()["Buckets"]

        if len(bucket_list) > 1:
            # TODO: implement choice of bucket  # noqa: FIX002
            bucket_name = None
            msg = "More than one bucket found. Implement selection functionality!"
            raise NotImplementedError(msg)
        else:  # noqa: RET506
            bucket_name = bucket_list[0]["Name"]  # == "facesimdb"

    available_files = s3.list_objects(Bucket=bucket_name)

    available_files = [
        f for f in available_files["Contents"] if (f["Key"].endswith(".json.gz") and "AWSDynamoDB/" in f["Key"])
    ]

    if verbose:
        cprint(string=f"Found following files to download in S3 bucket '{bucket_name}':", fm="ul")
        print("", *[f"{f['LastModified']!s} : {f['Key']}" for f in available_files], sep="\n\t> ")

    # Check for different download folders
    data_folder = {d["Key"].split("/data/")[0].split("AWSDynamoDB/")[-1] for d in available_files}

    if len(data_folder) > 1:
        # In case there are multiple folders to download from, choose a folder
        data_folder = list(data_folder)
        cprint(string="Specify folder to download from:", col="y", fm="ul")
        print("", *[f"{i}:\t'{d}'" for i, d in enumerate(data_folder)], sep="\n\t")
        f_idx = cinput(string="\nType index of folder you want to download from: ", col="y")
        data_folder = data_folder[int(f_idx)]

    else:
        data_folder = data_folder.pop()

    available_files = [f for f in available_files if data_folder in f["Key"]]  # filter for folder

    for s3_file in available_files:
        # Download file
        file_date = str(s3_file["LastModified"].date())
        p2_store = Path(paths.data.main.s3, "TrialResults", file_date, s3_file["Key"].split("/")[-1])
        if p2_store.is_file():
            cprint(string=f"File '{p2_store}' already exists. Skipping download.", col="g")
            continue
        p2_store.parent.mkdir(parents=True, exist_ok=True)

        s3.download_file(Bucket=bucket_name, Key=s3_file["Key"], Filename=p2_store)
        print("Downloaded file to:", p2_store)

    cprint(string="Running now the following function: read_and_convert_s3_results_json_data() ...", col="y")
    return read_and_convert_s3_results_json_data(verbose=verbose)
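
Two hypothetical entry points, per the via_s3 switch:

trial_df = load_trial_results_from_dynamodb(via_s3=False)  # scan DynamoDB directly
trial_df = load_trial_results_from_dynamodb(via_s3=True)   # download the S3 export, then convert it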

main 🗿

main() -> None

Run the main function of read_data.py.

Source code in code/facesim3d/read_data.py
def main() -> None:
    """Run the main function of `read_data.py`."""
    table_list = [
        "UXFData.FaceSim." + name
        for name in [
            "OtherSessionData",
            "ParticipantDetails",
            "Settings",
            "SessionLog",
            "TrialResults",
            "OtherTrialData",
        ]
    ]  # "OtherTrialData" must be last & after "TrialResults"

    if FLAGS.mss:
        # Set all other boolean FLAGS to False
        for flag in FLAGS.__dict__:
            if flag not in {"mss", "set_nr", "triplets"}:
                setattr(FLAGS, flag, False)
            # i.e., FLAGS.load = FLAGS.delete = FLAGS.plot = FLAGS.pilot = FLAGS.verbose = False

        update_triplet_table_on_dynamodb_multi_sub_sample(
            session=f"{FLAGS.set_nr[0]}D", set_finalised_triplets_to_g=True
        )

    if FLAGS.load:
        for table_name in table_list[:-1]:  # "*.OtherTrialData" will be merged with "*.TrialResults"
            tab = load_table_from_dynamodb(table_name=table_name, save=True, merge=True)
            if FLAGS.verbose:
                print(tab)

    # Delete all items in all tables (but *TripletsIDB.*D)
    if FLAGS.delete:
        for table_name in table_list:
            delete_all_items_in_table_on_dynamodb(table_name=table_name)

    if FLAGS.verbose:
        cprint(string=f"\nLoad data of Set{FLAGS.set_nr} ...", col="b")
        tab = read_trial_results_of_set(set_nr=FLAGS.set_nr, clean_trials=False, verbose=True)
        print(tab)

    if FLAGS.triplets:
        get_current_state_of_triplets(session=FLAGS.triplets, pilot=FLAGS.pilot, plot=FLAGS.plot)

merge_tables 🗿

merge_tables(
    df: DataFrame, table_name: str
) -> tuple[DataFrame, bool]

Merge a given table (df) with an existing table of the given table name.

Source code in code/facesim3d/read_data.py
def merge_tables(df: pd.DataFrame, table_name: str) -> tuple[pd.DataFrame, bool]:
    """Merge a given table (`df`) with an existing table of the given table name."""
    cprint(string="Merging tables ...", col="b")
    merge = True  # init
    df2 = load_local_table(table_name=table_name)
    if df2 is None:
        merge = False
        merged_df = df
    else:
        if set(df.columns) != set(df2.columns):
            msg = "No column match of downloaded & local tables!"
            raise ValueError(msg)
        merged_df = pd.concat([df2, df], ignore_index=True)

        if merged_df.duplicated().any():
            cprint(string=f"Dropping {merged_df.duplicated().sum()} duplicates ...", col="b")
            merged_df = merged_df.drop_duplicates(ignore_index=True)

    return merged_df, merge
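
A hypothetical call, where new_df stands for a freshly downloaded table:

merged_df, merged = merge_tables(df=new_df, table_name="UXFData.FaceSim.Settings")
# 'merged' is False when no local table existed; merged_df is then simply new_df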

plot_triplet_matrix 🗿

plot_triplet_matrix(
    triplet_table: DataFrame, n_faces: int
) -> Figure

Plot matrix of triplets.

Source code in code/facesim3d/read_data.py
def plot_triplet_matrix(triplet_table: pd.DataFrame, n_faces: int) -> plt.Figure:
    """Plot matrix of triplets."""
    triplet_table["triplet"] = triplet_table.triplet.map(lambda x: x.split("_"))
    sampling_mat = np.zeros((n_faces, n_faces))
    for _i, triplet_row in tqdm(iterable=triplet_table.iterrows(), desc="Fill count matrix of triplets"):
        triplet = [int(f_id) for f_id in triplet_row.triplet]
        for comb in combinations(triplet, r=2):
            sampling_mat[comb[0] - 1, comb[1] - 1] += 1
            sampling_mat[comb[1] - 1, comb[0] - 1] += 1

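    # Each unordered face pair can occur in at most (n_faces - 2) distinct triplets,
    # so dividing by that count normalizes the matrix to a sampling fraction (1.0 = fully sampled)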
    sampling_mat /= n_faces - 2
    np.fill_diagonal(sampling_mat, np.nan)

    # Plot the sampling matrix
    fig, ax = plt.subplots(num=f"{datetime.now().replace(microsecond=0)} | Sampled triplets", figsize=(10, 8))
    h = sns.heatmap(sampling_mat, cmap="YlOrBr", vmin=0, vmax=1, ax=ax)
    h.set(
        title=f"{datetime.now().replace(microsecond=0)} | "
        f"{len(triplet_table) / np.math.comb(n_faces, 3):.1%} Sampled triplets | "
        f"{np.nanmin(sampling_mat):.1%}-{np.nanmax(sampling_mat):.1%} (min-max)"
    )
    fig.tight_layout()
    plt.show()
    return fig

read_and_convert_s3_results_json_data 🗿

read_and_convert_s3_results_json_data(
    verbose: bool = False,
) -> DataFrame

Get the full trial table of the main study from local storage.

This table must be constructed from several json tables downloaded from S3.

Parameters:

Name Type Description Default
verbose bool

Be verbose or not

False

Returns:

Type Description
DataFrame

Processed trial table

Source code in code/facesim3d/read_data.py
def read_and_convert_s3_results_json_data(verbose: bool = False) -> pd.DataFrame:
    """
    Get the full trial table of the main study from local storage.

    This table must be constructed from several `json` tables downloaded from `S3`.

    :param verbose: Be verbose or not
    :return: Processed trial table
    """
    # Define the session path
    p2_s3 = Path(paths.data.main.s3, "TrialResults")

    if verbose:
        tree(p2_s3)

    # Find the correct directory
    trial_result_dirs = [d for d in os.listdir(p2_s3) if (d.startswith("2022-") and not d.endswith(".csv"))]

    if len(trial_result_dirs) > 1:
        cprint(string="Choose a *.json* file from the folder that should be unpacked:", col="y")
        trial_result_dirs = Path(browse_files(initialdir=p2_s3)).parent  # , filetypes=".json"))
    else:
        trial_result_dirs = trial_result_dirs.pop()
    trial_result_dirs = Path(p2_s3, trial_result_dirs)

    # Set path to processed trial table
    trial_result_full_table_path = Path(paths.data.MAIN, f"{trial_result_dirs.name}_UXFData.FaceSim.TrialResults.csv")

    # Check if the full (concatenated) table is already there
    convert_json_to_csv = True
    append_table = False
    table_processed = None  # one table to save all data
    if trial_result_full_table_path.is_file():
        table_processed = pd.read_csv(trial_result_full_table_path)
        cprint(string=f"\nTable with trial results already exists: {trial_result_full_table_path}", col="g")
        append_table = convert_json_to_csv = ask_true_false(
            question="Do you want to append new data to the existing table?", col="y"
        )

    if convert_json_to_csv:
        # Read json files
        trial_result_files = [f for f in os.listdir(trial_result_dirs) if (f.endswith((".json", ".json.gz")))]

        type_dict = None  # init DynamoDB type dict
        for json_file_path in tqdm(trial_result_files, desc="Read json files", position=0):
            p2_json = Path(trial_result_dirs, json_file_path)

            # Process json file via pandas
            table_raw = pd.read_json(p2_json, lines=True)

            if type_dict is None:
                td = [(col, next(iter(table_raw.iloc[0]["Item"][col].keys()))) for col in table_raw.iloc[0]["Item"]]

                td = pd.DataFrame(td, columns=["col_name", "type"])
                type_dict = dict(zip(td["col_name"], td["type"].map(DT_MAP), strict=True))

            # Remove type info from json file
            for row in tqdm(table_raw.values, desc=f"Read rows of '{json_file_path}'", position=1):
                current_row = row.item()

                if append_table:
                    ppid = current_row["ppid"]["S"]
                    sys_t = convert_date_time(date_time=current_row["SystemDateTime_BeginTrial"]["S"])
                    if sys_t in table_processed.loc[table_processed.ppid == ppid].SystemDateTime_BeginTrial.to_list():
                        continue

                trial_tab = pd.DataFrame(current_row)
                # append empty row to table
                copy_row = trial_tab.iloc[0:1].copy()
                copy_row[~pd.isna(copy_row)] = np.nan
                trial_tab = pd.concat([trial_tab, copy_row])

                # Write non-nan-value in empty row for each column
                for col in trial_tab.columns:
                    trial_tab.iloc[-1][col] = trial_tab[col].dropna().item()

                # Keep only filled row
                trial_tab = trial_tab.iloc[-1:]

                # Exclude empty rows
                if "triplet" not in trial_tab.columns:
                    continue
                if (
                    (trial_tab.head1 == trial_tab.head2).all()
                    and (trial_tab.head2 == trial_tab.head3).all()
                    and not trial_tab.head2.item()
                ) or (not trial_tab.triplet.item()):
                    # These rows are empty after the experiment was stopped early, usually after 3 missed
                    # catch trials
                    continue

                # Concatenate to big table
                table_processed = trial_tab if table_processed is None else pd.concat([table_processed, trial_tab])
                # , ignore_index=True)

        # Fill empty slots with nan
        table_processed = table_processed.replace("", np.nan)

        # Adapt dtypes
        table_processed = table_processed.astype(type_dict)

        # Remove unnecessary columns & sort the rest
        table_processed = table_processed[SORTED_COLS]

        # Solve date issue
        table_processed.SystemDateTime_BeginTrial = table_processed.SystemDateTime_BeginTrial.map(convert_date_time)

        # Sort rows table by ppid and start time/date of trial
        table_processed = table_processed.sort_values(
            by=["SystemDateTime_BeginTrial", "ppid"], axis=0, ascending=True
        ).reset_index(drop=True)

        # Save table
        table_processed.to_csv(trial_result_full_table_path, index=False)

    return table_processed

read_logs_of_set 🗿

read_logs_of_set(set_nr: str) -> DataFrame

Read all log tables of a given Set number.

Source code in code/facesim3d/read_data.py
def read_logs_of_set(set_nr: str) -> pd.DataFrame:
    """Read all log tables of a given Set number."""
    where_tab = where_to_find_trial_and_log_data(set_nr=set_nr, update_table=False)
    log_table = None  # init
    for p2_log_table in where_tab[where_tab.type == "SessionLog"].table_name:
        print("Load:", p2_log_table)
        if log_table is None:
            log_table = load_local_table(table_name=p2_log_table)

        else:
            print("Append:", p2_log_table)
            log_table = pd.concat([log_table, load_local_table(table_name=p2_log_table)])

    # Remove participants from table which are not part of given Set (set_nr)
    prolific_set_ppids = read_prolific_participant_data(set_nr=set_nr)["Participant id"]
    if not (log_table.ppid.isin(prolific_set_ppids)).all():
        cprint(
            string=f"{len(log_table[~log_table.ppid.isin(prolific_set_ppids)])} participant(s) are in the "
            f"log table, but are not part of Set{set_nr}! They will be dropped ...",
            col="y",
        )
        log_table = log_table[log_table.ppid.isin(prolific_set_ppids)]  # drop (out-of-set_nr) ppids
        log_table = log_table.reset_index(drop=True)

    return log_table

read_participant_data 🗿

read_participant_data(process: bool = False) -> DataFrame

Get the full participant table of the main study.

Table name: '*_UXFData.FaceSim.ParticipantDetails_processed.csv'.

Parameters:

Name Type Description Default
process bool

True: force (re-)processing of data

False

Returns:

Type Description
DataFrame

participant table

Source code in code/facesim3d/read_data.py
def read_participant_data(process: bool = False) -> pd.DataFrame:
    """
    Get the full participant table of the main study.

    Table name: `'*_UXFData.FaceSim.ParticipantDetails_processed.csv'`.

    :param process: True: force (re-)processing of data
    :return: participant table
    """
    # Load full table
    p2_participant_files = list(Path(paths.data.MAIN).glob("*UXFData.FaceSim.ParticipantDetails.csv"))

    if len(p2_participant_files) > 1:
        p2_participant_files = Path(browse_files(initialdir=paths.data.MAIN, filetypes="*.csv"))
    else:
        p2_participant_files = p2_participant_files.pop()

    p2_raw_participant_files = Path(
        str(p2_participant_files).replace(paths.data.MAIN, paths.data.main.archive).replace(".csv", "_raw.csv")
    )

    # Check if the full (concatenated) table is already there
    if p2_raw_participant_files.exists() and not process:
        # We know that p2_participant_files.is_file() is True
        table_processed = pd.read_csv(p2_participant_files)

    else:  # process == True:
        table_processed = load_local_table(table_name=p2_participant_files.name)

        # Extract from ppid_session_dataname the ppid
        if "ppid" not in table_processed.columns:
            table_processed["ppid"] = table_processed.ppid_session_dataname.map(lambda x: x.split("_s0")[0])

        # Remove debug & UnlockTriplets (has NaN's in 'group_exp' column) users
        table_processed = table_processed.loc[
            table_processed.ppid.isin([p for p in table_processed.ppid if "debug" not in p])
        ]
        table_processed = table_processed[table_processed.ppid != "UnlockTriplets"].reset_index(drop=True)

        # Archive unprocessed table
        p2_participant_files.rename(p2_raw_participant_files)

        # Save processed table
        table_processed.to_csv(p2_participant_files, index=False)

    return table_processed

read_pilot_data 🗿

read_pilot_data(
    clean_trials: bool = False, verbose: bool = False
) -> DataFrame

Get the full trial table of the pilot study (version 2).

This table is downloaded as csv in one sweep from DynamoDB.

Parameters:

Name Type Description Default
clean_trials bool

clean trials (remove trials with no response, etc.)

False
verbose bool

be verbose or not

False

Returns:

Type Description
DataFrame

processed trial table

Source code in code/facesim3d/read_data.py
def read_pilot_data(clean_trials: bool = False, verbose: bool = False) -> pd.DataFrame:
    """
    Get the full trial table of the pilot study (version 2).

    This table is downloaded as `csv` in one sweep from `DynamoDB`.

    :param clean_trials: clean trials (remove trials with no response, etc.)
    :param verbose: be verbose or not
    :return: processed trial table
    """
    # Check data dir
    if verbose:
        tree(paths.data.pilot.v2)

    # Load full table
    trial_result_files = list(Path(paths.data.pilot.v2).glob("*UXFData.FaceSim.TrialResults.csv"))
    if len(trial_result_files) > 1:
        trial_result_files = browse_files(initialdir=paths.data.pilot.v2, filetypes="*.csv")
    else:
        trial_result_files = trial_result_files.pop()

    # Check if the full (concatenated) table is already there
    trial_result_full_table_path = str(trial_result_files).replace("TrialResults", "TrialResults_processed")
    if Path(trial_result_full_table_path).is_file():
        table_processed = pd.read_csv(trial_result_full_table_path)

    else:
        table_processed = pd.read_csv(trial_result_files)

    # Remove unnecessary columns & sort the rest
    table_processed = table_processed[SORTED_COLS_PILOT]

    # Sort rows
    table_processed = table_processed.sort_values(
        by=["ppid", "trial_num"], axis=0, ascending=True, inplace=False
    ).reset_index(drop=True)

    # Save table
    table_processed.to_csv(trial_result_full_table_path, index=False)

    if clean_trials:
        catch_head_trials = 0.0
        table_processed = table_processed[table_processed.block_num > 1]  # remove training
        table_processed = table_processed[table_processed.catch_head == catch_head_trials]  # remove catch trials
        table_processed = table_processed[table_processed.head_odd != 0]  # remove time-outs
        table_processed = table_processed[~table_processed.ppid.isin(dropouts_pilot_v2)]

    return table_processed.reset_index(drop=True)

read_pilot_participant_data 🗿

read_pilot_participant_data() -> DataFrame

Get the full participant table of the pilot study (version 2).

Returns:

Type Description
DataFrame

participant table

Source code in code/facesim3d/read_data.py
def read_pilot_participant_data() -> pd.DataFrame:
    """
    Get the full participant table of the pilot study (version 2).

    :return: participant table
    """
    # Load full table
    participant_files = list(Path(paths.data.pilot.v2).glob("*UXFData.FaceSim.ParticipantDetails.csv"))
    if len(participant_files) > 1:
        participant_files = browse_files(initialdir=paths.data.pilot.v2, filetypes="*.csv")
    else:
        participant_files = participant_files.pop()

    # Check if the full (concatenated) table is already there
    participant_full_table_path = Path(
        str(participant_files).replace("ParticipantDetails", "ParticipantDetails_processed")
    )
    if participant_full_table_path.is_file():
        table_processed = pd.read_csv(participant_full_table_path, index_col=0)

    else:
        table_processed = pd.read_csv(participant_files)

        # Exchange ppid_session_dataname with ppid
        table_processed = table_processed.rename(columns={"ppid_session_dataname": "ppid"})
        table_processed.ppid = table_processed.ppid.replace("_s001_participant_details", "", regex=True)

        # Remove UnlockTriplets user (has NaN's in 'group_exp' column)
        table_processed = table_processed[table_processed.ppid != "UnlockTriplets"].reset_index(drop=True)
        # table_processed.drop(index=table_processed[table_processed.group_exp.isna()].index, axis=1,
        #                      inplace=True)  # This should be the UnlockTriplets 'User'

        # Clean up columns
        dtype_map = {"N": np.int64, "S": str}  # "D": np.datetime64
        for col in table_processed.columns:
            if col == "ppid":
                continue

            # Get dtype
            cell = table_processed[col].iloc[0]
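            # For a cell like '[{"N":"25"}]', the slice below picks out the type key 'N'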
            dt = cell[cell.find(":") - 2 : cell.find(":") - 1]
            # Update cells in column
            table_processed[col] = (
                table_processed[col]
                .map(lambda x: x.replace('[{"' + f'{dt}":"', ""))  # noqa: B023
                .replace('"}]', "", regex=True)
                .astype(dtype_map[dt])
            )

        # Save processed table
        table_processed.to_csv(participant_full_table_path)

    return table_processed

read_prolific_participant_data 🗿

read_prolific_participant_data(
    set_nr: str | float, return_path: bool = False
) -> DataFrame | tuple[DataFrame, str]

Read the participant table of a given Set downloaded from Prolific.

Parameters:

Name Type Description Default
set_nr str | float

Prolific Set number: 2.* for 2D and 3.* for 3D

required
return_path bool

if True also return the path to the file

False

Returns:

Type Description
DataFrame | tuple[DataFrame, str]

participant table of the given Prolific Set

Source code in code/facesim3d/read_data.py
def read_prolific_participant_data(
    set_nr: str | float, return_path: bool = False
) -> pd.DataFrame | tuple[pd.DataFrame, str]:
    """
    Read the participant table of a given Set downloaded from Prolific.

    :param set_nr: Prolific Set number 2.* for 2D AND 3.* for 3D
    :param return_path: if True also return the path to the file
    :returns: participant table of the given Prolific Set
    """
    p_files = [
        f for f in os.listdir(paths.data.main.prolific) if f.endswith(".csv") and f"Participants-Set{set_nr}_" in f
    ]
    if not p_files:  # empty
        cprint(string=f"No participant table found for Set{set_nr}!", col="r")
        return None
    if len(p_files) > 1:
        cprint(string=f"Choose the corresponding Participant file of Set{set_nr}!", col="b")
        cprint(string="Note: There should be only one file per Set!", col="y")
        p_files = browse_files(initialdir=paths.data.main.prolific, filetypes="*.csv")
    else:
        p_files = p_files.pop()

    # Read table
    full_path = Path(paths.data.main.prolific, p_files)
    ppid_prolific_table = pd.read_csv(full_path)

    # Add decision column (if not there)
    if "decision" not in ppid_prolific_table.columns:
        ppid_prolific_table["decision"] = np.nan

    if return_path:
        return ppid_prolific_table, str(full_path)
    return ppid_prolific_table

read_trial_results_of_participant 🗿

read_trial_results_of_participant(
    ppid: str,
    clean_trials: bool = False,
    verbose: bool = True,
) -> DataFrame

Read all trial results of a given participant.

Source code in code/facesim3d/read_data.py
def read_trial_results_of_participant(ppid: str, clean_trials: bool = False, verbose: bool = True) -> pd.DataFrame:
    """Read all trial results of a given participant."""
    set_nrs = get_participant_set_numbers(ppid=ppid)
    tr_table = None  # init
    for set_nr in set_nrs:
        if tr_table is None:
            tr_table = read_trial_results_of_set(set_nr=set_nr, clean_trials=clean_trials, verbose=verbose)
        else:
            tr_table = pd.concat(
                objs=[tr_table, read_trial_results_of_set(set_nr=set_nr, clean_trials=clean_trials, verbose=verbose)]
            )

    return tr_table[tr_table.ppid == ppid].reset_index(drop=True)

read_trial_results_of_session cached 🗿

read_trial_results_of_session(
    session: str,
    clean_trials: bool = False,
    drop_subsamples: bool = True,
    verbose: bool = True,
) -> DataFrame

Read all trial results of a given session.

Source code in code/facesim3d/read_data.py
@lru_cache(maxsize=24)
def read_trial_results_of_session(
    session: str, clean_trials: bool = False, drop_subsamples: bool = True, verbose: bool = True
) -> pd.DataFrame:
    """Read all trial results of a given session."""
    if session.upper() not in params.SESSIONS:
        msg = f"Session '{session}' not in {params.SESSIONS}!"
        raise ValueError(msg)

    set_nrs_of_session = [s for s in get_list_of_acquired_sets() if s.split(".")[0] == session[0]]
    set_nr_sub_sample = f"{session[0]}.20"
    if drop_subsamples:
        set_nrs_of_session.remove(set_nr_sub_sample)
    else:
        cprint(string=f"Multi-subsample Set-{set_nr_sub_sample} is included in the returned table!", col="r")

    tr_table = None  # init
    for set_nr in set_nrs_of_session:
        if tr_table is None:
            tr_table = read_trial_results_of_set(set_nr=set_nr, clean_trials=clean_trials, verbose=verbose)
        else:
            tr_table = pd.concat(
                [tr_table, read_trial_results_of_set(set_nr=set_nr, clean_trials=clean_trials, verbose=verbose)],
                ignore_index=True,
            )

    return tr_table
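
Since the function is wrapped in @lru_cache(maxsize=24), repeated calls with the same arguments are served from memory; after downloading new data, clear the cache via the standard functools interface:

from facesim3d.read_data import read_trial_results_of_session

df_2d = read_trial_results_of_session(session="2D", clean_trials=True)  # reads from disk
df_2d = read_trial_results_of_session(session="2D", clean_trials=True)  # returned from cache
read_trial_results_of_session.cache_clear()  # force a fresh read on the next call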

read_trial_results_of_set 🗿

read_trial_results_of_set(
    set_nr: str,
    clean_trials: bool = True,
    verbose: bool = True,
) -> DataFrame

Read all trial results of a given Set number.

Source code in code/facesim3d/read_data.py
def read_trial_results_of_set(set_nr: str, clean_trials: bool = True, verbose: bool = True) -> pd.DataFrame:
    """Read all trial results of a given Set number."""
    where_tab = where_to_find_trial_and_log_data(set_nr=set_nr, update_table=False)
    tr_table = None  # init
    for p2_tr_table in where_tab[where_tab.type == "TrialResults"].table_name:
        if verbose:
            print("Load:", p2_tr_table)
        if tr_table is None:
            tr_table = _read_trial_results(process=False, date=p2_tr_table.split("_")[0])

        else:
            if verbose:
                print("Append:", p2_tr_table)
            # Note: pd.DataFrame.append() was removed in pandas 2.0; use pd.concat() as elsewhere in this module
            tr_table = pd.concat(
                objs=[tr_table, _read_trial_results(process=False, date=p2_tr_table.split("_")[0])],
                ignore_index=True,
            )

    # Remove participants from the table that are not part of the given Set (set_nr)
    prolific_set_ppids = read_prolific_participant_data(set_nr=set_nr)["Participant id"]

    th_multi_sub_sample: int = 20
    if not set(tr_table.ppid).issubset(set(prolific_set_ppids)) and int(set_nr.split(".")[-1]) < th_multi_sub_sample:
        # Ignore Sets of multi-sub-sample
        if verbose:
            cprint(
                string=f"{len(set(tr_table.ppid) - set(prolific_set_ppids))} participant(s) are in the "
                f"trial results table, but are not part of Set{set_nr}! They will be dropped ...",
                col="y",
            )
        tr_table = tr_table[tr_table.ppid.isin(prolific_set_ppids)]  # drop (out-of-set_nr) ppids
        tr_table = tr_table.reset_index(drop=True)

    if clean_trials:
        tr_table = remove_invalid_trials(trial_results_table=tr_table, verbose=verbose)

    # Set column types
    if tr_table.caught.isna().any() or (tr_table.caught == "").any():
        return tr_table
    # Convert the 'caught' column to boolean only when there are no NaNs or empty strings (i.e., missing trials),
    # since pd.Series([True, False, np.nan]).astype(bool) -> pd.Series([True, False, True]),
    # and pd.Series([True, False, ""]).astype(bool) -> pd.Series([True, False, False])
    return tr_table.astype({"caught": bool})
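
The early return above guards against silent data corruption: NaN is truthy and the empty string is falsy, so a naive boolean cast rewrites missing trials. A quick demonstration in plain pandas:

import numpy as np
import pandas as pd

print(pd.Series([True, False, np.nan]).astype(bool).to_list())  # [True, False, True] - NaN becomes True
print(pd.Series([True, False, ""]).astype(bool).to_list())      # [True, False, False] - "" becomes False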

remove_invalid_trials 🗿

remove_invalid_trials(
    trial_results_table: DataFrame, verbose: bool = True
) -> DataFrame

Remove invalid trials from a given trial results table.

Source code in code/facesim3d/read_data.py
def remove_invalid_trials(trial_results_table: pd.DataFrame, verbose: bool = True) -> pd.DataFrame:
    """Remove invalid trials from a given trial results table."""
    original_len = len(trial_results_table)

    # Remove training trials
    trial_results_table = trial_results_table[trial_results_table.block_num > 1]

    # Remove trials without data
    trial_results_table = trial_results_table[~trial_results_table.caught.isna()]
    trial_results_table = trial_results_table[trial_results_table.caught != ""]
    trial_results_table = trial_results_table.astype({"caught": bool})

    # Remove trials of blocks with catches
    for ppid_session_dataname, tr_table in tqdm(
        trial_results_table.groupby("ppid_session_dataname"),
        desc="Clean trial results table",
        total=len(trial_results_table.ppid_session_dataname.unique()),
    ):
        _ppid = tr_table.ppid.unique().item()  # == ppid_session_dataname.split("_")[0]

        # Remove all trials of participants with 3+ catches
        n_catches = tr_table.caught.sum()
        catch_threshold = 3
        if n_catches >= catch_threshold:
            if verbose:
                cprint(f"Participant {_ppid} has {n_catches} missed catch trials. Removing all trials ...", col="y")
            trial_results_table = trial_results_table[
                trial_results_table.ppid_session_dataname != ppid_session_dataname
            ]
            continue  # all blocks were removed, hence we can jump to the next participant session

        for b_idx, tr_block in tr_table.groupby("block_num"):
            if tr_block.caught.sum() > 0:
                if verbose:
                    cprint(
                        f"Participant {_ppid} has missed catch trials in block {int(b_idx)}. Removing block ...",
                        col="y",
                    )
                trial_results_table = trial_results_table[
                    ~(
                        (trial_results_table.ppid_session_dataname == ppid_session_dataname)
                        & (trial_results_table.block_num == b_idx)
                    )
                ]

    # Remove catch trials
    catch_head_trial = 0.0
    trial_results_table = trial_results_table[trial_results_table.catch_head == catch_head_trial]

    # Remove time-outs
    n_remove = (trial_results_table.head_odd == 0).sum()
    if verbose and n_remove > 0:
        cprint(string=f"{n_remove} time-out trials will be removed ...", col="y")
    trial_results_table = trial_results_table[trial_results_table.head_odd != 0]

    # TODO: Remove trials of unrealistic response times (< X sec):  # noqa: FIX002
    #  check: determine_threshold_for_minimal_response_time()  # noqa: ERA001
    #  trial_results_table[trial_results_table.response_time < params.MIN_RT_2D]  # params.MIN_RT_3D  # noqa: ERA001
    #  n_rt_outliers = (trial_results_table.response_time < params.MIN_RT_2D).sum()  # noqa: ERA001

    # TODO: Remove trials with monotonous choice behavior (> Y-times same side).  # noqa: FIX002
    #  This could also entail repeating patterns (e.g., left-right-left-right...)

    # TODO: Define other criteria (e.g., BQS, etc.)  # noqa: FIX002

    if verbose:
        n_removed = original_len - len(trial_results_table)
        cprint(
            f"\n{n_removed} of original {original_len} ({n_removed / original_len:.1%}) trials were removed ...",
            col="y",
            fm="bo",
        )

    return trial_results_table.reset_index(drop=True)
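
The function is usually invoked via clean_trials=True in the read_* functions above, but it can also be applied to an already loaded table (the Set number is a placeholder):

from facesim3d.read_data import read_trial_results_of_set, remove_invalid_trials

raw_table = read_trial_results_of_set(set_nr="2.1", clean_trials=False, verbose=False)
clean_table = remove_invalid_trials(trial_results_table=raw_table, verbose=True)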

save_merged_tables_of_set 🗿

save_merged_tables_of_set(set_nr: str) -> None

Merge all tables of a given type ("TrialResults", "SessionLog") in a given Set.

Source code in code/facesim3d/read_data.py
def save_merged_tables_of_set(set_nr: str) -> None:
    """Merge all tables of a given type ("`TrialResults`", "`SessionLog`") in a given Set."""
    where_tab = where_to_find_trial_and_log_data(set_nr=set_nr, update_table=False)

    for table_type in ["TrialResults", "SessionLog"]:
        if table_type == "TrialResults":
            table_merged = read_trial_results_of_set(set_nr=set_nr, clean_trials=False, verbose=True)
        else:
            table_merged = read_logs_of_set(set_nr=set_nr)

        table_name = where_tab[where_tab.type == table_type].table_name.values[0]  # noqa: PD011
        prefix_date, suffix = table_name.split("_")
        if table_type == "TrialResults":
            table_merged = table_merged.drop_duplicates().reset_index(drop=True)
        else:
            table_merged = table_merged.drop_duplicates(subset=["ppid"]).reset_index(drop=True)
        prefix_date_m = prefix_date[:10] + "m"
        table_merged.to_csv(Path(paths.data.MAIN, f"{prefix_date_m}_{suffix}"), index=False)

        # Move other tables to "archive" folder
        for table_name in where_tab[where_tab.type == table_type].table_name:
            Path(paths.data.MAIN, table_name).rename(Path(paths.data.MAIN, "archive", table_name))

    # Remove table location file in where_to_find_trial_and_log_data()
    Path(paths.data.MAIN, "Where_are_TrialResults_and_Logs.csv").unlink()

    cprint(
        string="Tables are merged and saved, former tables are moved to 'archive' folder.\n"
        f"Consider renaming current tables with prefix '{prefix_date_m}_UXFData.FaceSim*.csv'.\n"
        "Then rerun where_to_find_trial_and_log_data()!",
        col="y",
    )

set_infix 🗿

set_infix(set_nr: str) -> str

Generate the Set infix (e.g., 's004' OR 's011') from a set number.

Source code in code/facesim3d/read_data.py
def set_infix(set_nr: str) -> str:
    """Generate the Set infix (e.g., 's004' OR 's011') from a set number."""
    return f"s{int(set_nr.split('.')[-1]):03d}"

update_triplet_table_on_dynamodb 🗿

update_triplet_table_on_dynamodb(
    session: str,
    set_finalised_triplets_to_g: bool = False,
    delete_done_triplets: bool = False,
) -> None

Update the triplet table on DynamoDB.

Parameters:

Name Type Description Default
session str

'2D' OR '3D'

required
set_finalised_triplets_to_g bool

Set finalized triplets to 'G' (if not already done)

False
delete_done_triplets bool

Whether to delete triplets that are done

False

Returns:

Type Description
None

None

Source code in code/facesim3d/read_data.py
def update_triplet_table_on_dynamodb(
    session: str, set_finalised_triplets_to_g: bool = False, delete_done_triplets: bool = False
) -> None:
    """
    Update the triplet table on `DynamoDB`.

    :param session: '2D' OR '3D'
    :param set_finalised_triplets_to_g: Set finalized triplets to 'G' (if not already done)
    :param delete_done_triplets: Whether to delete triplets that are done
    :return: None
    """
    if not ask_true_false(f"\nWas the latest trial data downloaded for session '{session}'?"):
        cprint(string=f"Download the latest trial data of the '{session}' session first.", col="r")
        return

    open_triplets = finalized_triplets(session=session)

    table_name = "UXFData.FaceSim.TripletsIDB." + session.upper()

    # Connect to DynamoDB
    dynamodb = boto3.resource("dynamodb", region_name="eu-central-1")  # connect to DynamoDB
    db_table = dynamodb.Table(table_name)

    # Get key names
    key_schema = db_table.key_schema
    key_names = [k["AttributeName"] for k in key_schema]

    # Load current state of triplet table
    cprint(string=f"Loading current state of {session} triplet table ...", col="b")
    response = db_table.scan()
    data = response["Items"]
    while "LastEvaluatedKey" in response:
        response = db_table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
        data.extend(response["Items"])

    df_current_state = pd.DataFrame(data)  # transform to pd.DataFrame

    # Unlock triplets
    locked: str = "L"  # locked symbol
    if (df_current_state.status == locked).any():
        cprint(
            string=f"\nUnlocking {(df_current_state.status == locked).sum()} previously locked triplets ...", col="b"
        )
        for row in tqdm(data, desc=f"Unlocking items in {table_name}", colour="#02B580"):
            if row["status"] != locked:
                continue
            _update_status_item(
                dynamodb_table=db_table, table_name=table_name, data_row=row, key_names=key_names, new_status="U"
            )  # set "L" to "U"
        cprint(string=f"All previously locked triplets are now unlocked in {table_name}.", col="g")

    # Set open triplets to "U"
    cprint(string=f"\nResetting {len(open_triplets)} open triplets to 'U' ...", col="b")
    for row in tqdm(data, desc=f"Reset open triplets items in {table_name}", colour="#E86A03"):
        if row["triplet_id"] not in open_triplets or row["status"] == "U":
            continue
        _update_status_item(
            dynamodb_table=db_table, table_name=table_name, data_row=row, key_names=key_names, new_status="U"
        )  # reset open triplets
    cprint(string=f"All open triplets are now ready to be sampled in {table_name}.", col="g")

    # Set finalized triplets to "G" (if not already done)
    if set_finalised_triplets_to_g:
        cprint(string="\nSetting finalised triplets to 'G' ...", col="b")
        for row in tqdm(data, desc=f"Reset open triplets items in {table_name}", colour="#8F29E8"):
            if row["triplet_id"] not in open_triplets and row["status"] != "G":
                _update_status_item(
                    dynamodb_table=db_table, table_name=table_name, data_row=row, key_names=key_names, new_status="G"
                )
        cprint(string=f"All finalised triplets are now set to 'G' in {table_name}.", col="g")

    # Delete triplets that are done
    if delete_done_triplets:
        cprint(string="\nDeleting triplets that are done ...", col="b")
        # TODO: Test this (should work but will interfere with other functions as for cost-estimation)  # noqa: FIX002
        msg = "This is not tested yet. Be careful!"
        raise NotImplementedError(msg)
        with db_table.batch_writer() as batch:
            for row in tqdm(data, desc=f"Deleting items in {table_name}"):
                if row["status"] != "G":
                    continue
                batch.delete_item(Key=dict(zip(key_names, [row[key] for key in key_names], strict=True)))
        cprint(string=f"All done triplets deleted from {table_name}.", col="g")

update_triplet_table_on_dynamodb_multi_sub_sample 🗿

update_triplet_table_on_dynamodb_multi_sub_sample(
    session: str, set_finalised_triplets_to_g: bool = True
) -> None

Update triplet table on DynamoDB for the given session of the multi-sampled-sub-sample.

Note: This assumes that all data on DynamoDB stems from the given session. Do not execute this function if data from other sessions is also on DynamoDB.

Source code in code/facesim3d/read_data.py
def update_triplet_table_on_dynamodb_multi_sub_sample(session: str, set_finalised_triplets_to_g: bool = True) -> None:
    """
    Update triplet table on `DynamoDB` for the given session of the `multi-sampled-sub-sample`.

    Note: This assumes that all data on `DynamoDB` stems from the given session.
    Do not execute this function if data from other sessions is also on `DynamoDB`.
    """
    cprint(
        f"\nUpdating triplet table on DynamoDB for the {session}-session of the multi-sampled-sub-sample ...\n",
        col="y",
        fm="bo",
    )
    if not ask_true_false(
        question=f"Are you sure you want to update the triplet table for the {session}-session on "
        f"DynamoDB AND that only data of that session is currently on DynamoDB "
        f"(this is asserted)? "
    ):
        cprint(string="Aborting ...", col="r")
        return

    open_triplets = finalized_triplets_multi_sub_sample()

    table_name = "UXFData.FaceSim.TripletsIDB." + session.upper()

    # Connect to DynamoDB
    dynamodb = boto3.resource("dynamodb", region_name="eu-central-1")  # connect to DynamoDB
    db_table = dynamodb.Table(table_name)

    # Get key names
    key_schema = db_table.key_schema
    key_names = [k["AttributeName"] for k in key_schema]

    # Load current state of triplet table
    cprint(string=f"Loading current state of {session} triplet table ...", col="b")
    response = db_table.scan()
    data = response["Items"]
    while "LastEvaluatedKey" in response:
        response = db_table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
        data.extend(response["Items"])

    df_current_state = pd.DataFrame(data)  # transform to pd.DataFrame

    # Unlock triplets
    locked: str = "L"  # locked symbol
    if (df_current_state.status == locked).any():
        cprint(
            string=f"\nUnlocking {(df_current_state.status == locked).sum()} previously locked triplets ...", col="b"
        )
        for row in tqdm(data, desc=f"Unlocking items in {table_name}"):
            if row["status"] != locked:
                continue
            _update_status_item(
                dynamodb_table=db_table, table_name=table_name, data_row=row, key_names=key_names, new_status="U"
            )  # set "L" to "U"
        cprint(string=f"All previously locked triplets are now unlocked in {table_name}.", col="g")

    # Set open triplets to "U"
    cprint(string=f"\nResetting {len(open_triplets)} open triplets to 'U' ...", col="b")
    for row in tqdm(data, desc=f"Reset open triplets items in {table_name}"):
        if row["triplet_id"] not in open_triplets or row["status"] == "U":
            continue
        _update_status_item(
            dynamodb_table=db_table, table_name=table_name, data_row=row, key_names=key_names, new_status="U"
        )  # reset open triplets
    cprint(string=f"All open triplets are now ready to be sampled in {table_name}.", col="g")

    # Set finalized triplets to "G" (if not already done)
    found_finalised_triplets = False
    if set_finalised_triplets_to_g:
        cprint(string="\nSetting finalised triplets to 'G' ...", col="b")
        for row in tqdm(data, desc=f"Reset open triplets items in {table_name}"):
            if row["triplet_id"] not in open_triplets and row["status"] != "G":
                _update_status_item(
                    dynamodb_table=db_table, table_name=table_name, data_row=row, key_names=key_names, new_status="G"
                )
                found_finalised_triplets = True
        if found_finalised_triplets:
            cprint(string=f"All finalised triplets are now set to 'G' in {table_name}.", col="g")

    if len(open_triplets) == 0:
        cprint(string=f"\nAll triplets are finalised in {table_name}.", col="g")
    else:
        cprint(
            string=f"\nInvite max {np.maximum(np.floor(len(open_triplets) / 171).astype(int), 1)} "
            f"participants at once!\n",
            col="y",
            fm="bo",
        )
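
A hypothetical call for the 3D session (only run this when DynamoDB holds data of that session alone; valid AWS credentials are assumed):

from facesim3d.read_data import update_triplet_table_on_dynamodb_multi_sub_sample

update_triplet_table_on_dynamodb_multi_sub_sample(session="3D", set_finalised_triplets_to_g=True)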

where_to_find_trial_and_log_data 🗿

where_to_find_trial_and_log_data(
    set_nr: str, update_table: bool = False
) -> DataFrame

Get information about in which files trial results and log data can be found for a given Set number.

Source code in code/facesim3d/read_data.py
def where_to_find_trial_and_log_data(set_nr: str, update_table: bool = False) -> pd.DataFrame:
    """Get information about in which files trial results and log data can be found for a given Set number."""
    # Path to look-up table
    table_name: str = "Where_are_TrialResults_and_Logs.csv"
    p2_where_table = list(Path(paths.data.MAIN).glob(f"*{table_name}"))
    if len(p2_where_table) > 1:
        msg = f"More than one table found:\n{p2_where_table}"
        raise AssertionError(msg)

    if (len(p2_where_table) == 0) or update_table:
        if len(p2_where_table) == 0:
            cprint(string="Generating where-to-find table ...", col="b")

        # Init table
        where_table = pd.DataFrame(columns=["set_nr", "table_name", "type"])

        # Find all trial results and log files
        list_of_trial_result_tables = list(Path(paths.data.MAIN).glob("*UXFData.FaceSim.TrialResults.csv"))
        list_of_log_tables = list(Path(paths.data.MAIN).glob("*UXFData.FaceSim.SessionLog.csv"))
        list_of_tables = list_of_trial_result_tables + list_of_log_tables

        # Iterate through different sets
        set_files = sorted(Path(paths.data.main.prolific).glob("*Participants-Set*"))
        for p2_ppid_set in tqdm(
            set_files, desc="Find tables for each Set", total=len(set_files), position=0, colour="#51F1EE"
        ):
            # Get set number
            current_set_nr = p2_ppid_set.name.split("-Set")[-1].split("_")[0]

            # Get participants of current Set
            ppid_set = read_prolific_participant_data(set_nr=current_set_nr)["Participant id"].to_list()

            # Populate table
            for p2_table in tqdm(
                list_of_tables,
                desc=f"Iterate through all tables for Set{current_set_nr}",
                total=len(list_of_tables),
                position=1,
                leave=False,
                colour="#51A4F1",
            ):
                if "TrialResults" in p2_table.name:
                    tr_table = _read_trial_results(process=False, date=p2_table.name.split("_")[0])

                    # Extract those with matching set_nr
                    ppid_set_tr = [f"{p}_{set_infix(current_set_nr)}_trial_results" for p in ppid_set]

                    if tr_table.ppid_session_dataname.isin(ppid_set_tr).any():
                        # pd.DataFrame.append() was removed in pandas 2.0; build a one-row frame and concat
                        where_table = pd.concat(
                            [
                                where_table,
                                pd.DataFrame(
                                    [{"set_nr": current_set_nr, "table_name": p2_table.name, "type": "TrialResults"}]
                                ),
                            ],
                            ignore_index=True,
                        )
                else:  # "SessionLog" in p2_table.name
                    log_table = load_local_table(table_name=p2_table.name)
                    ppid_set_log = [f"{p}_{set_infix(current_set_nr)}_log" for p in ppid_set]
                    if log_table.ppid_session_dataname.isin(ppid_set_log).any():
                        where_table = pd.concat(
                            [
                                where_table,
                                pd.DataFrame(
                                    [{"set_nr": current_set_nr, "table_name": p2_table.name, "type": "SessionLog"}]
                                ),
                            ],
                            ignore_index=True,
                        )

        # Sort table by set number
        where_table = where_table.sort_values(by=["set_nr", "type", "table_name"], axis=0, ascending=True).reset_index(
            drop=True
        )

        # Save (or overwrite) table
        where_table.to_csv(Path(paths.data.MAIN) / table_name, index=False)

        # Return table
        return where_table[where_table.set_nr == set_nr]

    # Otherwise, exactly one look-up table exists (len(p2_where_table) == 1)
    p2_where_table = p2_where_table.pop()
    where_table = pd.read_csv(p2_where_table, dtype=object)

    # Check if set_nr is in table (if not update table)
    if len(where_table[where_table.set_nr == set_nr]) == 0:
        cprint(string=f"Set{set_nr} is not in table {p2_where_table.name}. Updating table ...", col="y")
        where_table = where_to_find_trial_and_log_data(set_nr=set_nr, update_table=True)

    # Return table
    return where_table[where_table.set_nr == set_nr]
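
A usage sketch (the Set number is a placeholder); passing update_table=True forces a rebuild of the look-up table from the local files:

from facesim3d.read_data import where_to_find_trial_and_log_data

where_tab = where_to_find_trial_and_log_data(set_nr="2.1", update_table=False)
print(where_tab[where_tab.type == "TrialResults"].table_name.to_list())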