text-summary-gpt2-short / text_summary.py
Lin0He's picture
Upload 2 files
74353d2
raw
history blame
9.66 kB
import pandas as pd
import numpy as np
import torch
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler
from torch import cuda
from transformers import GPT2LMHeadModel,GPT2Tokenizer, GPT2Config
import argparse
#from google.colab import drive
#drive.mount('/content/drive')
device = 'mps' if torch.backends.mps.is_available() else 'cpu'
#!pip install datasets
'''
from datasets import load_dataset
dataset1 = load_dataset("dair-ai/emotion")
for split, data in dataset1.items():
data.to_csv(f"emotion_{split}.csv", index = None)
'''
def read_reviews(data_path):
dataset = pd.DataFrame()
for path in data_path:
df = pd.read_csv("/content/drive/MyDrive/Text_summary_datasets/"+ path)
# Remove null values:
df.dropna(inplace=True)
# Convert label:
if path == "emotion_train.csv":
class_mapping = {0:'sad', 1: 'joy', 2: 'love', 3: 'anger', 4: 'fear', 5: 'surprise'}
# Replace the numerical/categorical values with words using the mapping
df['Summary'] = df['label'].replace(class_mapping)
df['training'] = df['text'] + 'TL;DR' + df['Summary']
df['Text'] = df['text']
if path == "amazon_review.csv":
df['training'] = df['Text'] + 'TL;DR' + df['Summary']
if path == "kindle_review.csv":
df['training'] = df['reviewText'] + 'TL;DR' + df['summary']
df['Text'] = df['reviewText']
df['Summary'] = df['summary']
if path == "tweet_train.csv":
df['training'] = df['content'] + 'TL;DR' + df['c_summary']
df['Text'] = df['content']
df['Summary'] = df['c_summary']
sampled_data = df.sample(n=1250, random_state=42)
dataset = dataset.append(sampled_data, ignore_index=True)
# Combining the two columns review and summary:
#df['training'] = df['text'] + 'TL;DR' + df['Summary']
dataset = dataset[['Summary','Text','training']]
return dataset
#reviews.head(1800)
class GPT2ReviewDataset(Dataset):
def __init__(self, tokenizer, reviews, max_len):
self.max_len = max_len
self.tokenizer = tokenizer
self.eos = self.tokenizer.eos_token
self.eos_id = self.tokenizer.eos_token_id
self.reviews = reviews
self.result = []
for review in self.reviews:
# Encode the text using tokenizer.encode(). We add EOS at the end
tokenized = self.tokenizer.encode(review + self.eos, max_length = 512, truncation = True)
# Padding/truncating the encoded sequence to max_len
padded = self.pad_truncate(tokenized)
# Creating a tensor and adding to the result
self.result.append(torch.tensor(padded))
def __len__(self):
return len(self.result)
def __getitem__(self, item):
return self.result[item]
def pad_truncate(self, name):
extra_length = 4
name_length = len(name) - extra_length
if name_length < self.max_len:
difference = self.max_len - name_length
result = name + [self.eos_id] * difference
elif name_length > self.max_len:
result = name[:self.max_len + 3]+[self.eos_id]
else:
result = name
return result
def train(model, optimizer, dl, epochs):
for epoch in range(epochs):
for idx, batch in enumerate(dl):
print(idx)
with torch.set_grad_enabled(True):
optimizer.zero_grad()
batch = batch.to(device)
output = model(batch, labels=batch)
loss = output[0]
loss.backward()
optimizer.step()
torch.save(model, '/content/drive/MyDrive/Text_summary_datasets/text_summary_4sets.pth')
if idx % 50 == 0:
print("loss: %f, %d"%(loss, idx))
def main():
data_path = ["emotion_train.csv","kindle_review.csv", "amazon_review.csv", "tweet_train.csv"]
reviews = read_reviews(data_path)
model = GPT2LMHeadModel.from_pretrained('gpt2')
#model = torch.load('/content/drive/MyDrive/text_summary.pth')
config = GPT2Config.from_pretrained("gpt2")
model.config = config
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
extra_length = len(tokenizer.encode(" TL;DR "))
max_length = 250
optimizer = optim.Adam(params = model.parameters(), lr=3e-4)
dataset = GPT2ReviewDataset(tokenizer, reviews['training'], max_len = max_length)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True)
train(model=model, optimizer=optimizer, dl=dataloader, epochs=3)
torch.save(model, '/content/drive/MyDrive/Text_summary_datasets/text_summary_4sets.pth')
def topk(probs, n=9):
# The scores are initially softmaxed to convert to probabilities
probs = torch.softmax(probs, dim= -1)
# PyTorch has its own topk method, which we use here
tokensProb, topIx = torch.topk(probs, k=n)
# The new selection pool (9 choices) is normalized
tokensProb = tokensProb / torch.sum(tokensProb)
# Send to CPU for numpy handling
tokensProb = tokensProb.cpu().detach().numpy()
# Make a random choice from the pool based on the new prob distribution
choice = np.random.choice(n, 1, p = tokensProb)#[np.argmax(tokensProb)]#
tokenId = topIx[choice][0]
return int(tokenId)
def model_infer(model, tokenizer, review, max_length=30):
# Preprocess the init token (task designator)
review_encoded = tokenizer.encode(review)
result = review_encoded
initial_input = torch.tensor(review_encoded).unsqueeze(0).to(device)
with torch.set_grad_enabled(False):
# Feed the init token to the model
output = model(initial_input)
# Flatten the logits at the final time step
logits = output.logits[0,-1]
# Make a top-k choice and append to the result
#choices = [topk(logits) for i in range(5)]
choices = topk(logits)
result.append(choices)
# For max_length times:
for _ in range(max_length):
# Feed the current sequence to the model and make a choice
input = torch.tensor(result).unsqueeze(0).to(device)
output = model(input)
logits = output.logits[0,-1]
res_id = topk(logits)
# If the chosen token is EOS, return the result
if res_id == tokenizer.eos_token_id:
return tokenizer.decode(result)
else: # Append to the sequence
result.append(res_id)
# IF no EOS is generated, return after the max_len
return tokenizer.decode(result)
def interface(input):
dataset_sample = False
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
model = torch.load('text_summary_4sets_2_550.pth', map_location=torch.device('mps'))
if dataset_sample:
sample_reviews = reviews['training'].sample(n=1, random_state=1)
summary = [model_infer(model, tokenizer, review).strip() for review in sample_reviews]
else:
result_text = []
for i in range(6):
summary = model_infer(model, tokenizer, input+"TL;DR").strip()
result_text.append(summary[len(input)+5:])
#print(sorted(result_text, key=len))
print("summary:", sorted(result_text, key=len)[3])
'''
sample = 'Today was a hard day. I woke up feeling anxious and stressed about a meeting I had at work. The meeting did not go as I had hoped and I left disappointed. I tried to focus on other things and stay positive, but it was hard. I spent most of the evening starving and eating junk food. Not the best way to deal with my emotions, but it’s something I’m working on. Hope tomorrow will be a better day.TL;DR'
summary = model_infer(model, tokenizer, sample).strip()
sample
summary[len(sample):]
sample = 'Today was much better than yesterday. I wake up feeling more rested and ready to tackle the day. I had a productive day at work and even managed to finish a project I was struggling with. After work, I met some friends for a yoga class and it was just what I needed to relax and unwind. We went out for dinner afterwards and had a really nice time. Overall, it was a much better day than yesterday and I feel more positive about things.TL;DR'
summary = model_infer(model, tokenizer, sample).strip()
summary[len(sample):]
sample = 'Today was a beautiful day. I had a good night’s sleep and was ready to start the day. I went to work and had a productive morning. I even managed to finish a project I’d been working on for weeks. After work, I ran to clear my head. It was a beautiful day and the weather was perfect for it. I came home and cooked dinner with my partner. We had a nice conversation over dinner and then spent the evening watching a movie. Overall, it was a pretty relaxing and enjoyable day.'
summary = model_infer(model, tokenizer, sample + 'TL;DR').strip()
summary[len(sample)+5:]
'''
if __name__ == '__main__':
parser = argparse.ArgumentParser(description= "parser")
# Add command-line arguments
parser.add_argument("--train", action="store_true", help="Train the model")
parser.add_argument("--infer", type=str, help="Interact with the model")
# Parse the command-line arguments
args = parser.parse_args()
# Check which argument was provided and call the corresponding function
if args.train:
main()
elif args.infer:
interface(args.infer)
else:
print("No valid option provided. Use --train or --infer.")