Skip to content

Database Pipeline

flask_app.db_create_pipeline.add_fields_from_data(attr_names, values, object)

Add dynamic fields to an object from the database.

Parameters:

Name Type Description Default
attr_names List

A list of new fields that we want to add to the object.

required
values List

A list of values to be inserted in the database for the corresponding field.

required
object Database object

The database object to which we want to add dynamic fields.

required

Returns:

Type Description

Database object: The updated database object with the new fields and values.

Source code in flask_app/db_create_pipeline.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def add_fields_from_data(attr_names, values, object):
    """Add dynamic fields to an object from the database.

    Args:
        attr_names (List): A list of new fields that we want to add to the object.
        values (List): A list of values to be inserted in the database for the corresponding field.
        object (Database object): The database object to which we want to add dynamic fields.

    Returns:
        Database object: The updated database object with the new fields and values.
    """
    for index, attr_name in enumerate(attr_names):
        object.__setattr__(attr_name, values[index])

    return object

flask_app.db_create_pipeline.get_doc_object_in_db(attr_names, values, display_fields)

Retrieves a document object from the database based on the provided attribute names, values, and display fields.

Parameters:

Name Type Description Default
attr_names list

A list of attribute names.

required
values list

A list of corresponding attribute values.

required
display_fields list

A list of fields to be displayed.

required

Returns:

Name Type Description
doc_obj

The document object retrieved from the database.

Source code in flask_app/db_create_pipeline.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def get_doc_object_in_db(attr_names, values, display_fields):
    """
    Retrieves a document object from the database based on the provided attribute names, values, and display fields.

    Args:
        attr_names (list): A list of attribute names.
        values (list): A list of corresponding attribute values.
        display_fields (list): A list of fields to be displayed.

    Returns:
        doc_obj: The document object retrieved from the database.
    """
    filter_ = {attr: value for attr, value in zip(attr_names, values) if attr in display_fields}
    doc_obj = database.DocRepr.objects.filter(**filter_).first()
    return doc_obj

flask_app.db_create_pipeline.add_exp_to_db(data_exp)

Adding experiment to the database based on what is defined in the experiment files Args: data_exp (list): A list of dictionaries containing information about the experiments.

Returns:

Type Description

None

Source code in flask_app/db_create_pipeline.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def add_exp_to_db(data_exp):
    """
    Adding experiment to the database based on what is defined in the experiment files
    Args:
        data_exp (list): A list of dictionaries containing information about the experiments.

    Returns:
        None
    """
    for exp_info in data_exp:
        exp_obj = database.Experiment.objects(_exp_id=str(exp_info['exp_id'])).first()

        tasks_obj = []
        for task in exp_info['tasks']:
            query_obj = database.QueryRepr.objects(title=task['query_title']).first()
            if query_obj is not None:
                data_obj = database.Data.objects(query=str(query_obj._id)).first()
                if data_obj is not None:
                    if 'index' in task.keys():
                        task_obj = database.TaskScore.objects(data=str(data_obj._id), ranking_type=task['ranking_type'],
                                                              index=str(task['index'])).first()
                        if not task_obj:
                            task_obj = database.TaskScore()
                            task_obj = add_fields_from_data(list(task.keys()), list(task.values()), task_obj)
                            task_obj.ranking_type = task['ranking_type']
                    elif 'ranking_type' in task.keys():
                        task_obj = database.Task.objects(data=str(data_obj._id),
                                                         ranking_type=task['ranking_type']).first()
                        if not task_obj:
                            task_obj = database.Task()
                            task_obj = add_fields_from_data(list(task.keys()), list(task.values()), task_obj)
                            task_obj.ranking_type = task['ranking_type']
                    elif 'ranking_type_2' in task.keys():
                        task_obj = database.TaskCompare.objects(data=str(data_obj._id),
                                                                ranking_type_1=task['ranking_type_1'],
                                                                ranking_type_2=task['ranking_type_2']).first()
                        if not task_obj:
                            task_obj = database.TaskCompare()
                            task_obj = add_fields_from_data(list(task.keys()), list(task.values()), task_obj)
                            task_obj.ranking_type_1 = task['ranking_type_1']
                            task_obj.ranking_type_2 = task['ranking_type_2']

                    task_obj.data = str(data_obj._id)
                    task_obj.save()

                    if not exp_obj or (
                            not str(task_obj.auto_id_0) in exp_obj.tasks or not str(task_obj._id) in exp_obj.tasks):
                        tasks_obj.append(str(task_obj.auto_id_0))

        if exp_obj:
            if len(tasks_obj) > 0:
                exp_obj.tasks.extend(tasks_obj)
        else:
            if not exp_obj and tasks_obj is not None:
                exp_obj = database.Experiment(_exp_id=str(exp_info['exp_id']), _description=exp_info['description'],
                                              tasks=tasks_obj)
        exp_obj.save()

flask_app.db_create_pipeline.add_query_docs_to_db(data, data_configs)

Adding query and documents to the database. Args: data (dict): dict containing the query dataframe and the document dataframe. data_configs (dict): configuration dict of the dataset.

Source code in flask_app/db_create_pipeline.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def add_query_docs_to_db(data, data_configs):
    """Adding query and documents to the database.
    Args:
        data (dict): dict containing the query dataframe and the document dataframe.
        data_configs (dict): configuration dict of the dataset.
    """
    fields = list(data_configs.values())
    for query, group in data['docs'].groupby(data_configs['query']):
        query_obj = database.QueryRepr.objects(title=query).first()

        if not query_obj:
            query_text = data['query'][data['query']['title'] == query]['text'].values[0]
            query_obj = database.QueryRepr(title=query, text=query_text)
            query_obj.save()

        for _, row in group.iterrows():
            doc_obj = get_doc_object_in_db(data['docs'].columns, row.values, fields)
            if not doc_obj:
                doc_obj = database.DocRepr()
                doc_obj = add_fields_from_data(data['docs'].columns, row.values, doc_obj)
                doc_obj.save()

flask_app.db_create_pipeline.add_data_to_db(data, fields, ranking_type, query_col, sort_col='score', ascending=False)

Adding query-ranking pairs in the database (Data object). If pre-processing fairness methods are applied the changed data will be added in the database (DocRepr object).

Parameters:

Name Type Description Default
data dict

dict containing the query dataframe and the document dataframe.

required
fields list(str

list of fields defined for the document.

required
ranking_type str

ranking type of the ranking to be added in the database (e.g. original if ranking is done based on the original value of sort_col, else depends on the ranker model or fairness intervention applied on the data).

required
query_col str

column name defined as the query.

required
sort_col str

column name to sort the documents in the ranking.

'score'
ascending bool

True if sorting by sort_col in ascending order, else in descending order.

False
Source code in flask_app/db_create_pipeline.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def add_data_to_db(data, fields, ranking_type, query_col, sort_col='score', ascending=False):
    """Adding query-ranking pairs in the database (Data object).
        If pre-processing fairness methods are applied the changed data will be added in the database (DocRepr object).

    Args:
        data (dict): dict containing the query dataframe and the document dataframe.
        fields (list(str)): list of fields defined for the document.
        ranking_type (str): ranking type of the ranking to be added in the database
            (e.g. original if ranking is done based on the original value of sort_col,
            else depends on the ranker model or fairness intervention applied on the data).
        query_col (str): column name defined as the query.
        sort_col (str): column name to sort the documents in the ranking.
        ascending (bool): True if sorting by sort_col in ascending order, else in descending order.
    """
    data['docs'] = data['docs'].groupby(query_col).apply(
        lambda x: x.sort_values(sort_col, ascending=ascending)).reset_index(drop=True)
    for query, group in data['docs'].groupby(query_col):
        doc_list = []

        query_obj = database.QueryRepr.objects(title=query).first()
        if query_obj:
            for _, row in group.iterrows():
                doc_obj = get_doc_object_in_db(data['docs'].columns, row.values, fields)
                if 'original' not in ranking_type:
                    if ranking_type not in doc_obj:
                        add_fields_from_data([ranking_type], [[]], doc_obj)
                        already_added = False
                    else:
                        already_added = len(
                            [predoc for predoc in doc_obj[ranking_type] if predoc.ranking_type == ranking_type]) > 0

                    if not already_added:
                        pre_doc = database.PreDocRepr(ranking_type=ranking_type)
                        if ranking_type.startswith('preprocessing'):
                            columns_fair = [col for col in data['docs'].columns if '_fair' in col]
                            values_fair = [row[col] for col in columns_fair]
                            add_fields_from_data(columns_fair, values_fair, pre_doc)
                        else:
                            columns = ["prediction"]
                            max_ = max(group[data['docs'].columns[-1]]) + 1
                            values = [max_ - row[data['docs'].columns[-1]]]
                            add_fields_from_data(columns, values, pre_doc)
                        doc_obj[ranking_type].append(pre_doc)
                        doc_obj.save()

                doc_list.append(doc_obj._id)

            ranking_obj = database.Ranking(ranking_type=ranking_type, docs=doc_list)
            data_obj = database.Data.objects(query=str(query_obj._id)).first()
            if not data_obj:
                data_obj = database.Data(query=str(query_obj._id), rankings=[ranking_obj])
                data_obj.save()
            else:
                saved_rankings = [r.ranking_type for r in data_obj.rankings]
                if ranking_obj.ranking_type not in saved_rankings:
                    data_obj.rankings.append(ranking_obj)
                    data_obj.save()
        else:
            continue

flask_app.db_create_pipeline.get_docs_df(ranking_type, data_config, features)

Retrieve documents representation from the database and converts in dataframe. Args: ranking_type (str): If set to preprocessing, it retrieves the document representation transformed by the preprocessing fairness method. If set to original it retrieves the original document representation. data_config (dict): Configuration dict of the dataset. features (list(str)): List of columns representing the features of the document.

Returns:

Name Type Description
df DataFrame

dataframe containing the retrieved document representation from the database.

Source code in flask_app/db_create_pipeline.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def get_docs_df(ranking_type, data_config, features):
    """Retrieve documents representation from the database and converts in dataframe.
    Args:
        ranking_type (str): If set to preprocessing, it retrieves the document representation
            transformed by the preprocessing fairness method.
            If set to original it retrieves the original document representation.
        data_config (dict): Configuration dict of the dataset.
        features (list(str)): List of columns representing the features of the document.

    Returns:
        df (pandas.DataFrame): dataframe containing the retrieved document representation from the database.
    """

    data_list = []
    for doc in database.DocRepr.objects():
        if 'original' not in ranking_type:
            field_list = [data_config['query'], data_config['docID'], data_config['group']]

            if ranking_type.startswith('preprocessing'):
                sub_columns = [col + '_fair' for col in features]
                sub_columns.append(data_config['score'] + '_fair')
            else:
                sub_columns = ["prediction"]
            field_list.extend(sub_columns)

            data_list.append({field: getattr(doc, field, None) if field not in sub_columns else getattr(
                doc[ranking_type][0], field, None) for field in field_list})
        else:
            field_list = [data_config['query'], data_config['docID'], data_config['group']]
            field_list.extend(features)
            field_list.append(data_config['score'])
            data_list.append({field: getattr(doc, field, None) for field in field_list})


    # Create a Pandas DataFrame from the list of dictionaries
    df = pd.DataFrame(data_list)
    return df

flask_app.db_create_pipeline.get_docs_df(ranking_type, data_config, features)

Retrieve documents representation from the database and converts in dataframe. Args: ranking_type (str): If set to preprocessing, it retrieves the document representation transformed by the preprocessing fairness method. If set to original it retrieves the original document representation. data_config (dict): Configuration dict of the dataset. features (list(str)): List of columns representing the features of the document.

Returns:

Name Type Description
df DataFrame

dataframe containing the retrieved document representation from the database.

Source code in flask_app/db_create_pipeline.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def get_docs_df(ranking_type, data_config, features):
    """Retrieve documents representation from the database and converts in dataframe.
    Args:
        ranking_type (str): If set to preprocessing, it retrieves the document representation
            transformed by the preprocessing fairness method.
            If set to original it retrieves the original document representation.
        data_config (dict): Configuration dict of the dataset.
        features (list(str)): List of columns representing the features of the document.

    Returns:
        df (pandas.DataFrame): dataframe containing the retrieved document representation from the database.
    """

    data_list = []
    for doc in database.DocRepr.objects():
        if 'original' not in ranking_type:
            field_list = [data_config['query'], data_config['docID'], data_config['group']]

            if ranking_type.startswith('preprocessing'):
                sub_columns = [col + '_fair' for col in features]
                sub_columns.append(data_config['score'] + '_fair')
            else:
                sub_columns = ["prediction"]
            field_list.extend(sub_columns)

            data_list.append({field: getattr(doc, field, None) if field not in sub_columns else getattr(
                doc[ranking_type][0], field, None) for field in field_list})
        else:
            field_list = [data_config['query'], data_config['docID'], data_config['group']]
            field_list.extend(features)
            field_list.append(data_config['score'])
            data_list.append({field: getattr(doc, field, None) for field in field_list})


    # Create a Pandas DataFrame from the list of dictionaries
    df = pd.DataFrame(data_list)
    return df

Pipeline class for inserting the data in the database.

__init__(config)

Pipeline init class. Attributes


config : dict configuration dict defined in the configuration file data_reader : DataReader DataReader object corresponding to the dataset defined in the configuration file query_col : str

read_data()

Read data using the DataReader

Returns:

Name Type Description
data_train DataFrame

data used for training the ranker and/or the fairness intervention.

data_test DataFrame

data used for testing and displaying in the UI.

train_ranker()

Trains the ranking model based on the configurations defined under train_ranker_config. It saves the predicted ranking on the test split in the database.

Returns: None

apply_fair_method(fields, config_method_key, sort_column, ascending)

Apply fairness methods defined in the configuration file under pre/in/post_processing_config. Save the changed data (in case of pre-processing) and the new ranking in the database. Args: fields (list(str)): attributes of the document defined in the configuration file. config_method_key (str): can have the following values: pre_processing, in_processing and post_processing indicating which type of fairness method is applied. sort_column (str): column name after which the items are ranked. ascending (bool): True if sorting by sort_col in ascending order, else in descending order.

run()