from __future__ import annotations

import ast
import json
import logging
import os
import random

import requests
from dotenv import load_dotenv
from griptape.artifacts import ListArtifact, TextArtifact
from griptape.configs import Defaults
from griptape.configs.drivers import OpenAiDriversConfig
from griptape.drivers import (
    LocalStructureRunDriver,
    OpenAiChatPromptDriver,
    GriptapeCloudVectorStoreDriver,
)
from griptape.structures import Agent, Structure, Workflow
from griptape.tasks import CodeExecutionTask, StructureRunTask
from griptape.rules import Ruleset, Rule
from griptape.engines.rag import RagEngine
from griptape.engines.rag.modules import (
    VectorStoreRetrievalRagModule,
    TextChunksResponseRagModule,
)
from griptape.engines.rag.stages import ResponseRagStage, RetrievalRagStage
from griptape.tools import RagTool
from griptape.configs.logging import TruncateLoggingFilter
from griptape_statemachine.parsers.uw_csv_parser import CsvParser
load_dotenv()

# OpenAI default config: pass in a new OpenAI prompt driver.
Defaults.drivers_config = OpenAiDriversConfig(
    prompt_driver=OpenAiChatPromptDriver(model="gpt-4o", max_tokens=4096)
)

# logger = logging.getLogger(Defaults.logging_config.logger_name)
# logger.setLevel(logging.ERROR)
# logger.addFilter(TruncateLoggingFilter(max_log_length=5000))

# ALL METHODS RELATING TO THE WORKFLOW AND PIPELINE
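# Collects the output of every parent question task into a single ListArtifact.
# Each parent is expected to emit a dict-like string with "Question" as a key;
# outputs that fail to parse are skipped.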
def end_workflow(task: CodeExecutionTask) -> ListArtifact:
    parent_outputs = task.parent_outputs
    questions = []
    for output in parent_outputs.values():
        output = output.value
        try:
            output = ast.literal_eval(output)
            question = {output["Question"]: output}
            questions.append(TextArtifact(question))
        except (SyntaxError, ValueError):
            # Skip outputs that are not valid Python literals.
            pass
    return ListArtifact(questions)
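
# Builds the top-level workflow: ten parallel single-question structures,
# all feeding into "end_task", which bundles the results.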
def get_questions_workflow() -> Workflow:
    workflow = Workflow(id="create_question_workflow")
    # How many questions still need to be created.
    for _ in range(10):
        task = StructureRunTask(
            driver=LocalStructureRunDriver(create_structure=get_single_question),
            child_ids=["end_task"],
        )
        workflow.add_task(task)
    end_task = CodeExecutionTask(id="end_task", on_run=end_workflow)
    workflow.add_task(end_task)
    return workflow
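
# Final step of a single-question workflow: merges the question/answer JSON
# from "get_question" with the wrong answers and the page/taxonomy metadata
# passed in through the task input.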
def single_question_last_task(task: CodeExecutionTask) -> TextArtifact:
    parent_outputs = task.parent_outputs
    print(f"PARENT OUTPUTS ARE: {parent_outputs}")
    wrong_answers = parent_outputs["wrong_answers"].value  # Output is a newline-separated list
    wrong_answers = wrong_answers.split("\n")
    question_and_answer = parent_outputs["get_question"].value  # Output is JSON
    question_and_answer = json.loads(question_and_answer)
    inputs = task.input.value.split(",")
    question = {
        "Question": question_and_answer["Question"],
        "Answer": question_and_answer["Answer"],
        "Wrong Answers": wrong_answers,
        "Page": int(inputs[0]),
        "Taxonomy": inputs[1].strip(),
    }
    return TextArtifact(question)
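
# Pulls just the question text out of the "get_question" output so the
# wrong-answer generator sees only the question, not the correct answer.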
def get_question_for_wrong_answers(task: CodeExecutionTask) -> TextArtifact:
    parent_outputs = task.parent_outputs
    question = parent_outputs["get_question"].value
    print(question)
    question = json.loads(question)["Question"]
    return TextArtifact(question)
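
# Assembles the four-task workflow for one quiz question: generate a question
# and answer for a random page and taxonomy level, extract the question text,
# generate three wrong answers, then compile everything into one artifact.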
def get_single_question() -> Workflow:
    question_generator = Workflow()
    page_number = random.choice(list(range(1, 9)))
    taxonomy = random.choice(["Knowledge", "Comprehension", "Application"])
    taxonomy_prompt = {
        "Knowledge": "Generate a quiz question based ONLY on this information: {{parent_outputs['information_task']}}, then write the answer to the question. The interrogative verb for the question should be one of 'define', 'list', 'state', 'identify', or 'label'.",
        "Comprehension": "Generate a quiz question based ONLY on this information: {{parent_outputs['information_task']}}, then write the answer to the question. The interrogative verb for the question should be one of 'explain', 'predict', 'interpret', 'infer', 'summarize', 'convert', or 'give an example of x'.",
        "Application": "Generate a quiz question based ONLY on this information: {{parent_outputs['information_task']}}, then write the answer to the question. The structure of the question should be one of 'How could x be used to y?' or 'How would you show/make use of/modify/demonstrate/solve/apply x to conditions y?'",
    }
    # Get the KBs and select one, then assign it to the structure (or create the structure right here).
    # Rules for the subject matter expert: return only a JSON with question and answer as keys.
    generate_q_task = StructureRunTask(
        id="get_question",
        input=taxonomy_prompt[taxonomy],
        driver=LocalStructureRunDriver(
            create_structure=lambda: get_structure("subject_matter_expert", page_number)
        ),
    )
    get_question_code_task = CodeExecutionTask(
        id="get_only_question",
        on_run=get_question_for_wrong_answers,
        parent_ids=["get_question"],
        child_ids=["wrong_answers"],
    )
    # This will use the same KB as the previous task.
    generate_wrong_answers = StructureRunTask(
        id="wrong_answers",
        input="""Write and return three incorrect answers for this question: {{parent_outputs['get_only_question']}} with this context: {{parent_outputs['information_task']}}""",
        structure_run_driver=LocalStructureRunDriver(
            create_structure=lambda: get_structure("wrong_answers_generator")
        ),
        parent_ids=["get_only_question"],
    )
    compile_task = CodeExecutionTask(
        id="compile_task",
        input=f"{page_number}, {taxonomy}",
        on_run=single_question_last_task,
        parent_ids=["wrong_answers", "get_question"],
    )
    question_generator.add_tasks(
        generate_q_task,
        get_question_code_task,
        generate_wrong_answers,
        compile_task,
    )
    return question_generator
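
# Factory for the agents used by the workflow tasks. Each branch wires up the
# rules and the knowledge-base RAG tool appropriate to that role.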
def get_structure(structure_id: str, page_number=0) -> Structure:
    match structure_id:
        case "subject_matter_expert":
            rulesets = Ruleset(
                name="specific_question_creator",
                rules=[
                    Rule(
                        "Return ONLY a json with 'Question' and 'Answer' as keys. No markdown, no commentary, no code, no backticks."
                    ),
                    Rule(
                        "Query to knowledge base should always be 'find information for quiz question'"
                    ),
                    Rule("Use ONLY information from your knowledge base"),
                    Rule(
                        "Question should be a question based on the knowledge base. Answer should be from knowledge base."
                    ),
                    Rule(
                        "The answer to the question should be short, but should not omit important information."
                    ),
                    Rule("Answer length should be 10 words maximum, 5 words minimum"),
                ],
            )
            # Build the RAG tool for the section knowledge base so `tool` is defined
            # in this branch (mirrors the wrong_answers_generator branch below).
            kb_driver = get_vector_store_id_from_page(page_number)
            tool = build_rag_tool(build_rag_engine(kb_driver))
            structure = Agent(
                id="subject_matter_expert",
                prompt_driver=OpenAiChatPromptDriver(model="gpt-4o"),
                rulesets=[rulesets],
                tools=[tool],
            )
case "taxonomy_expert": | |
rulesets = Ruleset( | |
name="KB Rules", | |
rules=[ | |
Rule( | |
"Use only your knowledge base. Do not make up any additional information." | |
), | |
Rule("Maximum 10 words."), | |
Rule( | |
"Return information an AI chatbot could use to write a question on a subject." | |
), | |
], | |
) | |
kb_driver = get_taxonomy_vs() | |
tool = build_rag_tool(build_rag_engine(kb_driver)) | |
structure = Agent( | |
id="taxonomy_expert", | |
prompt_driver=OpenAiChatPromptDriver(model="gpt-4o"), | |
tools=[tool], | |
) | |
case "wrong_answers_generator": | |
rulesets = Ruleset( | |
name="incorrect_answers_creator", | |
rules=[ | |
Rule( | |
"Return ONLY a list of 3 incorrect answers. No markdown, no commentary, no backticks." | |
), | |
Rule( | |
"All incorrect answers should be different, but plausible answers to the question." | |
), | |
Rule( | |
"Incorrect answers may reference material from the knowledge base, but must not be correct answers to the question" | |
), | |
Rule( | |
"Length of incorrect answers should be 10 words max, 5 words minimum" | |
), | |
], | |
) | |
kb_driver = get_vector_store_id_from_page(page_number) | |
tool = build_rag_tool(build_rag_engine(kb_driver)) | |
structure = Agent( | |
id="wrong_answers_generator", | |
prompt_driver=OpenAiChatPromptDriver(model="gpt-4o"), | |
rulesets=[rulesets], | |
tools=[tool], | |
) | |
case _: | |
structure = Agent(prompt_driver=OpenAiChatPromptDriver(model="gpt-4o")) | |
return structure | |
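
# Picks a random section knowledge base from Griptape Cloud whose name follows
# the "KB_section...pg<start>-<end>" convention and whose page range falls
# within pages 1-40, and returns a vector store driver for it.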
def get_vector_store_id_from_page(page: int) -> GriptapeCloudVectorStoreDriver | None:
    base_url = "https://cloud.griptape.ai/api"
    kb_url = f"{base_url}/knowledge-bases"
    headers = {"Authorization": f"Bearer {os.getenv('GT_CLOUD_API_KEY')}"}
    # TODO: This needs to change when I have my own bucket. Right now, I'm doing the 10 most recently made KBs.
    response = requests.get(url=kb_url, headers=headers)
    response.raise_for_status()
    if response.status_code == 200:
        data = response.json()
        possible_kbs = {}
        for kb in data["knowledge_bases"]:
            name = kb["name"]
            if "KB_section" not in name:
                continue
            page_nums = name.split("pg")[1].split("-")
            start_page = int(page_nums[0])
            end_page = int(page_nums[1])
            if end_page <= 40 and start_page >= 1:
                possible_kbs[kb["knowledge_base_id"]] = f"{start_page}-{end_page}"
        if not possible_kbs:
            return None
        kb_id = random.choice(list(possible_kbs.keys()))
        return GriptapeCloudVectorStoreDriver(
            api_key=os.getenv("GT_CLOUD_API_KEY", ""),
            knowledge_base_id=kb_id,
        )
    raise ValueError(response.status_code)
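
# Returns a driver for the fixed taxonomy knowledge base.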
def get_taxonomy_vs() -> GriptapeCloudVectorStoreDriver:
    return GriptapeCloudVectorStoreDriver(
        api_key=os.getenv("GT_CLOUD_API_KEY", ""),
        knowledge_base_id="2c3a6f19-51a8-43c3-8445-c7fbe06bf460",
    )
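
# Wires a retrieval stage (up to 100 chunks from the given vector store) to a
# plain text-chunks response stage.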
def build_rag_engine(vector_store_driver) -> RagEngine:
    return RagEngine(
        retrieval_stage=RetrievalRagStage(
            retrieval_modules=[
                VectorStoreRetrievalRagModule(
                    vector_store_driver=vector_store_driver,
                    query_params={
                        "count": 100,
                    },
                )
            ],
        ),
        response_stage=ResponseRagStage(
            response_modules=[TextChunksResponseRagModule()]
        ),
    )
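
# Wraps the RAG engine in a tool the agents can call to query the textbook KB.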
def build_rag_tool(engine) -> RagTool:
    return RagTool(
        description="Contains information about the textbook. Use it to answer any related questions.",
        rag_engine=engine,
    )
if __name__ == "__main__":
    # workflow = get_questions_workflow()
    # workflow.run()
    CsvParser("uw_programmatic").csv_parser()