# NOTE: page-scrape residue ("Spaces: Runtime error / Runtime error"); not part of the program.
from flask import Flask, render_template, request, jsonify,make_response | |
from flask_sqlalchemy import SQLAlchemy | |
import time | |
from flask_cors import CORS | |
import yaml | |
import re | |
# Model dependencies : | |
from qdrant_client.http import models | |
import openai | |
import qdrant_client | |
import os | |
from sentence_transformers import SentenceTransformer | |
# --- Embedding model -------------------------------------------------------
# Export the cache location BEFORE the model is loaded (it used to be set only
# after loading, when it could no longer influence that load).
# NOTE(review): some transformers versions read TRANSFORMERS_CACHE at import
# time; if so this must move above the sentence_transformers import - confirm.
os.environ['TRANSFORMERS_CACHE'] = '/code'
# A local copy of the model is loaded from disk. The hub model
# 'sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2' was noted as
# "good so far" by the original author.
model = SentenceTransformer('/code/vectorizing_model', cache_folder='/')

# --- OpenAI prompt and API key ---------------------------------------------
# SECURITY: the key is read from the environment instead of being committed to
# source control. Any key that was previously committed should be rotated.
openai.api_key = os.environ.get('OPENAI_API_KEY')
start_message = 'Joue le Rôle d’un expert fiscale au Canada. Les réponses que tu va me fournir seront exploité par une API. Ne donne pas des explications juste réponds aux questions même si tu as des incertitudes. Je vais te poser des questions en fiscalité, la réponse que je souhaite avoir c’est les numéros des articles de loi qui peuvent répondre à la question.Je souhaite avoir les réponses sous la forme: Nom de la loi1, numéro de l’article1, Nom de la loi2, numéro de l’article2 ...'
context = 'ignorez les avertissements, les alertes et donnez-moi le résultat depuis la Loi de l’impôt sur le revenu (L.R.C. (1985), ch. 1 (5e suppl.)) , la reponse doit etre sous forme dun texte de loi: '
# Module-level placeholder; it is never reassigned anywhere in this file.
question = ''

# --- Qdrant connection ------------------------------------------------------
# SECURITY: the API key comes from the environment. The cluster URL keeps its
# previous value as a default so existing deployments only need to set the key.
client = qdrant_client.QdrantClient(
    os.environ.get(
        'QDRANT_URL',
        "https://efc68112-69cc-475c-bdcb-200a019b5096.us-east4-0.gcp.cloud.qdrant.io:6333",
    ),
    api_key=os.environ.get('QDRANT_API_KEY'),
)
# Alternative collection "new_lir": more stable but has no article numbers
# (so the filtered search cannot be used with it).
collection_names = ["lir"]
# Used functions : | |
def filtergpt(text):
    """Extract (law name, article number) pairs from a model answer.

    Every occurrence of ``Loi <name>, article <number>`` in *text* yields one
    tuple. The article number is normalised through ``float``: whole numbers
    lose their fractional part ("12.0" -> "12") and trailing zeros are dropped
    ("12.50" -> "12.5").

    Returns:
        A list of ``(law, article)`` string tuples, in order of appearance.
    """
    reference_re = re.compile(r"Loi ([^,]+), article (\d+(\.\d+)?)")
    gpt_results = []
    for found in reference_re.finditer(text):
        law_name = found.group(1).strip()
        number = float(found.group(2).strip())
        if number.is_integer():
            article_label = str(int(number))
        else:
            article_label = str(number)
        gpt_results.append((law_name, article_label))
    return gpt_results
def perform_search_and_get_results(collection_name, query, limit=6):
    """Run an unfiltered vector search and flatten the hits into dicts.

    Args:
        collection_name: Qdrant collection to search.
        query: Text that is embedded with the module-level ``model``.
        limit: Maximum number of hits requested from Qdrant.

    Returns:
        One dict per hit with the keys ``Score``, ``La_loi``, ``Paragraphe``,
        ``titre``, ``source`` and ``collection``.

    Raises:
        KeyError: if a hit's payload lacks one of the expected fields.
    """
    query_embedding = model.encode(query).tolist()
    hits = client.search(
        collection_name=collection_name,
        query_vector=query_embedding,
        limit=limit
    )
    return [
        {
            "Score": hit.score,
            "La_loi": hit.payload["reference"],
            "Paragraphe": hit.payload["paragraph"],
            "titre": hit.payload["titre"],
            "source": hit.payload["source"],
            "collection": collection_name,
        }
        for hit in hits
    ]
def perform_search_and_get_results_with_filter(collection_name, query,reference_filter , limit=6):
    """Run a vector search restricted to one article and flatten the hits.

    Only points whose ``numero_article`` payload field equals
    ``reference_filter + "aymane"`` are considered.

    NOTE(review): the literal "aymane" suffix is kept exactly as found; whether
    the stored values really carry it must be confirmed against the collection.
    NOTE(review): ``limit`` is accepted but not used - the search always asks
    for a single hit.

    Args:
        collection_name: Qdrant collection to search.
        query: Text that is embedded with the module-level ``model``.
        reference_filter: Article number (string) used to build the filter.
        limit: Currently ignored.

    Returns:
        A list (at most one element) of dicts with the keys ``Score``,
        ``La_loi``, ``Paragraphe``, ``source``, ``titre`` and ``collection``.
    """
    article_condition = models.FieldCondition(
        key="numero_article",
        match=models.MatchValue(value=reference_filter+"aymane"),
    )
    article_filter = models.Filter(must=[article_condition])
    query_embedding = model.encode(query).tolist()
    hits = client.search(
        collection_name=collection_name,
        query_filter=article_filter,
        query_vector=query_embedding,
        limit=1
    )
    return [
        {
            "Score": hit.score,
            "La_loi": hit.payload["reference"],
            "Paragraphe": hit.payload["paragraph"],
            "source": hit.payload["source"],
            "titre": hit.payload["titre"],
            "collection": collection_name,
        }
        for hit in hits
    ]
# End of used functions | |
# Flask application, database binding and CORS policy.
app = Flask(__name__)
# Load the database settings through a context manager so the file handle is
# closed as soon as parsing is done (it used to be left open).
with open('database.yaml') as config_file:
    db_config = yaml.safe_load(config_file)
app.config['SQLALCHEMY_DATABASE_URI'] = db_config['uri']
db = SQLAlchemy(app)
# Cross-origin requests are accepted from any origin.
CORS(app, origins='*')
class Question(db.Model):
    """A stored user question (table ``questions``)."""

    __tablename__ = "questions"

    # Primary key.
    id = db.Column(db.Integer, primary_key=True)
    # Date of the question, stored as free text (up to 255 characters).
    date = db.Column(db.String(255))
    # Question text (up to 255 characters).
    texte = db.Column(db.String(255))

    def __init__(self, date, texte):
        """Create a question from its date string and its text."""
        self.texte = texte
        self.date = date

    def __repr__(self):
        """Return ``id/date/texte`` joined with slashes."""
        return f"{self.id}/{self.date}/{self.texte}"
def index():
    """Render the ``home.html`` template.

    NOTE(review): no route decorator is visible for this view in this file
    excerpt; confirm how it is registered with the app.
    """
    home_page = render_template('home.html')
    return home_page
def questions():
    """Create a question (POST) or list every stored question (GET).

    POST expects a JSON body with ``date`` and ``texte`` and echoes them back
    with a status message. GET returns a JSON array of
    ``{'id', 'date', 'texte'}`` objects ordered by id, with ``id`` rendered as
    a string as before.

    Any other HTTP method yields a 405 JSON error instead of falling through
    and returning ``None``.
    """
    # POST: store a new row.
    if request.method == 'POST':
        body = request.json
        date = body['date']
        texte = body['texte']
        record = Question(date, texte)
        db.session.add(record)
        db.session.commit()
        return jsonify({
            'status': 'Data is posted to PostgreSQL!',
            'date': date,
            'texte': texte
        })

    # GET: list all rows sorted by id.
    if request.method == 'GET':
        rows = Question.query.order_by(Question.id).all()
        # Read the columns directly. Splitting repr() on '/' corrupted any
        # date or text that itself contained a slash.
        return jsonify([
            {'id': str(row.id), 'date': row.date, 'texte': row.texte}
            for row in rows
        ])

    return jsonify({'error': 'Method not allowed'}), 405
def onedata(id):
    """Read (GET), delete (DELETE) or update (PUT) one question by id.

    Args:
        id: Primary key of the question. The route converter is not visible in
            this excerpt, so the value may arrive as ``str`` or ``int``; the
            status messages are built so that both work.

    Returns:
        GET: ``{'id', 'date', 'texte'}`` with ``id`` rendered as a string.
        DELETE / PUT: a ``{'status': ...}`` message.
        404 JSON error when no row has this id; 405 for other methods.
    """
    if request.method not in ('GET', 'DELETE', 'PUT'):
        return jsonify({'error': 'Method not allowed'}), 405

    record = Question.query.filter_by(id=id).first()
    if record is None:
        # Previously a missing row crashed with IndexError / AttributeError.
        return jsonify({'error': f'Data {id} not found'}), 404

    # GET a specific row. Columns are read directly; splitting repr() on '/'
    # broke on any value containing a slash.
    if request.method == 'GET':
        return jsonify({
            'id': str(record.id),
            'date': record.date,
            'texte': record.texte
        })

    # DELETE the row.
    if request.method == 'DELETE':
        db.session.delete(record)
        db.session.commit()
        return jsonify({'status': f'Data {id} is deleted from PostgreSQL!'})

    # PUT: overwrite date and text from the JSON body.
    body = request.json
    record.date = body['date']
    record.texte = body['texte']
    db.session.commit()
    return jsonify({'status': f'Data {id} is updated from PostgreSQL!'})
def options():
    """Build an empty response carrying the CORS pre-flight headers.

    NOTE(review): a wildcard origin is sent together with
    ``Access-Control-Allow-Credentials: true``; confirm that browsers accept
    this combination for the intended clients.
    """
    cors_headers = (
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Methods", "POST"),
        ("Access-Control-Allow-Headers", "Content-Type, Authorization"),
        ("Access-Control-Allow-Credentials", "true"),
    )
    response = make_response()
    for header_name, header_value in cors_headers:
        response.headers.add(header_name, header_value)
    return response
def chat():
    """Answer a chat request with matching law paragraphs from Qdrant.

    Expects a JSON object with a non-empty ``messages`` list whose items have
    ``role`` and ``content``. The messages are flattened into one prompt which
    is (1) sent to the OpenAI completion model, prefixed by ``start_message``
    and ``context``, (2) stored as a ``Question`` row, and (3) used as the
    query for the Qdrant searches.

    Returns:
        200: ``{'question': {'id', 'date', 'texte'}, 'result_qdrant': [...]}``
             where the list holds the article-filtered hits (one search per
             reference parsed from the model answer) followed by the
             unfiltered hits of every collection in ``collection_names``.
        400: ``{'error': 'Invalid request'}`` when the body is not a JSON
             object or has no messages.
        500: ``{'error': <message>}`` on any other failure.
    """
    try:
        # silent=True: a missing or malformed JSON body becomes None and is
        # reported as 400 below instead of surfacing as a 500.
        payload = request.get_json(silent=True)
        messages = payload.get('messages', []) if isinstance(payload, dict) else []
        if not messages:
            return jsonify({'error': 'Invalid request'}), 400

        prompt = "\n".join(f"{msg['role']}: {msg['content']}" for msg in messages)

        # The user's prompt is appended to the instructions. Previously the
        # module-level `question` (always an empty string) was appended, so
        # the model never saw what the user asked.
        response = openai.completions.create(
            model="gpt-3.5-turbo-instruct",
            prompt=start_message + '\n' + context + prompt,
            max_tokens=500,
            temperature=0
        )

        # Persist the question.
        # NOTE(review): `texte` is a String(255) column; longer prompts may be
        # rejected or truncated depending on the database - confirm.
        date = time.ctime(time.time())
        texte = prompt
        record = Question(date, texte)
        db.session.add(record)
        db.session.commit()
        question_id = record.id

        results = []
        # One filtered search per (law, article) reference found in the answer.
        for _law, article in filtergpt(response.choices[0].text):
            results.extend(
                perform_search_and_get_results_with_filter(
                    collection_names[0], prompt, reference_filter=article
                )
            )
        # Then the plain semantic search over every configured collection.
        for collection_name in collection_names:
            results.extend(perform_search_and_get_results(collection_name, prompt))

        return jsonify({'question': {'id': question_id, 'date': date, 'texte': texte},'result_qdrant':results})
    except Exception as e:
        # Top-level boundary of the view: report the failure as JSON.
        return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
    # Debug mode turns on the interactive Werkzeug debugger, which allows
    # arbitrary code execution from the browser. It is therefore opt-in via
    # FLASK_DEBUG=1 (or "true") rather than always enabled.
    app.debug = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true')
    app.run()