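"""Fine-tune GPT-2 for TL;DR-style summarization.

Training data is built from four CSV datasets (emotion labels, Amazon
reviews, Kindle reviews, tweets), each example formatted as
"<text>TL;DR<summary>". Run with --train to fine-tune, or with
--infer "<text>" to summarize a string using a saved checkpoint.
"""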
import argparse

import numpy as np
import pandas as pd
import torch
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import GPT2LMHeadModel, GPT2Tokenizer

# Prefer the Apple-silicon GPU (MPS) when available, otherwise fall back to CPU.
device = 'mps' if torch.backends.mps.is_available() else 'cpu'

# One-time export of the dair-ai/emotion dataset to per-split CSV files:
'''
from datasets import load_dataset

dataset1 = load_dataset("dair-ai/emotion")
for split, data in dataset1.items():
    data.to_csv(f"emotion_{split}.csv", index=False)
'''

def read_reviews(data_path):
    dataset = pd.DataFrame()
    for path in data_path:
        df = pd.read_csv("/content/drive/MyDrive/Text_summary_datasets/" + path)
        df.dropna(inplace=True)

        # Normalise every source into Text / Summary / training columns, where
        # `training` is the "<text>TL;DR<summary>" string the model is tuned on.
        if path == "emotion_train.csv":
            class_mapping = {0: 'sad', 1: 'joy', 2: 'love', 3: 'anger', 4: 'fear', 5: 'surprise'}
            df['Summary'] = df['label'].map(class_mapping)
            df['training'] = df['text'] + 'TL;DR' + df['Summary']
            df['Text'] = df['text']
        elif path == "amazon_review.csv":
            df['training'] = df['Text'] + 'TL;DR' + df['Summary']
        elif path == "kindle_review.csv":
            df['training'] = df['reviewText'] + 'TL;DR' + df['summary']
            df['Text'] = df['reviewText']
            df['Summary'] = df['summary']
        elif path == "tweet_train.csv":
            df['training'] = df['content'] + 'TL;DR' + df['c_summary']
            df['Text'] = df['content']
            df['Summary'] = df['c_summary']

        # DataFrame.append was removed in pandas 2.0; use pd.concat instead.
        sampled_data = df.sample(n=1250, random_state=42)
        dataset = pd.concat([dataset, sampled_data], ignore_index=True)

    dataset = dataset[['Summary', 'Text', 'training']]
    return dataset

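# Wraps the "<text>TL;DR<summary>" strings as fixed-length token tensors:
# every example is encoded, given a trailing eos token, and padded (with
# eos, which doubles as the pad token) or truncated so batches stack
# cleanly into a single tensor.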
class GPT2ReviewDataset(Dataset):
    def __init__(self, tokenizer, reviews, max_len):
        self.max_len = max_len
        self.tokenizer = tokenizer
        self.eos = self.tokenizer.eos_token
        self.eos_id = self.tokenizer.eos_token_id
        self.reviews = reviews
        self.result = []

        for review in self.reviews:
            # Append the eos token and cap the raw encoding at 512 tokens.
            tokenized = self.tokenizer.encode(review + self.eos, max_length=512, truncation=True)
            padded = self.pad_truncate(tokenized)
            self.result.append(torch.tensor(padded))

    def __len__(self):
        return len(self.result)

    def __getitem__(self, item):
        return self.result[item]

    def pad_truncate(self, name):
        # Reserve room for the "TL;DR" marker (assumed to cost 4 tokens;
        # cf. the extra_length check in main()). Every returned sequence
        # is exactly max_len + extra_length tokens long.
        extra_length = 4
        name_length = len(name) - extra_length
        if name_length < self.max_len:
            # Pad with eos up to the target length.
            difference = self.max_len - name_length
            result = name + [self.eos_id] * difference
        elif name_length > self.max_len:
            # Truncate, keeping room for a final eos token.
            result = name[:self.max_len + 3] + [self.eos_id]
        else:
            result = name
        return result

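# Passing labels=batch makes GPT2LMHeadModel compute the causal-LM
# cross-entropy loss internally (it shifts the labels by one position
# itself), so each batch acts as both input and target.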
def train(model, optimizer, dl, epochs):
    model.train()
    for epoch in range(epochs):
        for idx, batch in enumerate(dl):
            optimizer.zero_grad()
            batch = batch.to(device)
            output = model(batch, labels=batch)
            loss = output.loss
            loss.backward()
            optimizer.step()
            if idx % 50 == 0:
                print(f"batch {idx}, loss {loss.item():.4f}")
                # Checkpoint periodically rather than on every batch.
                torch.save(model, '/content/drive/MyDrive/Text_summary_datasets/text_summary_4sets.pth')

def main():
    data_path = ["emotion_train.csv", "kindle_review.csv", "amazon_review.csv", "tweet_train.csv"]
    reviews = read_reviews(data_path)

    model = GPT2LMHeadModel.from_pretrained('gpt2')
    model = model.to(device)

    tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
    # Sanity-check value: pad_truncate() assumes the " TL;DR " marker
    # encodes to 4 tokens.
    extra_length = len(tokenizer.encode(" TL;DR "))

    max_length = 250
    optimizer = optim.Adam(params=model.parameters(), lr=3e-4)

    dataset = GPT2ReviewDataset(tokenizer, reviews['training'], max_len=max_length)
    dataloader = DataLoader(dataset, batch_size=8, shuffle=True)

    train(model=model, optimizer=optimizer, dl=dataloader, epochs=3)

    torch.save(model, '/content/drive/MyDrive/Text_summary_datasets/text_summary_4sets.pth')

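# Decoding uses top-k sampling: keep the k most probable next tokens,
# renormalise their probabilities, and draw one of them at random.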
def topk(probs, n=9):
    # Convert logits to probabilities.
    probs = torch.softmax(probs, dim=-1)

    # Keep only the n most likely tokens and renormalise.
    tokensProb, topIx = torch.topk(probs, k=n)
    tokensProb = tokensProb / torch.sum(tokensProb)

    # Sample one of the n candidates.
    tokensProb = tokensProb.cpu().detach().numpy()
    choice = np.random.choice(n, 1, p=tokensProb)
    tokenId = topIx[choice][0]

    return int(tokenId)

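# A minimal torch-only sketch of the same idea, avoiding the NumPy
# round-trip (sample_top_k is a hypothetical alternative, not used below):
def sample_top_k(logits, k=9):
    probs = torch.softmax(logits, dim=-1)
    top_probs, top_ix = torch.topk(probs, k=k)
    choice = torch.multinomial(top_probs / top_probs.sum(), num_samples=1)
    return int(top_ix[choice])
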
def model_infer(model, tokenizer, review, max_length=30):
    # Seed the result with the encoded prompt.
    review_encoded = tokenizer.encode(review)
    result = review_encoded
    initial_input = torch.tensor(review_encoded).unsqueeze(0).to(device)

    with torch.no_grad():
        # Generate the first new token from the prompt alone.
        output = model(initial_input)
        logits = output.logits[0, -1]
        result.append(topk(logits))

        # Autoregressively extend the sequence one token at a time,
        # stopping early if the eos token is sampled.
        for _ in range(max_length):
            input_ids = torch.tensor(result).unsqueeze(0).to(device)
            output = model(input_ids)
            logits = output.logits[0, -1]
            res_id = topk(logits)
            if res_id == tokenizer.eos_token_id:
                break
            result.append(res_id)

    return tokenizer.decode(result)

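# For comparison, Hugging Face's built-in sampler can do much the same
# thing; a rough (untested) equivalent of model_infer would be:
#
#   input_ids = tokenizer.encode(review, return_tensors="pt").to(device)
#   out = model.generate(input_ids, do_sample=True, top_k=9,
#                        max_new_tokens=30,
#                        pad_token_id=tokenizer.eos_token_id)
#   summary = tokenizer.decode(out[0])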
def interface(text):
    dataset_sample = False  # set True to summarize random samples from the training CSVs instead
    tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
    # Loads a fully pickled model object; on PyTorch >= 2.6 this needs
    # weights_only=False.
    model = torch.load('text_summary_4sets_2_550.pth', map_location=torch.device(device))
    model.eval()

    if dataset_sample:
        reviews = read_reviews(["emotion_train.csv", "kindle_review.csv", "amazon_review.csv", "tweet_train.csv"])
        sample_reviews = reviews['training'].sample(n=1, random_state=1)
        summary = [model_infer(model, tokenizer, review).strip() for review in sample_reviews]
        print("summary:", summary[0])
    else:
        # Sample six candidate summaries and report the median-length one.
        result_text = []
        for i in range(6):
            summary = model_infer(model, tokenizer, text + "TL;DR").strip()
            result_text.append(summary[len(text) + 5:])  # 5 == len("TL;DR")
        print("summary:", sorted(result_text, key=len)[3])

# Notebook-style examples of running inference by hand:
'''
sample = 'Today was a hard day. I woke up feeling anxious and stressed about a meeting I had at work. The meeting did not go as I had hoped and I left disappointed. I tried to focus on other things and stay positive, but it was hard. I spent most of the evening starving and eating junk food. Not the best way to deal with my emotions, but it’s something I’m working on. Hope tomorrow will be a better day.TL;DR'
summary = model_infer(model, tokenizer, sample).strip()
summary[len(sample):]

sample = 'Today was much better than yesterday. I woke up feeling more rested and ready to tackle the day. I had a productive day at work and even managed to finish a project I was struggling with. After work, I met some friends for a yoga class and it was just what I needed to relax and unwind. We went out for dinner afterwards and had a really nice time. Overall, it was a much better day than yesterday and I feel more positive about things.TL;DR'
summary = model_infer(model, tokenizer, sample).strip()
summary[len(sample):]

sample = 'Today was a beautiful day. I had a good night’s sleep and was ready to start the day. I went to work and had a productive morning. I even managed to finish a project I’d been working on for weeks. After work, I ran to clear my head. It was a beautiful day and the weather was perfect for it. I came home and cooked dinner with my partner. We had a nice conversation over dinner and then spent the evening watching a movie. Overall, it was a pretty relaxing and enjoyable day.'
summary = model_infer(model, tokenizer, sample + 'TL;DR').strip()
summary[len(sample) + 5:]
'''

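# Example invocations (assuming the script is saved as, e.g., summarize.py):
#   python summarize.py --train
#   python summarize.py --infer "Today was a hard day. ..."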
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Fine-tune or query the GPT-2 TL;DR summarizer")
    parser.add_argument("--train", action="store_true", help="Fine-tune the model")
    parser.add_argument("--infer", type=str, help="Summarize the given text with a saved checkpoint")

    args = parser.parse_args()

    if args.train:
        main()
    elif args.infer:
        interface(args.infer)
    else:
        print("No valid option provided. Use --train or --infer.")