Database Pipeline

flask_app.db_create_pipeline.add_fields_from_data(attr_names, values, object)

Add dynamic fields to an object from the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| attr_names | List | A list of new fields that we want to add to the object. | required |
| values | List | A list of values to be inserted in the database for the corresponding field. | required |
| object | Database object | The database object to which we want to add dynamic fields. | required |

Returns:

| Type | Description |
| --- | --- |
| Database object | The updated database object with the new fields and values. |

Source code in flask_app/db_create_pipeline.py
def add_fields_from_data(attr_names, values, object):
    """Add dynamic fields to an object from the database.

    Args:
        attr_names (List): A list of new fields that we want to add to the object.
        values (List): A list of values to be inserted in the database for the corresponding field.
        object (Database object): The database object to which we want to add dynamic fields.

    Returns:
        Database object: The updated database object with the new fields and values.
    """
    for index, attr_name in enumerate(attr_names):
        # Use the built-in setattr instead of calling the dunder method directly
        setattr(object, attr_name, values[index])

    return object
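
As a rough illustration of the behaviour described above, the sketch below applies the same loop to a plain `types.SimpleNamespace` rather than a real database object. The stand-in object and the field names (`title`, `score`) are assumptions made for this example only; in the pipeline the target is a database document.

```python
from types import SimpleNamespace


def add_fields_from_data(attr_names, values, obj):
    """Local copy of the helper, so the example runs without the Flask app."""
    for index, attr_name in enumerate(attr_names):
        setattr(obj, attr_name, values[index])
    return obj


# Hypothetical stand-in for a database document.
doc = SimpleNamespace()
doc = add_fields_from_data(["title", "score"], ["Candidate A", 0.87], doc)

print(doc.title, doc.score)
```

The two lists are matched by position, so `attr_names` and `values` are expected to have the same length.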

flask_app.db_create_pipeline.get_doc_object_in_db(attr_names, values, display_fields)

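Retrieves the first document object in the database whose fields match the provided attribute values. Only attributes that appear in display_fields are used to build the query filter; if no document matches, None is returned.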

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| attr_names | list | A list of attribute names. | required |
| values | list | A list of corresponding attribute values. | required |
| display_fields | list | A list of fields to be displayed. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| doc_obj | | The document object retrieved from the database. |

Source code in flask_app/db_create_pipeline.py
def get_doc_object_in_db(attr_names, values, display_fields):
    """
    Retrieves a document object from the database based on the provided attribute names, values, and display fields.

    Args:
        attr_names (list): A list of attribute names.
        values (list): A list of corresponding attribute values.
        display_fields (list): A list of fields to be displayed.

    Returns:
        doc_obj: The document object retrieved from the database.
    """
    filter_ = {attr: value for attr, value in zip(attr_names, values) if attr in display_fields}
    doc_obj = database.DocRepr.objects.filter(**filter_).first()
    return doc_obj
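
To make the filtering step concrete, here is a minimal sketch of how the query filter is assembled before it is passed to the database. The helper name `build_filter` and the sample column names and values are hypothetical; only the dict comprehension mirrors the source above.

```python
def build_filter(attr_names, values, display_fields):
    """Mirror of the dict comprehension used to build the query filter."""
    return {
        attr: value
        for attr, value in zip(attr_names, values)
        if attr in display_fields
    }


# Hypothetical column names and row values.
attr_names = ["doc_id", "query", "score", "notes"]
values = [17, "engineer", 0.42, "free text"]
display_fields = ["doc_id", "query", "score"]

filter_ = build_filter(attr_names, values, display_fields)
print(filter_)
```

Attributes that are not listed in `display_fields` (here `notes`) are left out of the filter, so they do not affect which document is matched.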

flask_app.db_create_pipeline.add_exp_to_db(data_exp)

Adds experiment data to the database based on the provided experiment configurations. Ensures that 'form' tasks (questionnaires) are uniquely identified per experiment.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data_exp | list | A list of dictionaries, where each dictionary contains information about an experiment, including its tasks. | required |

Returns: None

Source code in flask_app/db_create_pipeline.py
def add_exp_to_db(data_exp):
    """
    Adds experiment data to the database based on the provided experiment configurations.
    Ensures that 'form' tasks (questionnaires) are uniquely identified per experiment.

    Args:
        data_exp (list): A list of dictionaries, where each dictionary contains
                         information about an experiment, including its tasks.
    Returns:
        None
    """

    for exp_info in data_exp:
        exp_id_str = str(exp_info['exp_id']) # Get experiment ID as string for unique naming

        # Try to find the experiment in the database
        exp_obj = database.Experiment.objects(_exp_id=exp_id_str).first()

        tasks_obj_ids_for_exp = [] # This list will hold the IDs of tasks in the correct order for this experiment

        for task_data_from_json in exp_info["tasks"]:
            current_task_obj = None
            query_obj = database.QueryRepr.objects(title=task_data_from_json.get("query_title")).first()
            data_obj = None
            if query_obj:
                data_obj = database.Data.objects(query=str(query_obj._id)).first()

            # --- 1️⃣ Handle FORM tasks first ---
            if task_data_from_json.get("ranking_type") == "form":
                original_query_title = task_data_from_json.get("query_title", "unknown_form_title")
                unique_form_query_title = f"exp{exp_id_str}_{original_query_title}"

                filter_kwargs = {
                    "query_title": unique_form_query_title,
                    "ranking_type": "form",
                    "questionnaire": task_data_from_json.get("questionnaire"),
                    "show_xai": task_data_from_json.get("show_xai"),
                }
                if "cand_idx" in task_data_from_json:
                    filter_kwargs["cand_idx"] = task_data_from_json["cand_idx"]

                current_task_obj = database.Task.objects(**filter_kwargs).first()

                if not current_task_obj:
                    current_task_obj = database.Task()
                    task_data_to_save = task_data_from_json.copy()
                    task_data_to_save["query_title"] = unique_form_query_title
                    current_task_obj = add_fields_from_data(
                        list(task_data_to_save.keys()), list(task_data_to_save.values()), current_task_obj
                    )
                    current_task_obj.ranking_type = "form"
                    current_task_obj.data = "form"  # or adjust type if data is a ReferenceField
                    current_task_obj.save()
                    logger.debug(
                        f"CREATED Form Task: {unique_form_query_title} (Original: {original_query_title}) "
                        f"for Exp {exp_id_str} with ID {current_task_obj.id}"
                    )
                else:
                    task_data_to_save = task_data_from_json.copy()
                    task_data_to_save["query_title"] = unique_form_query_title
                    current_task_obj = add_fields_from_data(
                        list(task_data_to_save.keys()), list(task_data_to_save.values()), current_task_obj
                    )
                    current_task_obj.ranking_type = "form"
                    current_task_obj.data = "form"
                    current_task_obj.save()
                    logger.debug(
                        f"FOUND/UPDATED Form Task: {unique_form_query_title} (Original: {original_query_title}) "
                        f"for Exp {exp_id_str} with ID {current_task_obj.id}"
                    )

            # --- 2️⃣ Tasks that reference show_xai ---
            elif "show_xai" in task_data_from_json:

                filter_kwargs = {
                    "data": str(data_obj._id) if data_obj else None,
                    "ranking_type": task_data_from_json.get("ranking_type"),
                    "questionnaire": task_data_from_json.get("questionnaire"),
                    "show_xai": task_data_from_json.get("show_xai"),
                }
                if "cand_idx" in task_data_from_json:
                    filter_kwargs["cand_idx"] = task_data_from_json["cand_idx"]

                current_task_obj = database.Task.objects(**filter_kwargs).first()
                if not current_task_obj:
                    current_task_obj = database.Task()
                    current_task_obj = add_fields_from_data(
                        list(task_data_from_json.keys()), list(task_data_from_json.values()), current_task_obj
                    )
                    current_task_obj.ranking_type = task_data_from_json["ranking_type"]
                    current_task_obj.data = str(data_obj._id) if data_obj else None
                    current_task_obj.save()
                    logger.debug(
                        f"CREATED Task (show_xai): {task_data_from_json.get('query_title', 'N/A')} "
                        f"for Exp {exp_id_str} with ID {current_task_obj.id}"
                    )
                else:
                    current_task_obj = add_fields_from_data(
                        list(task_data_from_json.keys()), list(task_data_from_json.values()), current_task_obj
                    )
                    current_task_obj.ranking_type = task_data_from_json["ranking_type"]
                    current_task_obj.data = str(data_obj._id) if data_obj else None
                    current_task_obj.save()
                    logger.debug(
                        f"FOUND/UPDATED Task (show_xai): {task_data_from_json.get('query_title', 'N/A')} "
                        f"for Exp {exp_id_str} with ID {current_task_obj.id}"
                    )

            # --- 3️⃣ TaskScore branch (index) ---
            elif "index" in task_data_from_json:
                current_task_obj = database.TaskScore.objects(
                    data=str(data_obj._id) if data_obj else None,
                    ranking_type=task_data_from_json["ranking_type"],
                    index=str(task_data_from_json["index"]),
                ).first()
                if not current_task_obj:
                    current_task_obj = database.TaskScore()
                    current_task_obj = add_fields_from_data(
                        list(task_data_from_json.keys()), list(task_data_from_json.values()), current_task_obj
                    )
                    current_task_obj.ranking_type = task_data_from_json["ranking_type"]
                    current_task_obj.data = str(data_obj._id) if data_obj else None
                    current_task_obj.save()
                    logger.debug(
                        f"CREATED TaskScore: {task_data_from_json.get('query_title', 'N/A')} "
                        f"for Exp {exp_id_str} with ID {current_task_obj.id}"
                    )
                else:
                    current_task_obj = add_fields_from_data(
                        list(task_data_from_json.keys()), list(task_data_from_json.values()), current_task_obj
                    )
                    current_task_obj.ranking_type = task_data_from_json["ranking_type"]
                    current_task_obj.data = str(data_obj._id) if data_obj else None
                    current_task_obj.save()
                    logger.debug(
                        f"FOUND/UPDATED TaskScore: {task_data_from_json.get('query_title', 'N/A')} "
                        f"for Exp {exp_id_str} with ID {current_task_obj.id}"
                    )

            # --- 4️⃣ TaskCompare branch ---
            elif "ranking_type_2" in task_data_from_json:
                current_task_obj = database.TaskCompare.objects(
                    data=str(data_obj._id) if data_obj else None,
                    ranking_type_1=task_data_from_json["ranking_type_1"],
                    ranking_type_2=task_data_from_json["ranking_type_2"],
                ).first()
                if not current_task_obj:
                    current_task_obj = database.TaskCompare()
                    current_task_obj = add_fields_from_data(
                        list(task_data_from_json.keys()), list(task_data_from_json.values()), current_task_obj
                    )
                    current_task_obj.ranking_type_1 = task_data_from_json["ranking_type_1"]
                    current_task_obj.ranking_type_2 = task_data_from_json["ranking_type_2"]
                    current_task_obj.data = str(data_obj._id) if data_obj else None
                    current_task_obj.save()
                    logger.debug(
                        f"CREATED TaskCompare: {task_data_from_json.get('query_title', 'N/A')} "
                        f"for Exp {exp_id_str} with ID {current_task_obj.id}"
                    )
                else:
                    current_task_obj = add_fields_from_data(
                        list(task_data_from_json.keys()), list(task_data_from_json.values()), current_task_obj
                    )
                    current_task_obj.ranking_type_1 = task_data_from_json["ranking_type_1"]
                    current_task_obj.ranking_type_2 = task_data_from_json["ranking_type_2"]
                    current_task_obj.data = str(data_obj._id) if data_obj else None
                    current_task_obj.save()
                    logger.debug(
                        f"FOUND/UPDATED TaskCompare: {task_data_from_json.get('query_title', 'N/A')} "
                        f"for Exp {exp_id_str} with ID {current_task_obj.id}"
                    )

            # --- 5️⃣ Generic ranking tasks (fallback) ---
            elif "ranking_type" in task_data_from_json:
                current_task_obj = database.Task.objects(
                    data=str(data_obj._id) if data_obj else None,
                    ranking_type=task_data_from_json["ranking_type"],
                ).first()
                if not current_task_obj:
                    current_task_obj = database.Task()
                    current_task_obj = add_fields_from_data(
                        list(task_data_from_json.keys()), list(task_data_from_json.values()), current_task_obj
                    )
                    current_task_obj.ranking_type = task_data_from_json["ranking_type"]
                    current_task_obj.data = str(data_obj._id) if data_obj else None
                    current_task_obj.save()
                    logger.debug(
                        f"CREATED Generic Ranking Task: {task_data_from_json.get('query_title', 'N/A')} "
                        f"for Exp {exp_id_str} with ID {current_task_obj.id}"
                    )
                else:
                    current_task_obj = add_fields_from_data(
                        list(task_data_from_json.keys()), list(task_data_from_json.values()), current_task_obj
                    )
                    current_task_obj.ranking_type = task_data_from_json["ranking_type"]
                    current_task_obj.data = str(data_obj._id) if data_obj else None
                    current_task_obj.save()
                    logger.debug(
                        f"FOUND/UPDATED Generic Ranking Task: {task_data_from_json.get('query_title', 'N/A')} "
                        f"for Exp {exp_id_str} with ID {current_task_obj.id}"
                    )

            # --- 6️⃣ Unexpected / fallback ---
            else:
                logger.warning(f"Task with unexpected structure: {task_data_from_json}")
                continue

            # Append task id to experiment
            if current_task_obj:
                tasks_obj_ids_for_exp.append(str(current_task_obj.id))
            else:
                logger.error(f"Failed to process task: {task_data_from_json}")

        # --- Final Experiment Save Logic ---
        # After processing all tasks for the current experiment:
        if exp_obj: # If the experiment already exists in the database
            # Replace the entire tasks list with the newly collected and ordered IDs
            exp_obj.tasks = tasks_obj_ids_for_exp
            # ADD OR UPDATE OTHER EXPERIMENT FIELDS HERE IF NEEDED
            exp_obj._description = exp_info.get('description', f"Experiment {exp_id_str}") # Also update the description for existing experiments
            logger.info(f"Existing experiment {exp_id_str} updated with new task order.")
        else: # If the experiment does not exist, create it
            logger.info(f"Creating new experiment {exp_id_str} with defined tasks.")
            exp_obj = database.Experiment(
                _exp_id=exp_id_str,
                # FIX: use '_description' instead of 'description' to match the database model
                _description=exp_info.get('description', f"Experiment {exp_id_str}"),
                tasks=tasks_obj_ids_for_exp
            )
        exp_obj.save() # Save the updated or newly created Experiment object
        logger.info(f"Experiment {exp_id_str} saved successfully with tasks: {exp_obj.tasks}")
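
The branch order in the source above decides which kind of task object is created for each entry. The sketch below reproduces only that dispatch, plus the per-experiment naming of form tasks, without touching the database. The helper names (`classify_task`, `unique_form_title`), the branch labels and every value in `data_exp` are assumptions made for illustration; the real configuration may carry more keys.

```python
def classify_task(task):
    """Return a label for the branch add_exp_to_db would take for `task`."""
    if task.get("ranking_type") == "form":
        return "form"
    if "show_xai" in task:
        return "task_show_xai"
    if "index" in task:
        return "task_score"
    if "ranking_type_2" in task:
        return "task_compare"
    if "ranking_type" in task:
        return "generic_ranking"
    return "unexpected"


def unique_form_title(exp_id, task):
    """Build the per-experiment title used for 'form' tasks."""
    original = task.get("query_title", "unknown_form_title")
    return f"exp{exp_id}_{original}"


# Hypothetical experiment configuration; all values are made up.
data_exp = [
    {
        "exp_id": 1,
        "description": "Pilot study",
        "tasks": [
            {"query_title": "intro_survey", "ranking_type": "form"},
            {"query_title": "engineer", "ranking_type": "original", "show_xai": True},
            {"query_title": "engineer", "ranking_type": "original", "index": 0},
            {"query_title": "engineer", "ranking_type_1": "original", "ranking_type_2": "ranker"},
            {"query_title": "engineer", "ranking_type": "ranker"},
            {"query_title": "engineer"},
        ],
    }
]

for exp in data_exp:
    branches = [classify_task(task) for task in exp["tasks"]]
    print(exp["exp_id"], branches)
    print(unique_form_title(str(exp["exp_id"]), exp["tasks"][0]))
```

Because the checks run in a fixed order, a task that contains both `show_xai` and `index` is handled by the `show_xai` branch, and a task with none of the recognised keys is logged and skipped.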

flask_app.db_create_pipeline.add_query_docs_to_db(data, data_configs)

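Adds queries and documents to the database. A query or document is created only if no matching object is already stored.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | dict | dict containing the query dataframe and the document dataframe. | required |
| data_configs | dict | configuration dict of the dataset. | required |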

Source code in flask_app/db_create_pipeline.py
def add_query_docs_to_db(data, data_configs):
    """Adding query and documents to the database.
    Args:
        data (dict): dict containing the query dataframe and the document dataframe.
        data_configs (dict): configuration dict of the dataset.
    """
    fields = list(data_configs.values())
    for query, group in data['docs'].groupby(data_configs['query']):
        query_obj = database.QueryRepr.objects(title=query).first()

        if not query_obj:
            query_text = data['query'][data['query']['title'] == query]['text'].values[0]
            query_obj = database.QueryRepr(title=query, text=query_text)
            query_obj.save()

        for _, row in group.iterrows():
            doc_obj = get_doc_object_in_db(data['docs'].columns, row.values, fields)
            if not doc_obj:
                doc_obj = database.DocRepr()
                doc_obj = add_fields_from_data(data['docs'].columns, row.values, doc_obj)
                doc_obj.save()
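
The function follows a "look up first, create only if missing" pattern for both queries and documents. The sketch below imitates that pattern with plain Python containers instead of pandas and the database; the in-memory stores, the column names and the sample rows are all assumptions made for this example.

```python
from collections import defaultdict

# Hypothetical rows of the documents table; column names are made up.
docs = [
    {"query": "engineer", "doc_id": 1, "score": 0.9},
    {"query": "engineer", "doc_id": 2, "score": 0.4},
    {"query": "nurse", "doc_id": 3, "score": 0.7},
    {"query": "engineer", "doc_id": 1, "score": 0.9},  # duplicate row
]
query_texts = {
    "engineer": "Software engineer position",
    "nurse": "Registered nurse position",
}
data_configs = {"query": "query", "docID": "doc_id", "score": "score"}

# In-memory stand-ins for the stored queries and documents.
query_store = {}
doc_store = []

fields = list(data_configs.values())

# Group the rows by the configured query column.
groups = defaultdict(list)
for row in docs:
    groups[row[data_configs["query"]]].append(row)

for query, rows in groups.items():
    # Create the query only if it is not stored yet.
    if query not in query_store:
        query_store[query] = {"title": query, "text": query_texts[query]}
    for row in rows:
        # Look the document up by the configured fields only.
        key = {field: row[field] for field in fields}
        exists = any(
            all(stored.get(f) == v for f, v in key.items()) for stored in doc_store
        )
        if not exists:
            doc_store.append(dict(row))

print(len(query_store), len(doc_store))
```

Running the same input twice would leave both stores unchanged, which is the property that makes the insertion step safe to repeat.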

flask_app.db_create_pipeline.add_data_to_db(data, fields, ranking_type, query_col, sort_col='score', ascending=True)

Adds query-ranking pairs to the database (Data object). If a pre-processing fairness method is applied, the transformed data is also added to the database (DocRepr object).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | dict | dict containing the query dataframe and the document dataframe. | required |
| fields | list(str) | list of fields defined for the document. | required |
| ranking_type | str | ranking type of the ranking to be added in the database (e.g. original if ranking is done based on the original value of sort_col, else depends on the ranker model or fairness intervention applied on the data). | required |
| query_col | str | column name defined as the query. | required |
| sort_col | str | column name to sort the documents in the ranking. | 'score' |
| ascending | bool | True if sorting by sort_col in ascending order, else in descending order. | True |

Source code in flask_app/db_create_pipeline.py
def add_data_to_db(data, fields, ranking_type, query_col, sort_col='score', ascending=True):
    """Adding query-ranking pairs in the database (Data object).
        If pre-processing fairness methods are applied the changed data will be added in the database (DocRepr object).

    Args:
        data (dict): dict containing the query dataframe and the document dataframe.
        fields (list(str)): list of fields defined for the document.
        ranking_type (str): ranking type of the ranking to be added in the database
            (e.g. original if ranking is done based on the original value of sort_col,
            else depends on the ranker model or fairness intervention applied on the data).
        query_col (str): column name defined as the query.
        sort_col (str): column name to sort the documents in the ranking.
        ascending (bool): True if sorting by sort_col in ascending order, else in descending order.
    """
    data['docs'] = data['docs'][data['docs'][sort_col] != ""]

    data['docs'] = data['docs'].groupby(query_col).apply(
        lambda x: x.sort_values(sort_col, ascending=ascending)).reset_index(drop=True)

    for query, group in data['docs'].groupby(query_col):
        doc_list = []

        query_obj = database.QueryRepr.objects(title=query).first()
        if query_obj:
            for _, row in group.iterrows():
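                doc_obj = get_doc_object_in_db(data['docs'].columns, row.values, fields)
                if doc_obj is None:
                    # No matching document is stored; skip the row instead of failing on None below
                    logger.warning(f"No document found in the database for row: {row.to_dict()}")
                    continue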
                if 'original' not in ranking_type:
                    if ranking_type not in doc_obj:
                        add_fields_from_data([ranking_type], [[]], doc_obj)
                        already_added = False
                    else:
                        already_added = len(
                            [predoc for predoc in doc_obj[ranking_type] if predoc.ranking_type == ranking_type]) > 0

                    if not already_added:
                        pre_doc = database.PreDocRepr(ranking_type=ranking_type)
                        if ranking_type.startswith('preprocessing'):
                            columns_fair = [col for col in data['docs'].columns if '_fair' in col]
                            values_fair = [row[col] for col in columns_fair]
                            add_fields_from_data(columns_fair, values_fair, pre_doc)
                        # else:
                        #     columns = ["prediction"]
                        #     max_ = max(group[data['docs'].columns[-1]]) + 1
                        #     values = [max_ - row[data['docs'].columns[-1]]]
                        #     add_fields_from_data(columns, values, pre_doc)
                        doc_obj[ranking_type].append(pre_doc)
                        doc_obj.save()

                doc_list.append(doc_obj._id)

            ranking_obj = database.Ranking(ranking_type=ranking_type, docs=doc_list)
            data_obj = database.Data.objects(query=str(query_obj._id)).first()
            if not data_obj:
                data_obj = database.Data(query=str(query_obj._id), rankings=[ranking_obj])
                data_obj.save()
            else:
                saved_rankings = [r.ranking_type for r in data_obj.rankings]
                if ranking_obj.ranking_type not in saved_rankings:
                    data_obj.rankings.append(ranking_obj)
                    data_obj.save()
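
The core of the ranking step is: drop rows without a sort value, sort the documents inside each query group, and keep the resulting order of document IDs. The sketch below shows that logic with the standard library only, so it is not the pandas code used in the source. The helper name `build_rankings`, the `doc_id` key and the sample rows are assumptions.

```python
from itertools import groupby
from operator import itemgetter


def build_rankings(rows, query_col, sort_col="score", ascending=True):
    """Return {query: [doc ids in ranked order]} for rows with a sort value."""
    # Drop rows whose sort value is an empty string, as the pipeline does.
    rows = [row for row in rows if row[sort_col] != ""]
    # Sort by query first so that groupby sees each query as one run.
    rows.sort(key=itemgetter(query_col))
    rankings = {}
    for query, group in groupby(rows, key=itemgetter(query_col)):
        ranked = sorted(group, key=itemgetter(sort_col), reverse=not ascending)
        rankings[query] = [row["doc_id"] for row in ranked]
    return rankings


# Hypothetical document rows.
rows = [
    {"query": "engineer", "doc_id": 1, "score": 0.4},
    {"query": "nurse", "doc_id": 3, "score": 0.7},
    {"query": "engineer", "doc_id": 2, "score": 0.9},
    {"query": "engineer", "doc_id": 4, "score": ""},
]

rankings = build_rankings(rows, "query", ascending=False)
print(rankings)
```

With `ascending=False` the highest score comes first in each ranking. Note that the default in `add_data_to_db` is `ascending=True`, which puts the lowest value of `sort_col` first.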

flask_app.db_create_pipeline.get_docs_df(ranking_type, data_config, features)

Retrieves the document representations from the database and converts them to a DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| ranking_type | str | If set to preprocessing, it retrieves the document representation transformed by the preprocessing fairness method. If set to original it retrieves the original document representation. | required |
| data_config | dict | Configuration dict of the dataset. | required |
| features | list(str) | List of columns representing the features of the document. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| df | DataFrame | dataframe containing the retrieved document representation from the database. |

Source code in flask_app/db_create_pipeline.py
def get_docs_df(ranking_type, data_config, features):
    """Retrieve documents representation from the database and converts in dataframe.
    Args:
        ranking_type (str): If set to preprocessing, it retrieves the document representation
            transformed by the preprocessing fairness method.
            If set to original it retrieves the original document representation.
        data_config (dict): Configuration dict of the dataset.
        features (list(str)): List of columns representing the features of the document.

    Returns:
        df (pandas.DataFrame): dataframe containing the retrieved document representation from the database.
    """

    data_list = []
    for doc in database.DocRepr.objects():
        if 'original' not in ranking_type:
            field_list = [data_config['query'], data_config['docID'], data_config['group']]

            if ranking_type.startswith('preprocessing'):
                sub_columns = [col + '_fair' for col in features]
                sub_columns.append(data_config['score'] + '_fair')
            else:
                sub_columns = ["prediction"]
            field_list.extend(sub_columns)

            data_list.append({field: getattr(doc, field, None) if field not in sub_columns else getattr(
                doc[ranking_type][0], field, None) for field in field_list})
        else:
            field_list = [data_config['query'], data_config['docID'], data_config['group']]
            field_list.extend(features)
            field_list.append(data_config['score'])
            data_list.append({field: getattr(doc, field, None) for field in field_list})


    # Create a Pandas DataFrame from the list of dictionaries
    df = pd.DataFrame(data_list)
    return df
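
Which columns end up in the DataFrame depends on `ranking_type`. The sketch below isolates that column selection so the three cases can be compared side by side. The helper name `build_field_lists` and the sample configuration values are hypothetical; the selection rules follow the source above.

```python
def build_field_lists(ranking_type, data_config, features):
    """Return (field_list, sub_columns) as selected by get_docs_df."""
    field_list = [data_config["query"], data_config["docID"], data_config["group"]]
    if "original" in ranking_type:
        # Original representation: every field is read from the document itself.
        field_list.extend(features)
        field_list.append(data_config["score"])
        return field_list, []
    if ranking_type.startswith("preprocessing"):
        # Transformed features and score carry a "_fair" suffix.
        sub_columns = [col + "_fair" for col in features]
        sub_columns.append(data_config["score"] + "_fair")
    else:
        sub_columns = ["prediction"]
    field_list.extend(sub_columns)
    return field_list, sub_columns


# Hypothetical dataset configuration and feature names.
data_config = {"query": "occupation", "docID": "cid", "group": "gender", "score": "score"}
features = ["experience", "education"]

for ranking_type in ("original", "preprocessing_example", "ranker_example"):
    print(ranking_type, build_field_lists(ranking_type, data_config, features))
```

Fields listed in `sub_columns` are read from the first entry stored under `ranking_type` on the document, while the remaining fields come from the document itself.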

Pipeline class for inserting the data in the database.

__init__(config)

Pipeline init class.

Attributes:

config : dict
    configuration dict defined in the configuration file
data_reader : DataReader
    DataReader object corresponding to the dataset defined in the configuration file
query_col : str

read_data()

Reads the data using the DataReader.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| data_train | DataFrame | data used for training the ranker and/or the fairness intervention. |
| data_test | DataFrame | data used for testing and displaying in the UI. |

train_ranker()

Trains the ranking model based on the configurations defined under train_ranker_config. It saves the predicted ranking on the test split in the database.

Returns: None

apply_fair_method(fields, config_method_key, sort_column, ascending)

Applies the fairness methods defined in the configuration file under pre/in/post_processing_config. Saves the changed data (in the case of pre-processing) and the new ranking in the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| fields | list(str) | attributes of the document defined in the configuration file. | required |
| config_method_key | str | one of pre_processing, in_processing or post_processing, indicating which type of fairness method is applied. | required |
| sort_column | str | column name by which the items are ranked. | required |
| ascending | bool | True if sorting by sort_column in ascending order, else in descending order. | required |

run()