Spaces:
Build error
Build error
| from transformers import AutoTokenizer, AutoModelForTableQuestionAnswering | |
| import pandas as pd | |
| import re | |
| p = re.compile('\d+(\.\d+)?') | |
| def load_model_and_tokenizer(): | |
| """ | |
| Load | |
| """ | |
| # Load pretrained tokenizer: TAPAS finetuned on WikiTable Questions | |
| # tokenizer = TapasTokenizer.from_pretrained("google/tapas-base-finetuned-wtq") | |
| tokenizer = AutoTokenizer.from_pretrained("Meena/table-question-answering-tapas") | |
| # Load pretrained model: TAPAS finetuned on WikiTable Questions | |
| # model = TapasForQuestionAnswering.from_pretrained("google/tapas-base-finetuned-wtq") | |
| model = AutoModelForTableQuestionAnswering.from_pretrained("Meena/table-question-answering-tapas") | |
| # Return tokenizer and model | |
| return tokenizer, model | |
| def prepare_inputs(table, queries, tokenizer): | |
| """ | |
| Convert dictionary into data frame and tokenize inputs given queries. | |
| """ | |
| # Prepare inputs | |
| # table = pd.DataFrame.from_dict(data) | |
| # table = netflix_df[['title', 'release_year', 'rating']].astype('str').head(50) | |
| table = table.astype('str').head(100) | |
| inputs = tokenizer(table=table, queries=queries, padding='max_length', return_tensors="pt") | |
| # Return things | |
| return table, inputs | |
| def generate_predictions(inputs, model, tokenizer): | |
| """ | |
| Generate predictions for some tokenized input. | |
| """ | |
| # Generate model results | |
| outputs = model(**inputs) | |
| # Convert logit outputs into predictions for table cells and aggregation operators | |
| predicted_table_cell_coords, predicted_aggregation_operators = tokenizer.convert_logits_to_predictions( | |
| inputs, | |
| outputs.logits.detach(), | |
| outputs.logits_aggregation.detach() | |
| ) | |
| # Return values | |
| return predicted_table_cell_coords, predicted_aggregation_operators | |
| def postprocess_predictions(predicted_aggregation_operators, predicted_table_cell_coords, table): | |
| """ | |
| Compute the predicted operation and nicely structure the answers. | |
| """ | |
| # Process predicted aggregation operators | |
| aggregation_operators = {0: "NONE", 1: "SUM", 2: "AVERAGE", 3:"COUNT"} | |
| aggregation_predictions_string = [aggregation_operators[x] for x in predicted_aggregation_operators] | |
| # Process predicted table cell coordinates | |
| answers = [] | |
| for agg, coordinates in zip(predicted_aggregation_operators, predicted_table_cell_coords): | |
| if len(coordinates) == 1: | |
| # 1 cell | |
| answers.append(table.iat[coordinates[0]]) | |
| else: | |
| # > 1 cell | |
| cell_values = [] | |
| for coordinate in coordinates: | |
| cell_values.append(table.iat[coordinate]) | |
| answers.append(", ".join(cell_values)) | |
| # Return values | |
| return aggregation_predictions_string, answers | |
| def show_answers(queries, answers, aggregation_predictions_string): | |
| """ | |
| Visualize the postprocessed answers. | |
| """ | |
| agg = {"NONE": lambda x: x, "SUM" : lambda x: sum(x), "AVERAGE": lambda x: (sum(x) / len(x)), "COUNT": lambda x: len(x)} | |
| result = '' | |
| for query, answer, predicted_agg in zip(queries, answers, aggregation_predictions_string): | |
| print(query) | |
| if predicted_agg == "NONE": | |
| print("Predicted answer: " + answer) | |
| else: | |
| if all([not p.match(val) == None for val in answer.split(', ')]): | |
| # print("Predicted answer: " + predicted_agg + "(" + answer + ") = " + str(agg[predicted_agg](list(map(float, answer.split(',')))))) | |
| result = "Predicted answer: " + str(agg[predicted_agg](list(map(float, answer.split(','))))) | |
| elif predicted_agg == "COUNT": | |
| # print("Predicted answer: " + predicted_agg + "(" + answer + ") = " + str(agg[predicted_agg](answer.split(',')))) | |
| result = "Predicted answer: " + str(agg[predicted_agg](answer.split(','))) | |
| else: | |
| result = "Predicted answer: " + predicted_agg + " > " + answer | |
| return result | |
| def execute_query(query, table): | |
| """ | |
| Invoke the TAPAS model. | |
| """ | |
| queries = [query] | |
| tokenizer, model = load_model_and_tokenizer() | |
| table, inputs = prepare_inputs(table, queries, tokenizer) | |
| predicted_table_cell_coords, predicted_aggregation_operators = generate_predictions(inputs, model, tokenizer) | |
| aggregation_predictions_string, answers = postprocess_predictions(predicted_aggregation_operators, predicted_table_cell_coords, table) | |
| return show_answers(queries, answers, aggregation_predictions_string) | |