import warnings
warnings.filterwarnings('ignore')

import streamlit as st
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModel
import torch
from datasets import load_dataset
from datetime import datetime
from typing import List
from torch.utils.data import DataLoader, Dataset
from functools import partial

# Configure GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialize session state
if 'history' not in st.session_state:
    st.session_state.history = []
if 'feedback' not in st.session_state:
    st.session_state.feedback = {}

# Define subset size
SUBSET_SIZE = 1000  # Starting with 1000 items for quick testing


class TextDataset(Dataset):
    """Wraps a list of texts so a DataLoader can tokenize them on the fly."""

    def __init__(self, texts: List[str], tokenizer, max_length: int = 512):
        self.texts = texts
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        return self.tokenizer(
            self.texts[idx],
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt"
        )


@st.cache_resource
def load_data_and_model():
    """Load the dataset and model with optimized memory usage."""
    try:
        # Load dataset and take a random subset for quick testing
        dataset = load_dataset("frankjosh/filtered_dataset")
        data = pd.DataFrame(dataset['train'])
        data = data.sample(n=min(SUBSET_SIZE, len(data)), random_state=42).reset_index(drop=True)

        # Combine text fields into one searchable document per repository
        data['text'] = data['docstring'].fillna('') + ' ' + data['summary'].fillna('')

        # Load model and tokenizer
        model_name = "Salesforce/codet5-small"
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModel.from_pretrained(model_name).to(device)
        model.eval()
        return data, tokenizer, model
    except Exception as e:
        st.error(f"Error in initialization: {e}")
        st.stop()


def collate_fn(batch, pad_token_id):
    """Pad every item in a batch to the longest sequence in that batch."""
    max_length = max(inputs['input_ids'].shape[1] for inputs in batch)
    input_ids = []
    attention_mask = []
    for inputs in batch:
        input_ids.append(torch.nn.functional.pad(
            inputs['input_ids'].squeeze(),
            (0, max_length - inputs['input_ids'].shape[1]),
            value=pad_token_id
        ))
        attention_mask.append(torch.nn.functional.pad(
            inputs['attention_mask'].squeeze(),
            (0, max_length - inputs['attention_mask'].shape[1]),
            value=0
        ))
    return {
        'input_ids': torch.stack(input_ids),
        'attention_mask': torch.stack(attention_mask)
    }


def generate_embeddings_batch(model, batch, device):
    """Generate embeddings for a batch by mean-pooling the encoder states."""
    with torch.no_grad():
        batch = {k: v.to(device) for k, v in batch.items()}
        outputs = model.encoder(**batch)
        embeddings = outputs.last_hidden_state.mean(dim=1)
    return embeddings.cpu().numpy()


def precompute_embeddings(data: pd.DataFrame, model, tokenizer, batch_size: int = 16):
    """Precompute embeddings with batching and progress tracking."""
    dataset = TextDataset(data['text'].tolist(), tokenizer)
    dataloader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=False,
        collate_fn=partial(collate_fn, pad_token_id=tokenizer.pad_token_id),
        num_workers=2,  # Reduced workers for the smaller dataset
        pin_memory=True
    )

    embeddings = []
    total_batches = len(dataloader)

    # Progress bar plus a status line for the ETA
    progress_bar = st.progress(0)
    status_text = st.empty()
    start_time = datetime.now()

    for i, batch in enumerate(dataloader):
        # Generate embeddings for this batch
        batch_embeddings = generate_embeddings_batch(model, batch, device)
        embeddings.extend(batch_embeddings)

        # Update progress and estimate the time remaining
        progress = (i + 1) / total_batches
        progress_bar.progress(progress)
        elapsed_time = (datetime.now() - start_time).total_seconds()
        eta = (elapsed_time / (i + 1)) * (total_batches - (i + 1))
        status_text.text(f"Processing batch {i + 1}/{total_batches}. ETA: {int(eta)} seconds")

    progress_bar.empty()
    status_text.empty()

    # Attach the embeddings to the dataframe
    data['embedding'] = embeddings
    return data
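
# The UI below calls save_feedback(), which the original listing never defines.
# A minimal sketch, assuming feedback is tallied per repository in
# st.session_state.feedback as {'likes': int, 'dislikes': int} counters —
# the shape the sidebar bar chart already expects.
def save_feedback(repo: str, kind: str) -> None:
    """Increment the like/dislike counter for a repository."""
    entry = st.session_state.feedback.setdefault(repo, {'likes': 0, 'dislikes': 0})
    entry[kind] += 1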

@torch.no_grad()
def generate_query_embedding(model, tokenizer, query: str) -> np.ndarray:
    """Generate an embedding for a single query string."""
    inputs = tokenizer(
        query,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=512
    ).to(device)
    outputs = model.encoder(**inputs)
    embedding = outputs.last_hidden_state.mean(dim=1).cpu().numpy()
    return embedding.squeeze()


def find_similar_repos(query_embedding: np.ndarray, data: pd.DataFrame, top_n: int = 5) -> pd.DataFrame:
    """Rank repositories with a single vectorized similarity computation."""
    similarities = cosine_similarity([query_embedding], np.stack(data['embedding'].values))[0]
    data['similarity'] = similarities
    return data.nlargest(top_n, 'similarity')
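
# generate_case_study() is also referenced below but missing from the original
# listing. A placeholder sketch, assuming each row carries the 'repo', 'url',
# 'summary' and 'similarity' fields used elsewhere in this script.
def generate_case_study(row: pd.Series) -> str:
    """Render a short markdown brief for a recommended repository."""
    return (
        f"**Repository:** {row['repo']}\n\n"
        f"**Summary:** {row.get('summary', 'No summary available.')}\n\n"
        f"**Why it matched:** similarity score of {row['similarity']:.2%} "
        f"against your project description.\n\n"
        f"[View on GitHub]({row['url']})"
    )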

# Load resources
data, tokenizer, model = load_data_and_model()

# Flag that results come from a sample, not the full dataset
st.info(f"Running with a subset of {SUBSET_SIZE} repositories for testing purposes.")

# Precompute embeddings for the subset
data = precompute_embeddings(data, model, tokenizer)

# Main App Interface
st.title("Enhanced Repository Recommender System 🚀")
st.caption("Testing Version - Running on subset of data")

# Sidebar for History and Stats
with st.sidebar:
    st.header("📊 Search History")
    if st.session_state.history:
        for idx, item in enumerate(st.session_state.history[-5:]):  # Show last 5 searches
            with st.expander(f"Search {len(st.session_state.history) - idx}: {item['query'][:30]}..."):
                st.write(f"Time: {item['timestamp']}")
                st.write(f"Results: {len(item['results'])} repositories")
                if st.button("Rerun this search", key=f"rerun_{idx}"):
                    # Stashed for a rerun feature; not yet consumed below
                    st.session_state.rerun_query = item['query']
    else:
        st.write("No search history yet")

    st.header("📈 Usage Statistics")
    st.write(f"Total Searches: {len(st.session_state.history)}")
    if st.session_state.feedback:
        feedback_df = pd.DataFrame(st.session_state.feedback).T
        feedback_df['Total'] = feedback_df['likes'] + feedback_df['dislikes']
        st.bar_chart(feedback_df[['likes', 'dislikes']])

# Main interface
user_query = st.text_area(
    "Describe your project:",
    height=150,
    placeholder="Example: I need a machine learning project for customer churn prediction..."
)

# Search button and filters
col1, col2 = st.columns([2, 1])
with col1:
    search_button = st.button("🔍 Search Repositories", type="primary")
with col2:
    top_n = st.selectbox("Number of results:", [3, 5, 10], index=1)

if search_button and user_query.strip():
    with st.spinner("Finding relevant repositories..."):
        # Generate the query embedding and rank repositories in one
        # vectorized pass instead of computing similarity row by row
        query_embedding = generate_query_embedding(model, tokenizer, user_query)
        recommendations = find_similar_repos(query_embedding, data, top_n)

        # Save to history
        st.session_state.history.append({
            'query': user_query,
            'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'results': recommendations['repo'].tolist()
        })

        # Display recommendations
        st.markdown("### 🎯 Top Recommendations")
        for rank, (idx, row) in enumerate(recommendations.iterrows(), start=1):
            with st.expander(f"Repository {rank}: {row['repo']}", expanded=True):
                # Repository details
                col1, col2 = st.columns([2, 1])
                with col1:
                    st.markdown(f"**URL:** [View Repository]({row['url']})")
                    st.markdown(f"**Path:** `{row['path']}`")
                with col2:
                    st.metric("Match Score", f"{row['similarity']:.2%}")

                # Feedback buttons
                feedback_col1, feedback_col2 = st.columns(2)
                with feedback_col1:
                    if st.button("👍", key=f"like_{idx}"):
                        save_feedback(row['repo'], 'likes')
                        st.success("Thanks for your feedback!")
                with feedback_col2:
                    if st.button("👎", key=f"dislike_{idx}"):
                        save_feedback(row['repo'], 'dislikes')
                        st.success("Thanks for your feedback!")

                # Case Study and Documentation tabs (Streamlit does not allow
                # expanders nested inside expanders, so tabs are used here)
                case_tab, doc_tab = st.tabs(["📑 Case Study Brief", "📚 Documentation"])
                with case_tab:
                    st.markdown(generate_case_study(row))
                with doc_tab:
                    st.markdown(row['docstring'] if row['docstring'] else "No documentation available.")

# Footer
st.markdown("---")
st.markdown(
    f"""
    Made with 🤖 using CodeT5 and Streamlit |
    GPU Status: {'🟢 Enabled' if torch.cuda.is_available() else '🔴 Disabled'} |
    Model: CodeT5-Small
    """
)