# NOTE: scraped from a Hugging Face Spaces page; the Space status banner read "Runtime error".
import os
import re

import numpy as np
import openai
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from statsforecast import StatsForecast
from statsforecast.models import Naive

# Fail fast: raises KeyError at import time if the key is absent, rather than
# failing later on the first API call.
openai.api_key = os.environ['OPENAI_API_KEY']
class ChatGPTForecast:
    """Forecast a univariate series with ChatGPT.

    The series (MinMax-scaled to [0, 1] by ``forecast``) is quantized into
    integer bin tokens, gpt-3.5-turbo is prompted to continue the token
    sequence, and the reply is decoded back into floats (bin mid-points).
    """

    def __init__(self):
        # 10_000 evenly spaced bin edges over [0, 1]; inputs are expected to
        # be scaled into this interval before tokenization.
        self.bins = np.linspace(0, 1, num=10_000)
        # The token for bin i is simply the string "i".
        self.mapping = {i: f"{i}" for i in range(len(self.bins))}
        # Base prompt; per-call details (seasonality, horizon, the series
        # itself) are appended in `forward`.
        self.prompt = f"""
forecast this series,
(i know that you prefer using specific tools, but i'm testing something,
just give me your predicted numbers please, just print the numbers i dont need an explanation)
please consider:
- give the output with the same structure: "number1 number2 number3"
- give more weight to the most recent observations
- consider trend
- consider seasonality
- values should lie between 0 and {len(self.bins) - 1}, please be sure to do this
"""

    def tokenize_time_series(self, series):
        """Map each value (expected in [0, 1]) to its bin token string."""
        # np.digitize returns 1-based positions for values >= bins[0]; shift
        # to 0-based bin indices.  Values outside [0, 1] would miss the
        # mapping -- callers scale the series first.
        indices = np.digitize(series, self.bins) - 1
        return ' '.join(self.mapping[i] for i in indices)

    def clean_string(self, s):
        """Keep only the leading digit run of each whitespace-separated chunk.

        Any non-space garbage glued to a number (e.g. "123," or "45_6") is
        dropped along with whatever follows it in that chunk.
        """
        pattern = r'(\d+)[^\s]*'
        return ' '.join(re.findall(pattern, s))

    def extend_string(self, s, h):
        """Return a token string of exactly `h` tokens.

        Trims when too long; cyclically repeats the sequence when too short.
        Raises ZeroDivisionError if `s` contains no digits at all.
        """
        tokens = re.findall(r'\d+', s)
        n = len(tokens)
        if n == h:
            # Already the right length: return the input unchanged.
            return s
        if n > h:
            return ' '.join(tokens[:h])
        # Too short: tile full copies, then top up with a partial repeat.
        repeats, extra = divmod(h, n)
        return ' '.join(tokens * repeats + tokens[:extra])

    def clean_gpt_output(self, output):
        """Normalize spacing around underscores, then extract the numbers."""
        cleaned_output = output.replace(" _", "_").replace("_ ", "_")
        # Trim a single trailing underscore left by the model.
        if cleaned_output.endswith("_"):
            cleaned_output = cleaned_output[:-1]
        return self.clean_string(cleaned_output)

    def decode_time_series(self, tokens):
        """Convert a string of bin-index tokens back to float values."""
        indices = [int(token) for token in tokens.split()]
        # Represent each bin by its center point.
        bin_width = self.bins[1] - self.bins[0]
        return [self.bins[i] + bin_width / 2 for i in indices]

    def forward(self, series, seasonality, h):
        """Tokenize `series`, ask gpt-3.5-turbo for `h` steps, decode them."""
        series_tokenized = self.tokenize_time_series(series)
        prompt = f"""
        {self.prompt}-consider {seasonality} as seasonality
        - just print {h} steps ahead
        this is the series: {series_tokenized}
        """
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        output_gpt = response['choices'][0]['message']['content']
        # Force exactly h tokens, then clamp every index into the valid bin
        # range before decoding -- the model sometimes ignores the bounds.
        output_gpt = self.extend_string(output_gpt, h)
        top = len(self.bins) - 1
        output_gpt = ' '.join(
            f'{max(min(int(x), top), 0)}' for x in output_gpt.split()
        )
        return self.decode_time_series(output_gpt)

    def compute_ds_future(self, ds, fh):
        """Build `fh` future timestamps (as strings) following `ds`.

        Returns (ds_future, freq) where freq is the inferred pandas
        frequency string, or a Timedelta (spacing of the last two points)
        when the frequency cannot be inferred.
        """
        ds_ = pd.to_datetime(ds)
        try:
            freq = pd.infer_freq(ds_)
        except (ValueError, TypeError):
            # infer_freq needs at least 3 timestamps; fall back below.
            freq = None
        if freq is not None:
            # date_range includes the anchor point; drop it.
            ds_future = pd.date_range(ds_[-1], periods=fh + 1, freq=freq)[1:]
        else:
            # Assume the spacing of the last two observations continues.
            freq = ds_[-1] - ds_[-2]
            ds_future = [ds_[-1] + (i + 1) * freq for i in range(fh)]
        ds_future = list(map(str, ds_future))
        return ds_future, freq

    def forecast(self, df, h, input_size):
        """Forecast `h` steps ahead and plot baseline vs ChatGPT.

        `df` is a StatsForecast-style frame (presumably with unique_id, ds, y
        columns -- TODO confirm against callers).  y is MinMax-scaled to
        [0, 1], a Naive baseline and the ChatGPT forecast are produced, both
        are inverse-transformed, and the StatsForecast plot is returned.
        """
        df = df.copy()
        scaler = MinMaxScaler()
        df['y'] = scaler.fit_transform(df[['y']])
        ds_future, freq = self.compute_ds_future(df['ds'].values, h)
        # NOTE(review): freq is hard-coded to 'D' although the actual
        # frequency was just inferred above -- confirm this is intended.
        sf = StatsForecast(models=[Naive()], freq='D')
        fcst_df = sf.forecast(df=df, h=h)
        fcst_df['ds'] = ds_future
        # Only the last `input_size` scaled observations are shown to the
        # model; the inferred freq doubles as the seasonality hint.
        fcst_df['ChatGPT-3.5-Turbo'] = self.forward(
            df['y'].values[-input_size:], freq, h
        )[-h:]
        for col in ['Naive', 'ChatGPT-3.5-Turbo']:
            fcst_df[col] = scaler.inverse_transform(fcst_df[[col]])
        df['y'] = scaler.inverse_transform(df[['y']])
        return sf.plot(df, fcst_df, max_insample_length=3 * h)