import pandas as pd
import numpy as np
import torch
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler
from torch import cuda
from transformers import GPT2LMHeadModel, GPT2Tokenizer, GPT2Config
import argparse

#from google.colab import drive
#drive.mount('/content/drive')

# Fall back to CPU when Apple's MPS backend is not available
device = 'mps' if torch.backends.mps.is_available() else 'cpu'

#!pip install datasets
'''
from datasets import load_dataset

dataset1 = load_dataset("dair-ai/emotion")
for split, data in dataset1.items():
    data.to_csv(f"emotion_{split}.csv", index=None)
'''


def read_reviews(data_path):
    """Load the review/emotion CSVs, build the 'text TL;DR summary' training strings,
    and return a sample of 1250 rows per file."""
    dataset = pd.DataFrame()
    for path in data_path:
        df = pd.read_csv("/content/drive/MyDrive/Text_summary_datasets/" + path)

        # Remove null values:
        df.dropna(inplace=True)

        # Normalise each source into the same three columns: Text, Summary, training
        if path == "emotion_train.csv":
            class_mapping = {0: 'sad', 1: 'joy', 2: 'love', 3: 'anger', 4: 'fear', 5: 'surprise'}
            # Replace the numerical labels with words using the mapping
            df['Summary'] = df['label'].replace(class_mapping)
            df['training'] = df['text'] + 'TL;DR' + df['Summary']
            df['Text'] = df['text']
        if path == "amazon_review.csv":
            df['training'] = df['Text'] + 'TL;DR' + df['Summary']
        if path == "kindle_review.csv":
            df['training'] = df['reviewText'] + 'TL;DR' + df['summary']
            df['Text'] = df['reviewText']
            df['Summary'] = df['summary']
        if path == "tweet_train.csv":
            df['training'] = df['content'] + 'TL;DR' + df['c_summary']
            df['Text'] = df['content']
            df['Summary'] = df['c_summary']

        sampled_data = df.sample(n=1250, random_state=42)
        # DataFrame.append was removed in pandas 2.0; use pd.concat instead
        dataset = pd.concat([dataset, sampled_data], ignore_index=True)

    dataset = dataset[['Summary', 'Text', 'training']]
    return dataset

#reviews.head(1800)
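
# A minimal sketch of the combined training string built in read_reviews().
# The row values below are made up for illustration; real rows come from the
# CSV files listed in main(). The 'TL;DR' marker separates the text from its summary.
'''
example = pd.DataFrame({'Text': ['The battery easily lasts two days on one charge.'],
                        'Summary': ['great battery life']})
example['training'] = example['Text'] + 'TL;DR' + example['Summary']
print(example['training'][0])
# -> The battery easily lasts two days on one charge.TL;DR great battery life
'''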

class GPT2ReviewDataset(Dataset):
    def __init__(self, tokenizer, reviews, max_len):
        self.max_len = max_len
        self.tokenizer = tokenizer
        self.eos = self.tokenizer.eos_token
        self.eos_id = self.tokenizer.eos_token_id
        self.reviews = reviews
        self.result = []

        for review in self.reviews:
            # Encode the text using tokenizer.encode(); we add EOS at the end
            tokenized = self.tokenizer.encode(review + self.eos, max_length=512, truncation=True)
            # Padding/truncating the encoded sequence to max_len
            padded = self.pad_truncate(tokenized)
            # Creating a tensor and adding to the result
            self.result.append(torch.tensor(padded))

    def __len__(self):
        return len(self.result)

    def __getitem__(self, item):
        return self.result[item]

    def pad_truncate(self, name):
        # Roughly the number of tokens taken up by the ' TL;DR ' marker
        extra_length = 4
        name_length = len(name) - extra_length
        if name_length < self.max_len:
            difference = self.max_len - name_length
            result = name + [self.eos_id] * difference
        elif name_length > self.max_len:
            result = name[:self.max_len + 3] + [self.eos_id]
        else:
            result = name
        return result


def train(model, optimizer, dl, epochs):
    for epoch in range(epochs):
        for idx, batch in enumerate(dl):
            print(idx)
            with torch.set_grad_enabled(True):
                optimizer.zero_grad()
                batch = batch.to(device)
                # Language-modelling loss: the inputs double as the labels
                output = model(batch, labels=batch)
                loss = output[0]
                loss.backward()
                optimizer.step()
                # Checkpoint after every step
                torch.save(model, '/content/drive/MyDrive/Text_summary_datasets/text_summary_4sets.pth')
                if idx % 50 == 0:
                    print("loss: %f, %d" % (loss, idx))


def main():
    data_path = ["emotion_train.csv", "kindle_review.csv", "amazon_review.csv", "tweet_train.csv"]
    reviews = read_reviews(data_path)

    model = GPT2LMHeadModel.from_pretrained('gpt2')
    #model = torch.load('/content/drive/MyDrive/text_summary.pth')
    config = GPT2Config.from_pretrained("gpt2")
    model.config = config
    model = model.to(device)  # move the model to the same device as the batches

    tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
    extra_length = len(tokenizer.encode(" TL;DR "))
    max_length = 250

    optimizer = optim.Adam(params=model.parameters(), lr=3e-4)
    dataset = GPT2ReviewDataset(tokenizer, reviews['training'], max_len=max_length)
    dataloader = DataLoader(dataset, batch_size=8, shuffle=True)

    train(model=model, optimizer=optimizer, dl=dataloader, epochs=3)
    torch.save(model, '/content/drive/MyDrive/Text_summary_datasets/text_summary_4sets.pth')


def topk(probs, n=9):
    # The scores are initially softmaxed to convert to probabilities
    probs = torch.softmax(probs, dim=-1)
    # PyTorch has its own topk method, which we use here
    tokensProb, topIx = torch.topk(probs, k=n)
    # The new selection pool (9 choices by default) is renormalized
    tokensProb = tokensProb / torch.sum(tokensProb)
    # Send to CPU for numpy handling
    tokensProb = tokensProb.cpu().detach().numpy()
    # Make a random choice from the pool based on the new prob distribution
    choice = np.random.choice(n, 1, p=tokensProb)  #[np.argmax(tokensProb)]#
    tokenId = topIx[choice][0]
    return int(tokenId)


def model_infer(model, tokenizer, review, max_length=30):
    # Preprocess the init tokens (task designator)
    review_encoded = tokenizer.encode(review)
    result = review_encoded
    initial_input = torch.tensor(review_encoded).unsqueeze(0).to(device)

    with torch.set_grad_enabled(False):
        # Feed the init tokens to the model
        output = model(initial_input)

        # Flatten the logits at the final time step
        logits = output.logits[0, -1]

        # Make a top-k choice and append to the result
        #choices = [topk(logits) for i in range(5)]
        choices = topk(logits)
        result.append(choices)

        # For max_length times:
        for _ in range(max_length):
            # Feed the current sequence to the model and make a choice
            current_input = torch.tensor(result).unsqueeze(0).to(device)
            output = model(current_input)
            logits = output.logits[0, -1]
            res_id = topk(logits)

            # If the chosen token is EOS, return the result
            if res_id == tokenizer.eos_token_id:
                return tokenizer.decode(result)
            else:
                # Append to the sequence
                result.append(res_id)

    # If no EOS is generated, return after max_length tokens
    return tokenizer.decode(result)
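
# Quick, hedged sanity check of the top-k sampling helper above. The logits
# here are dummy values, not model outputs; with the default k=9 the vector
# must have at least nine entries.
'''
dummy_logits = torch.tensor([4.0, 3.0, 2.5, 2.0, 1.5, 1.0, 0.5, 0.0, -0.5, -1.0])
print(topk(dummy_logits))   # prints one of the nine highest-scoring indices
'''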

def interface(input_text):
    dataset_sample = False
    tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
    # Load the fine-tuned checkpoint onto whichever device is available
    model = torch.load('text_summary_4sets_2_550.pth', map_location=torch.device(device))
    model.eval()

    if dataset_sample:
        # Summarise a random training example instead of user input
        # (requires `reviews = read_reviews(...)` to be in scope)
        sample_reviews = reviews['training'].sample(n=1, random_state=1)
        summary = [model_infer(model, tokenizer, review).strip() for review in sample_reviews]
    else:
        result_text = []
        for i in range(6):
            # Generate six candidate summaries, stripping the prompt plus the
            # five characters of 'TL;DR' from each one
            summary = model_infer(model, tokenizer, input_text + "TL;DR").strip()
            result_text.append(summary[len(input_text) + 5:])
        #print(sorted(result_text, key=len))
        # Report the median-length candidate
        print("summary:", sorted(result_text, key=len)[3])


'''
sample = 'Today was a hard day. I woke up feeling anxious and stressed about a meeting I had at work. The meeting did not go as I had hoped and I left disappointed. I tried to focus on other things and stay positive, but it was hard. I spent most of the evening starving and eating junk food. Not the best way to deal with my emotions, but it’s something I’m working on. Hope tomorrow will be a better day.TL;DR'
summary = model_infer(model, tokenizer, sample).strip()
summary[len(sample):]

sample = 'Today was much better than yesterday. I woke up feeling more rested and ready to tackle the day. I had a productive day at work and even managed to finish a project I was struggling with. After work, I met some friends for a yoga class and it was just what I needed to relax and unwind. We went out for dinner afterwards and had a really nice time. Overall, it was a much better day than yesterday and I feel more positive about things.TL;DR'
summary = model_infer(model, tokenizer, sample).strip()
summary[len(sample):]

sample = 'Today was a beautiful day. I had a good night’s sleep and was ready to start the day. I went to work and had a productive morning. I even managed to finish a project I’d been working on for weeks. After work, I ran to clear my head. It was a beautiful day and the weather was perfect for it. I came home and cooked dinner with my partner. We had a nice conversation over dinner and then spent the evening watching a movie. Overall, it was a pretty relaxing and enjoyable day.'
summary = model_infer(model, tokenizer, sample + 'TL;DR').strip()
summary[len(sample) + 5:]
'''


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Fine-tune GPT-2 for review summarisation or run inference")

    # Add command-line arguments
    parser.add_argument("--train", action="store_true", help="Train the model")
    parser.add_argument("--infer", type=str, help="Interact with the model")

    # Parse the command-line arguments
    args = parser.parse_args()

    # Check which argument was provided and call the corresponding function
    if args.train:
        main()
    elif args.infer:
        interface(args.infer)
    else:
        print("No valid option provided. Use --train or --infer.")
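
# Example invocations (the script filename below is a placeholder; adjust the
# dataset and checkpoint paths defined above to your own environment first):
#   python gpt2_text_summary.py --train
#   python gpt2_text_summary.py --infer "Today was a hard day at work, but the evening walk helped me unwind."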