Spaces:
Sleeping
Sleeping
| from controller.imports import * | |
| import logging | |
| from datetime import datetime | |
# Configure the root logger once at import time so INFO-and-above records are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the handlers defined in this file.
logger = logging.getLogger(__name__)
# The FastAPI application object for this service.
app = FastAPI()
# Mount `gamification` under the /gamification path prefix.
# NOTE(review): `gamification` is presumably a sub-application exported by
# controller.imports — confirm.
app.mount('/gamification',gamification)
def get_course(query):
    """Search for courses matching *query* and return them as a JSON response.

    Calls ``google_search`` with the configured ``API_KEY`` and ``CX`` and, for
    every returned item, builds a record holding its title, link, snippet, the
    provider derived by ``extract_provider`` and whatever ``scrapeCourse``
    returns for the link.

    Returns:
        A ``JSONResponse`` with status 200 whose body is the list of course
        records. The list is empty when the search result is falsy or has no
        ``items``.
    """
    search_results = google_search(query, API_KEY, CX)
    courses = []
    # A falsy search result contributes no items at all.
    found_items = search_results.get('items', []) if search_results else []
    for entry in found_items:
        url = entry.get('link')
        courses.append({
            "courseTitle": entry.get('title'),
            "courseLink": url,
            "courseSnippet": entry.get('snippet'),
            "provider": extract_provider(url),
            "scrapedCourseDetails": scrapeCourse(url=url),
        })
    return JSONResponse(courses, status_code=200)
async def upload_file(file: UploadFile = File(...),authorization: str = Header(...)):
    """Extract the text of an uploaded PDF/DOCX file and index it for the caller.

    The bearer token from the ``Authorization`` header is decoded into a user
    id and an access token, which are checked with ``verify_access_token``.
    The file's text is then split into chunks, embedded, and inserted into the
    Pinecone namespace named after the decoded user id.

    Args:
        file: The uploaded document; only ``.pdf`` and ``.docx`` are supported.
        authorization: The raw ``Authorization`` header value.

    Returns:
        A dict with the uploaded file name and the stringified insert response.

    Raises:
        HTTPException: 401 when the token is not valid, 415 when the file
            extension is neither ``pdf`` nor ``docx``.
    """
    # Extract the token from the Authorization header (Bearer token)
    token = authorization.split("Bearer ")[-1]
    decoded_user_id, decoded_access_token = decode_jwt(token)
    is_valid = verify_access_token(db_uri=MONGO_URI, user_id=decoded_user_id, access_token=decoded_access_token)
    if is_valid != True:
        raise HTTPException(status_code=401, detail="Invalid token")

    content = await file.read()  # Raw bytes of the upload
    print(f"File name: {file.filename}")
    print(f"File content type: {file.content_type}")
    print(f"File size: {file.size} bytes")

    # Use the text after the LAST dot, case-insensitively, so names such as
    # "my.resume.PDF" are recognised and names without a dot do not crash.
    filename = file.filename or ""
    _, dot, suffix = filename.rpartition(".")
    extension = suffix.lower() if dot else ""

    if extension == "pdf":
        pdf_document = fitz.open(stream=BytesIO(content), filetype="pdf")
        try:
            extracted_text = "".join(
                pdf_document.load_page(page_num).get_text()
                for page_num in range(pdf_document.page_count)
            )
        finally:
            # Release the parsed document even if a page fails to load.
            pdf_document.close()
    elif extension == "docx":
        doc = docx.Document(BytesIO(content))
        extracted_text = "".join(para.text + "\n" for para in doc.paragraphs)
    else:
        # Previously this path failed with an unbound-variable error (HTTP 500).
        raise HTTPException(status_code=415, detail="Unsupported file type; upload a .pdf or .docx file")

    sentences = split_text_into_chunks(extracted_text, chunk_size=200)
    docs = generate_embedding_for_user_resume(data=sentences, user_id=file.filename)
    response = insert_embeddings_into_pinecone_database(doc=docs, api_key=PINECONE_API_KEY, name_space=decoded_user_id)
    return {"name": file.filename, "response": str(response)}
def ask_ai_about_resume(req:AiAnalysis,authorization: str = Header(...)):
    """Answer a question about the caller's resume using retrieved context.

    The bearer token is decoded and verified; the question in ``req.Query`` is
    used to query the vector database in the caller's namespace, and the
    question plus retrieved context are sent to the ``gemini-2.0-flash`` model.

    Returns:
        ``{"Ai_Response": <model text>}``.

    Raises:
        HTTPException: 401 when the token is not valid.
    """
    bearer_token = authorization.split("Bearer ")[-1]
    decoded_user_id, decoded_access_token = decode_jwt(bearer_token)
    token_ok = verify_access_token(
        db_uri=MONGO_URI,
        user_id=decoded_user_id,
        access_token=decoded_access_token,
    )
    if token_ok != True:
        raise HTTPException(status_code=401, detail="Invalid token")

    # Retrieve context from the vector database, scoped to this user.
    context = query_vector_database(
        query=req.Query,
        api_key=PINECONE_API_KEY,
        name_space=decoded_user_id,
    )

    # Make sure this thread has an event loop before the client is created.
    try:
        asyncio.get_event_loop()
    except RuntimeError:
        asyncio.set_event_loop(asyncio.new_event_loop())

    gemini = genai.Client(api_key=GEMINI_API_KEY)
    prompt = f"""
Answer this question using the context provided:
question: {req.Query}
context: {context}
"""
    answer = gemini.models.generate_content(model="gemini-2.0-flash", contents=prompt)
    return {"Ai_Response": answer.text}
def ask_ai_to_recommnd_courses(request:UserCourse,authorization:str=Header(...)):
    """Store the caller's questionnaire answers and return placeholder course data.

    The bearer token is decoded and verified, then the request body (plus the
    decoded user id under ``userId``) is saved through ``create_questionaire``
    into the ``Questionaire`` collection of the ``crayonics`` database. The
    Gemini-based recommendation is currently disabled (see the commented-out
    code below), so the returned values are fixed placeholder strings.

    Raises:
        HTTPException: 401 when the token is not valid.
    """
    # Extract the token from the Authorization header (Bearer token)
    bearer_token = authorization.split("Bearer ")[-1]
    decoded_user_id, decoded_access_token = decode_jwt(bearer_token)
    token_ok = verify_access_token(
        db_uri=MONGO_URI,
        user_id=decoded_user_id,
        access_token=decoded_access_token,
    )
    if token_ok != True:
        raise HTTPException(status_code=401, detail="Invalid token")

    # Make sure this thread has an event loop.
    try:
        asyncio.get_event_loop()
    except RuntimeError:
        asyncio.set_event_loop(asyncio.new_event_loop())

    # Disabled recommendation step, kept for reference:
    # client = genai.Client(api_key=GEMINI_API_KEY)
    # response = client.models.generate_content(
    #     model="gemini-2.0-flash",
    #     contents=f"""
    #     please respond with a JSON object that contains the following keys as a response:
    #     - "coursename": the name of the recommended course,
    #     - "completiontime": an estimate of how long it would take to complete the course.
    #     Do not include any extra text.
    #     Recommend a course using this information below :
    #     Which of the following best describes you?: {request.employmentStatus}
    #     Would you like to prepare for an interim role to gain experience and income while pursuing your dream job?: {request.interimRole}
    #     What is your desired role?: {request.dreamRole}
    #     Why do you want to achieve this desired role?: {request.motivation}
    #     How do you prefer to learn new skills?: {request.learningPreference}
    #     How many hours per day can you dedicate to learning?: {request.timeCommitmentPerDay}
    #     What are the biggest challenges or obstacles you face in reaching your dream role?: {request.challenges}
    #     What is your ideal timeframe for achieving your dream role?: {request.timeframeToAchieveDreamRole}
    #     """
    # )
    answers = {**request.model_dump(), 'userId': decoded_user_id}
    create_questionaire(
        db_uri=MONGO_URI,
        db_name="crayonics",
        collection_name="Questionaire",
        document=answers,
    )
    # course_info = extract_course_info(response.text)
    # courses = get_course_func(query=course_info.courseName)
    return {"courseInfo": "course_info", "courses": "courses"}
def login(user:UserBody):
    """Authenticate a user and issue a refresh/access token pair.

    The submitted fields are passed to ``login_user`` against the ``users``
    collection of the ``crayonics`` database. On success a refresh token and
    an access token are created, the refresh token record is updated with the
    access token, and the access token is wrapped in a JWT together with the
    user id.

    Returns:
        ``{"refreshToken": ..., "accessToken": <jwt>}`` on success, otherwise a
        401 ``JSONResponse`` with ``{"detail": "Invalid login details"}``.
    """
    credentials = {
        "email": user.email,
        "password": user.password,
        "firstName": user.firstName,
        "lastName": user.lastName,
    }
    # The credentials are intentionally NOT printed or logged: the dict holds
    # the plaintext password, which must never reach stdout or log files.
    user_id = login_user(db_uri=MONGO_URI, db_name="crayonics", collection_name="users", document=credentials)
    if user_id == False:
        return JSONResponse(status_code=401, content={"detail": "Invalid login details"})

    refreshToken = create_refreshToken(db_uri=MONGO_URI, user_id=user_id)
    accessToken = create_accessToken(db_uri=MONGO_URI, user_id=user_id, refresh_token=refreshToken)
    result = update_refreshTokenWithPreviouslyUsedAccessToken(db_uri=MONGO_URI, refresh_token=refreshToken, access_token=accessToken)
    print(result)
    access_token = encode_jwt(user_id=user_id, access_token=accessToken)
    return {"refreshToken": refreshToken, "accessToken": access_token}
def signUp(user:UserBody):
    """Register a new user and issue a refresh/access token pair.

    The submitted fields are stored through ``create_user`` in the ``users``
    collection of the ``crayonics`` database. When ``create_user`` returns
    ``False`` no tokens are issued.

    Returns:
        ``{"refreshToken": ..., "accessToken": <jwt>}`` on success, otherwise a
        ``JSONResponse`` with status 226 and the body ``"user already Exists"``.
    """
    new_user = {
        "email": user.email,
        "password": user.password,
        "first_name": user.firstName,
        "last_name": user.lastName,
    }
    user_id = create_user(
        db_uri=MONGO_URI,
        db_name="crayonics",
        collection_name="users",
        document=new_user,
    )
    if user_id == False:
        # NOTE(review): 226 IM Used is a success status; 409 Conflict is the
        # conventional code here, but clients may depend on the current value.
        return JSONResponse(status_code=status.HTTP_226_IM_USED, content="user already Exists")

    refreshToken = create_refreshToken(db_uri=MONGO_URI, user_id=user_id)
    accessToken = create_accessToken(db_uri=MONGO_URI, user_id=user_id, refresh_token=refreshToken)
    update_outcome = update_refreshTokenWithPreviouslyUsedAccessToken(
        db_uri=MONGO_URI,
        refresh_token=refreshToken,
        access_token=accessToken,
    )
    print(update_outcome)
    signed_access_token = encode_jwt(user_id=user_id, access_token=accessToken)
    return {"refreshToken": refreshToken, "accessToken": signed_access_token}
def logout(refresh:Token,authorization: str = Header(...)):
    """Log the caller out by invalidating the supplied refresh token.

    The bearer token is decoded and verified first; the refresh token from the
    request body is then passed to ``logout_func``.

    Returns:
        ``{"content": "successful"}`` when ``logout_func`` returns ``True``,
        otherwise a 410 ``JSONResponse`` with ``{"content": "unsuccessful"}``.

    Raises:
        HTTPException: 401 when the access token is not valid.
    """
    bearer_token = authorization.split("Bearer ")[-1]
    decoded_user_id, decoded_access_token = decode_jwt(bearer_token)
    token_ok = verify_access_token(
        db_uri=MONGO_URI,
        user_id=decoded_user_id,
        access_token=decoded_access_token,
    )
    if token_ok != True:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")

    logged_out = logout_func(db_uri=MONGO_URI, refresh_token=refresh.refreshToken)
    if logged_out == True:
        return {"content": "successful"}
    return JSONResponse(status_code=status.HTTP_410_GONE, content={"content": "unsuccessful"})
def refresh_access_token(refresh_token:Token, authorization: str = Header(...)):
    """Exchange a refresh token plus the current access token for a new access token.

    The bearer token is decoded into a user id and access token, and the
    triple (user id, access token, refresh token) is checked with
    ``verify_refresh_access_token``. A new access token is then created,
    recorded against the refresh token, and returned as a JWT.

    Returns:
        ``{"accessToken": <jwt>}``.

    Raises:
        HTTPException: 401 when verification fails.
    """
    bearer_token = authorization.split("Bearer ")[-1]
    decoded_user_id, decoded_access_token = decode_jwt(bearer_token)
    presented_refresh = refresh_token.refreshToken

    verified = verify_refresh_access_token(
        db_uri=MONGO_URI,
        user_id=decoded_user_id,
        access_token=decoded_access_token,
        refresh_token=presented_refresh,
    )
    if verified != True:
        raise HTTPException(status_code=401, detail="Invalid token")

    fresh_access_token = create_accessToken(
        db_uri=MONGO_URI,
        user_id=decoded_user_id,
        refresh_token=presented_refresh,
    )
    update_refreshTokenWithPreviouslyUsedAccessToken(
        db_uri=MONGO_URI,
        refresh_token=presented_refresh,
        access_token=fresh_access_token,
    )
    return {"accessToken": encode_jwt(user_id=decoded_user_id, access_token=fresh_access_token)}
def get_user_details(authorization: str = Header(...)):
    """Return the stored details of the authenticated caller.

    The bearer token is decoded and verified, then ``user_details_func`` is
    called with ``{"user_id": <decoded user id>}``.

    Returns:
        ``{"userInfo": <result of user_details_func>}``.

    Raises:
        HTTPException: 401 when the token is not valid.
    """
    bearer_token = authorization.split("Bearer ")[-1]
    decoded_user_id, decoded_access_token = decode_jwt(bearer_token)
    token_ok = verify_access_token(
        db_uri=MONGO_URI,
        user_id=decoded_user_id,
        access_token=decoded_access_token,
    )
    if token_ok != True:
        raise HTTPException(status_code=401, detail="Invalid token")

    details = user_details_func(db_uri=MONGO_URI, document={"user_id": decoded_user_id})
    return {"userInfo": details}
def protected_route(authorization: str = Header(...)):
    """Confirm that the caller's bearer token is valid.

    Returns:
        ``{"message": "Access granted", "verification": "verified"}``.

    Raises:
        HTTPException: 401 when the token is not valid.
    """
    bearer_token = authorization.split("Bearer ")[-1]
    decoded_user_id, decoded_access_token = decode_jwt(bearer_token)
    token_ok = verify_access_token(
        db_uri=MONGO_URI,
        user_id=decoded_user_id,
        access_token=decoded_access_token,
    )
    if token_ok != True:
        raise HTTPException(status_code=401, detail="Invalid token")
    return {"message": "Access granted", "verification": "verified"}
def create_leaderboard_ranking(document: LeaderBoardRanking) -> bool:
    """Create or replace the leaderboard entry for ``document.userId``.

    The entry is written with a single atomic upsert, so two workers handling
    changes for the same user at the same time cannot both insert a row (the
    previous find-then-insert sequence could).

    Args:
        document: The ranking to store in the ``LeaderBoard`` collection.

    Returns:
        ``True`` when no entry existed and a new one was inserted, ``False``
        when an existing entry was replaced.
    """
    leaderboard = db['LeaderBoard']
    payload = document.model_dump()
    # With upsert=True and the default "return the document as it was before"
    # behaviour, the result is None exactly when a new document was inserted.
    previous = leaderboard.find_one_and_replace(
        filter={"userId": document.userId},
        replacement=payload,
        upsert=True,
    )
    print(previous)
    if previous is None:
        print("correctly inserted new document for", document.firstName)
        return True
    return False
def get_all_users(user_id =None) -> List:
    """Fetch users from the ``users`` collection of the ``crayonics`` database.

    Args:
        user_id: When omitted (``None``) every user is returned; otherwise the
            string form of the ``_id`` of the single user to look up.

    Returns:
        A list of user documents when ``user_id`` is ``None``; otherwise the
        matching user document, or ``None`` when no user has that id.
    """
    client = MongoClient(MONGO_URI)
    try:
        users = client.crayonics['users']
        if user_id is None:
            # Materialise the cursor before the client is closed.
            return list(users.find())
        return users.find_one(filter={"_id": ObjectId(user_id)})
    finally:
        # Always release the connection; the old close() call sat after a
        # return statement and never ran.
        client.close()
def get_user_id_from_docKey(dockId):
    """Return the ``userId`` stored on the ``Points`` document with the given id.

    Args:
        dockId: String form of the ``_id`` of a document in the ``Points``
            collection of the ``crayonics`` database.

    Returns:
        The document's ``userId`` value, or ``None`` when the document does not
        exist or has no ``userId`` field.
    """
    client = MongoClient(MONGO_URI)
    try:
        point_doc = client.crayonics['Points'].find_one(filter={"_id": ObjectId(dockId)})
    finally:
        # Release the connection even when the id is malformed or the query fails.
        client.close()
    if point_doc is None:
        return None
    return point_doc.get('userId')
# Shared module-level MongoDB handles: `db` is used by the leaderboard
# functions and `collection` (the Points collection) by the change-stream watcher.
client = MongoClient(MONGO_URI)
db = client["crayonics"]
collection = db["Points"]
def handle_change2(new_point):
    """Write a leaderboard entry for every user returned by ``get_all_users``.

    Each user is processed independently: a failure to fetch points or the
    dream job, or to store the entry, is logged and the loop moves on.

    Args:
        new_point: Only logged; it does not influence the processing.
    """
    logger.info(f"Extra info: {new_point}")
    print("No leaderboard so creating one now")

    for user in get_all_users():
        user_id = str(user['_id'])
        print("Inserting user", f"User ID: {user_id}")

        # Points default to None so the entry can still be written with 0.
        try:
            points = get_all_simple_points_func(userId=user_id)
            print("Points:", points)
        except Exception as e:
            logger.error(f"Error processing points for user {user_id}: {e}")
            points = None

        try:
            tempDreamJob = get_dream_job(userId=user_id)
        except Exception as e:
            logger.error(f"Error retrieving dream job for user {user_id}: {e}")
            tempDreamJob = None

        # Anything other than a string means the profile has no dream job yet.
        if isinstance(tempDreamJob, str):
            dreamJob = tempDreamJob
        else:
            dreamJob = "IncompleteProfile"

        try:
            ranking = LeaderBoardRanking(
                userId=user_id,
                firstName=user.get('first_name', 'Unknown'),
                lastName=user.get('last_name', 'Unknown'),
                totalpoints=points.totalpoints if points else 0,
                lastUpdated=datetime.now(),
                careerPath=dreamJob,
            )
            create_leaderboard_ranking(ranking)
        except Exception as e:
            logger.error(f"Error adding user {user_id} to leaderboard: {e}")
def handle_change3(userId:str):
    """Recompute and store the leaderboard entry for a single user.

    Failures while fetching the user, the points or the dream job are logged
    and replaced with defaults (empty user, 0 points, "IncompleteProfile").
    A failure while storing the entry is logged and not re-raised.

    Args:
        userId: String id of the user whose entry should be refreshed.

    Raises:
        ValueError: When ``userId`` is empty.
    """
    if not userId:
        raise ValueError("User ID not found in inserted document")

    # Fetch user details
    try:
        leveleduser = get_all_users(user_id=userId)
    except Exception as e:
        # This failure used to be swallowed silently; log it like the others.
        logger.error(f"Error fetching user details for userId {userId}: {e}")
        leveleduser = {}  # Default empty dict so the name lookups fall back to 'Unknown'
    if leveleduser is None:
        # No such user. Nothing was written in this case before either, but the
        # failure showed up as a misleading AttributeError log further down.
        logger.error(f"User {userId} not found; skipping leaderboard update")
        return

    # Fetch user points
    try:
        points = get_all_simple_points_func(userId=userId)
    except Exception as e:
        logger.error(f"Error fetching points for userId {userId}: {e}")
        points = SimpleIndividualUserLevel(totalpoints=0)

    # Fetch dream job
    try:
        tempDreamJob = get_dream_job(userId=userId)
        print("========================================================")
        print("Temp dream job", tempDreamJob)
    except Exception as e:
        logger.error(f"Error fetching dream job for userId {userId}: {e}")
        tempDreamJob = None
    dreamJob = tempDreamJob if isinstance(tempDreamJob, str) else "IncompleteProfile"
    print("DreamJob", dreamJob)

    # Insert into leaderboard
    try:
        create_leaderboard_ranking(LeaderBoardRanking(
            userId=userId,
            firstName=leveleduser.get('first_name', 'Unknown'),
            lastName=leveleduser.get('last_name', 'Unknown'),
            totalpoints=points.totalpoints,
            lastUpdated=datetime.now(),
            careerPath=dreamJob,
        ))
    except Exception as e:
        logger.error(f"Error adding user {userId} to leaderboard: {e}")
| # A function to handle changes | |
def _refresh_leaderboard_entry(user_id):
    """Recompute and store the leaderboard entry for one user id.

    Fetch failures are logged and replaced with defaults (empty user, 0
    points, "IncompleteProfile"); a storage failure is logged, not raised.

    Raises:
        ValueError: When ``user_id`` is empty.
    """
    if not user_id:
        raise ValueError("User ID not found in inserted document")

    # Fetch user details
    try:
        leveleduser = get_all_users(user_id=user_id)
        print("========================================================")
        print("Leveled User", leveleduser)
    except Exception as e:
        logger.error(f"Error fetching user details for userId {user_id}: {e}")
        leveleduser = {}  # Default empty dict so the name lookups fall back to 'Unknown'
    if leveleduser is None:
        # No such user: nothing is written (as before), but say so clearly
        # instead of failing later with an AttributeError on None.
        logger.error(f"User {user_id} not found; skipping leaderboard update")
        return

    # Fetch user points
    try:
        points = get_all_simple_points_func(userId=user_id)
        print("========================================================")
        print("Points", points)
    except Exception as e:
        logger.error(f"Error fetching points for userId {user_id}: {e}")
        points = SimpleIndividualUserLevel(totalpoints=0)

    # Fetch dream job
    try:
        tempDreamJob = get_dream_job(userId=user_id)
        print("========================================================")
        print("Temp dream job", tempDreamJob)
    except Exception as e:
        logger.error(f"Error fetching dream job for userId {user_id}: {e}")
        tempDreamJob = None
    dreamJob = tempDreamJob if isinstance(tempDreamJob, str) else "IncompleteProfile"
    print("DreamJob", dreamJob)

    # Insert into leaderboard
    try:
        create_leaderboard_ranking(LeaderBoardRanking(
            userId=user_id,
            firstName=leveleduser.get('first_name', 'Unknown'),
            lastName=leveleduser.get('last_name', 'Unknown'),
            totalpoints=points.totalpoints,
            lastUpdated=datetime.now(),
            careerPath=dreamJob,
        ))
    except Exception as e:
        logger.error(f"Error adding user {user_id} to leaderboard: {e}")


def handle_change(change=None):
    """React to a change event on the Points collection by updating the leaderboard.

    When the ``LeaderBoard`` collection does not exist yet, an entry is
    written for every user. Otherwise only the user affected by an ``insert``
    or ``update`` event is refreshed; other operation types are ignored.

    Args:
        change: The change-stream event. ``None`` is tolerated: the bootstrap
            still runs when needed, otherwise the call is a logged no-op.
    """
    print("Change detected in point making changes immediately")
    logger.info(f"Change detected: {change}")

    collections = db.list_collection_names()
    if "LeaderBoard" not in collections:
        # First run: add everybody's points to a fresh leaderboard.
        print("No leaderboard so creating one now")
        for user in get_all_users():
            user_id = str(user['_id'])
            print("inserting user", f"user id {user['_id']}")
            # One broken profile must not stop the remaining users from being added.
            try:
                points = get_all_simple_points_func(userId=user_id)
                tempDreamJob = get_dream_job(userId=user_id)
                dreamJob = tempDreamJob if isinstance(tempDreamJob, str) else "IncompleteProfile"
                create_leaderboard_ranking(LeaderBoardRanking(
                    userId=user_id,
                    firstName=user.get('first_name', 'Unknown'),
                    lastName=user.get('last_name', 'Unknown'),
                    totalpoints=points.totalpoints,
                    lastUpdated=datetime.now(),
                    careerPath=dreamJob,
                ))
            except Exception as e:
                logger.error(f"Error adding user {user_id} to leaderboard: {e}")
    elif change is None:
        # The default argument used to crash on change['operationType'].
        logger.warning("handle_change called without a change event; nothing to update")
    else:
        operation = change['operationType']
        if operation in ('insert', 'update'):
            try:
                if operation == 'insert':
                    # Inserts carry the full document, including the user id.
                    full_document = change.get('fullDocument', {})
                    user_id = full_document.get('userId')
                    print("========================================================")
                    print("Leveled User", user_id)
                else:
                    # Updates only carry the document key; look the user id up.
                    doc_id = str(change.get('documentKey')['_id'])
                    print("========================================================")
                    print("document Id", doc_id)
                    user_id = get_user_id_from_docKey(doc_id)
                _refresh_leaderboard_entry(user_id)
            except Exception as e:
                logger.error(f"Unexpected error processing change document: {e}")
    logger.info("Change detected:")
def watch_change_stream():
    """Watch the Points collection and dispatch every change to ``handle_change``.

    Blocks for as long as the change stream stays open. Each event is handled
    on a worker thread; an exception raised by a handler is logged instead of
    being lost inside its never-inspected future.
    """
    def _log_failure(future):
        # Futures are fire-and-forget, so report errors here or nobody will.
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logger.error(f"handle_change failed: {error}")

    # Limited to 10 threads, because there are only 10 server workers.
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        with collection.watch() as stream:
            for change in stream:
                # Submit the handle_change task to the thread pool
                executor.submit(handle_change, change).add_done_callback(_log_failure)
| # Start a background thread to watch the change stream | |
def start_change_stream():
    """Start ``watch_change_stream`` on a daemon thread and return immediately."""
    watcher = threading.Thread(target=watch_change_stream, daemon=True)
    watcher.start()