Spaces:
Sleeping
Sleeping
File size: 8,911 Bytes
d477d5c d699829 d477d5c f685ddc d477d5c f685ddc d477d5c c56aab6 63e3e29 e7445ae f685ddc d477d5c c56aab6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
from __future__ import annotations
import ast
import csv
import json
from pathlib import Path
import random
from typing import TYPE_CHECKING
from uw_programmatic.base_machine import UWBaseMachine
if TYPE_CHECKING:
from griptape.tools import BaseTool
class UWMachine(UWBaseMachine):
    """State machine with GOAP.

    The ``on_enter_*``, ``on_event_*`` and ``on_exit_*`` methods are per-state
    hooks (presumably dispatched by ``UWBaseMachine``, which is defined
    elsewhere). Together they gather generation parameters, run the question
    workflow and the ``similarity_auditor`` structure until enough questions
    exist (or generation stalls), and write the result to
    ``outputs/professor_guide.csv``.
    """

    # Consecutive rounds without a new question after which generation stops.
    _MAX_FAILED_ATTEMPTS = 3

    @property
    def tools(self) -> dict[str, BaseTool]:
        """Return the tools available to the machine (none are defined here)."""
        return {}

    def start_machine(self) -> None:
        """Starts the machine."""
        # TODO: clear input history and the csv file before starting
        # (noted in the original comments, not implemented here).
        self.send("enter_first_state")

    def on_event_gather_parameters(self, event_: dict) -> None:
        """Store the user's generation parameters and advance the machine.

        Args:
            event_: Event with a ``"type"`` and a ``"value"`` key. For a
                ``"user_input"`` event the value must provide
                ``"page_range"``, ``"question_number"`` and ``"taxonomy"``.

        Raises:
            ValueError: If the event type is anything other than
                ``"user_input"``.
        """
        event_source = event_["type"]
        event_value = event_["value"]
        if event_source != "user_input":
            err_msg = f"Unexpected Transition Event ID: {event_value}."
            raise ValueError(err_msg)

        self.page_range = event_value["page_range"]
        self.question_number = event_value["question_number"]
        self.taxonomy = event_value["taxonomy"]
        # Fresh run: nothing counted yet and no failed attempts.
        self.current_question_count = 0
        self.give_up_count = 0
        self.send("next_state")

    def on_enter_evaluate_q_count(self) -> None:
        """Decide whether to generate more questions or finish.

        Sends ``"finish_state"`` when enough questions exist or when the
        question count has failed to grow for ``_MAX_FAILED_ATTEMPTS``
        consecutive rounds; otherwise sends ``"next_state"``.
        """
        generated = len(self.question_list)
        # Check if the number of questions has incremented
        if generated <= self.current_question_count:
            self.give_up_count += 1
        else:
            self.current_question_count = generated
            self.give_up_count = 0

        if self.give_up_count >= self._MAX_FAILED_ATTEMPTS:
            self.send("finish_state")
            return

        if generated >= self.question_number:
            self.send("finish_state")  # go to output questions
        else:
            self.send("next_state")  # go to need more questions

    def on_event_evaluate_q_count(self, event_: dict) -> None:
        """Ignore events received while evaluating the question count."""

    def on_enter_need_more_q(self) -> None:
        """Run the workflow that creates another batch of questions."""
        self.get_questions_workflow().run()

    def on_event_need_more_q(self, event_: dict) -> None:
        """Collect the questions produced by ``create_question_workflow``.

        Only ``"griptape_event"`` events are considered. A
        ``FinishStructureRunEvent`` from ``create_question_workflow`` has each
        output value parsed with ``ast.literal_eval``; entries that cannot be
        parsed are reported and skipped. The parsed questions are stored in
        ``most_recent_questions`` and ``"next_state"`` is sent.

        Args:
            event_: Event with a ``"type"`` and a ``"value"`` key.
        """
        event_source = event_["type"]
        event_value = event_["value"]
        if event_source != "griptape_event":
            return
        if event_value["type"] != "FinishStructureRunEvent":
            print(f"Unexpected: {event_}")
            return
        if event_value["structure_id"] != "create_question_workflow":
            print(f"Error:{event_} ")
            return

        questions = []
        for artifact in event_value["output_task_output"]["value"]:
            try:
                questions.append(ast.literal_eval(artifact["value"]))
            except (ValueError, TypeError, SyntaxError):
                # One malformed entry should not discard the whole batch.
                print(f"Skipping unparsable question: {artifact}")
        self.most_recent_questions = questions
        self.send("next_state")

    def on_enter_assess_generated_q(self) -> None:
        """Send accepted plus newly generated questions to the auditor."""
        # TODO: Should it append it to the list already and remove duplicates? or not?
        # TODO: Merge incoming lists
        merged_list = [*self.question_list, *self.most_recent_questions]
        prompt = f"{merged_list}"
        self.get_structure("similarity_auditor").run(prompt)

    def on_event_assess_generated_q(self, event_: dict) -> None:
        """Replace the question list with the auditor's output.

        Reacts only to a ``FinishStructureRunEvent`` from
        ``similarity_auditor``. The output must be JSON; if it cannot be
        decoded, the previous question list is kept. Either way
        ``"next_state"`` is sent.

        Args:
            event_: Event with a ``"type"`` and a ``"value"`` key.
        """
        event_source = event_["type"]
        event_value = event_["value"]
        if event_source != "griptape_event":
            return
        if event_value["type"] != "FinishStructureRunEvent":
            return
        if event_value["structure_id"] != "similarity_auditor":
            return

        raw_output = event_value["output_task_output"]["value"]
        try:
            audited_questions = json.loads(raw_output)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError; TypeError covers non-text
            # output. Keep what we already had rather than losing questions.
            print(f"Could not parse similarity_auditor output: {raw_output!r}")
            audited_questions = self.question_list
        self.question_list = audited_questions
        self.send("next_state")  # move on

    @staticmethod
    def _build_csv_row(question: dict) -> list:
        """Build one CSV row for a multiple-choice question.

        Row layout: ``"MC"``, empty cell, ``1``, question text, 1-based column
        of the correct answer, the answer options, page, taxonomy. The correct
        answer is placed at a random position among the options.

        Raises:
            KeyError: If a required key is missing from ``question``.
            TypeError: If ``question`` is not a mapping or ``"Wrong Answers"``
                is not iterable.
        """
        wrong_answers = list(question["Wrong Answers"])
        option_count = len(wrong_answers) + 1
        answer_column = random.randint(1, option_count)
        row = ["MC", "", 1, question["Question"], answer_column]
        for position in range(1, option_count + 1):
            if position == answer_column:
                row.append(question["Answer"])
            else:
                # Empty/None distractors are written as an empty cell.
                row.append(wrong_answers.pop() or "")
        row.append(question["Page"])
        row.append(question["Taxonomy"])
        return row

    def on_enter_output_q(self) -> None:
        """Write the accepted questions to ``outputs/professor_guide.csv``.

        The file is created under the current working directory (the
        ``outputs`` directory is created if missing). Questions that lack a
        required field are reported and skipped. When generation stopped
        because of repeated failed attempts, a final row says so. Sends
        ``"next_state"`` once the file is closed.
        """
        questions = self.question_list
        if isinstance(questions, dict):
            # A single question object instead of a list of questions.
            questions = [questions]

        output_path = Path.cwd() / "outputs" / "professor_guide.csv"
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with output_path.open("w", newline="") as file:
            writer = csv.writer(file)
            for question in questions:
                try:
                    row = self._build_csv_row(question)
                except (KeyError, TypeError):
                    print(f"Skipping malformed question: {question}")
                    continue
                writer.writerow(row)
            if self.give_up_count >= self._MAX_FAILED_ATTEMPTS:
                # writerow expects a sequence of cells; a bare string would be
                # split into one cell per character.
                writer.writerow(["Failed to generate more questions."])
        self.send("next_state")

    def on_event_output_q(self, event_: dict) -> None:
        """Ignore events received while writing the output."""

    def on_exit_output_q(self) -> None:
        """Reset the question lists so the machine can be run again."""
        self.question_list = []
        self.most_recent_questions = []
if __name__ == "__main__":
    # Manual check of the CSV layout: writes two sample questions to
    # outputs/professor_guide.csv under the current working directory.
    question_list = [
        {
            "Page": "1-2",
            "Taxonomy": "Knowledge",
            "Question": "What is Python?",
            "Answer": "A programming language",
            "Wrong Answers": ["A snake", "A car brand", "A fruit"],
        },
        {
            "Page": "3-4",
            "Taxonomy": "Comprehension",
            "Question": "What does HTML stand for?",
            "Answer": "HyperText Markup Language",
            "Wrong Answers": [
                "High Text Machine Language",
                "Hyperlink Text Mode Language",
                "None of the above",
            ],
        },
    ]
    output_path = Path.cwd() / "outputs" / "professor_guide.csv"
    # Opening the file fails if the directory is missing, so create it first.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", newline="") as file:
        writer = csv.writer(file)
        for question in question_list:
            # The correct answer goes into a random option column; the row
            # records that 1-based column number right after the question.
            wrong_answers = list(question["Wrong Answers"])
            option_count = len(wrong_answers) + 1
            column = random.randint(1, option_count)
            new_row = [question["Question"], column]
            for i in range(1, option_count + 1):
                if i == column:
                    new_row.append(question["Answer"])
                else:
                    new_row.append(wrong_answers.pop())
            new_row.append(question["Page"])
            new_row.append(question["Taxonomy"])
            writer.writerow(new_row)
|