import ast
import functools
import json
import os
import sys
import tempfile
import time
import typing
import uuid

import pytest

from tests.utils import wrap_test_forked
from src.prompter_utils import base64_encode_jinja_template, base64_decode_jinja_template
from src.vision.utils_vision import process_file_list
from src.utils import get_list_or_str, read_popen_pipes, get_token_count, reverse_ucurve_list, undo_reverse_ucurve_list, \
    is_uuid4, has_starting_code_block, extract_code_block_content, looks_like_json, get_json, is_full_git_hash, \
    deduplicate_names, handle_json, check_input_type, start_faulthandler, remove, get_gradio_depth, create_typed_dict, \
    execute_cmd_stream
from src.enums import invalid_json_str, user_prompt_for_fake_system_prompt0
from src.prompter import apply_chat_template
import subprocess as sp

start_faulthandler()


@wrap_test_forked
def test_get_list_or_str():
    """get_list_or_str keeps lists and plain strings as-is and parses a stringified list."""
    assert get_list_or_str(['foo', 'bar']) == ['foo', 'bar']
    assert get_list_or_str('foo') == 'foo'
    assert get_list_or_str("['foo', 'bar']") == ['foo', 'bar']


@wrap_test_forked
def test_stream_popen1():
    """Stream the output of a one-line Python command through read_popen_pipes."""
    cmd_python = sys.executable
    python_args = "-q -u"
    python_code = "print('hi')"
    cmd = f"{cmd_python} {python_args} -c \"{python_code}\""

    with sp.Popen(cmd, stdout=sp.PIPE, stderr=sp.PIPE, text=True, shell=True) as p:
        for out_line, err_line in read_popen_pipes(p):
            print(out_line, end='')
            print(err_line, end='')

        p.poll()


@wrap_test_forked
def test_stream_popen2():
    """Stream interleaved stdout/stderr of a slow shell script through read_popen_pipes."""
    script = """for i in 0 1 2 3 4 5
do
    echo "This messages goes to stdout $i"
    sleep 1
    echo This message goes to stderr >&2
    sleep 1
done
"""
    # Keep the helper script in a throw-away directory so the test does not
    # leave a stray pieces.sh behind in the current working directory.
    with tempfile.TemporaryDirectory() as tmp_dir:
        script_path = os.path.join(tmp_dir, 'pieces.sh')
        with open(script_path, 'wt') as f:
            f.write(script)
        os.chmod(script_path, 0o755)

        # The script has no shebang line, so hand it to sh explicitly instead
        # of combining an argument list with shell=True.
        with sp.Popen(["sh", script_path], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:
            for out_line, err_line in read_popen_pipes(p):
                print(out_line, end='')
                print(err_line, end='')

            p.poll()
@wrap_test_forked
def test_stream_python_execution(capsys):
    """Run a small Python script via execute_cmd_stream and verify both the
    collected result and the tagged lines echoed while streaming."""
    script = """
import sys
import time

for i in range(3):
    print(f"This message goes to stdout {i}")
    time.sleep(0.1)
    print(f"This message goes to stderr {i}", file=sys.stderr)
    time.sleep(0.1)
"""
    run_options = dict(
        script_content=script,
        cwd=None,
        env=None,
        timeout=5,
        capture_output=True,
        text=True,
        print_tags=True,
        print_literal=False,
    )
    result = execute_cmd_stream(**run_options)

    # Grab whatever was printed during the run, then echo it for inspection
    captured = capsys.readouterr()
    print("Captured output:")
    print(captured.out)

    # Exit status
    assert result.returncode == 0, f"Expected return code 0, but got {result.returncode}"

    # Collected stdout must contain the three lines in order
    expected_stdout = "".join("This message goes to stdout %d\n" % idx for idx in range(3))
    assert expected_stdout in result.stdout, f"Expected stdout to contain:\n{expected_stdout}\nBut got:\n{result.stdout}"

    # Collected stderr must contain the three lines in order
    expected_stderr = "".join("This message goes to stderr %d\n" % idx for idx in range(3))
    assert expected_stderr in result.stderr, f"Expected stderr to contain:\n{expected_stderr}\nBut got:\n{result.stderr}"

    # Streaming: tagged lines must have been printed while the command ran
    streamed = captured.out
    assert "STDOUT: This message goes to stdout 0" in streamed, "Streaming output not detected in stdout"
    assert "STDERR: This message goes to stderr 0" in streamed, "Streaming output not detected in stderr"

    print("All tests passed successfully!")
def test_stream_python_execution_empty_lines(capsys):
    """Blank lines printed by the child script must not produce STDOUT:/STDERR: tags."""
    script = """
import sys
import time

print()
print("Hello")
print()
print("World", file=sys.stderr)
print()
"""
    # Only the streamed (printed) output is inspected, so the return value is not kept.
    execute_cmd_stream(
        script_content=script,
        cwd=None,
        env=None,
        timeout=5,
        capture_output=True,
        text=True
    )

    captured = capsys.readouterr()

    print("Captured output:")
    print(captured.out)

    # Check that we only see STDOUT and STDERR for non-empty lines
    assert captured.out.count("STDOUT:") == 1, "Expected only one STDOUT line"
    assert captured.out.count("STDERR:") == 1, "Expected only one STDERR line"
    assert "STDOUT: Hello" in captured.out, "Expected 'Hello' in stdout"
    assert "STDERR: World" in captured.out, "Expected 'World' in stderr"

    print("All tests passed successfully!")


@wrap_test_forked
def test_memory_limit():
    """A script run with max_memory_usage=500_000_000 is expected to end with return code -15."""
    result = execute_cmd_stream(cmd=['python', './tests/memory_hog_script.py'], max_memory_usage=500_000_000)
    # Emit the child's output BEFORE asserting; otherwise a failing assertion
    # would skip these lines and hide the only diagnostics available.
    print(result.stdout, file=sys.stderr, flush=True)
    print(result.stderr, file=sys.stderr, flush=True)
    # A negative subprocess return code -N means "terminated by signal N"; 15 is SIGTERM on POSIX.
    assert result.returncode == -15, "Expected return code -15, but got %s" % result.returncode
@pytest.mark.parametrize("text_context_list",
                         ['text_context_list1', 'text_context_list2', 'text_context_list3', 'text_context_list4',
                          'text_context_list5', 'text_context_list6'])
@pytest.mark.parametrize("system_prompt", ['auto', ''])
@pytest.mark.parametrize("context", ['context1', 'context2'])
@pytest.mark.parametrize("iinput", ['iinput1', 'iinput2'])
@pytest.mark.parametrize("chat_conversation", ['chat_conversation1', 'chat_conversation2'])
@pytest.mark.parametrize("instruction", ['instruction1', 'instruction2'])
@wrap_test_forked
def test_limited_prompt(instruction, chat_conversation, iinput, context, system_prompt, text_context_list):
    """Check the token accounting returned by get_limited_prompt for short and
    oversized combinations of instruction, input, context, chat history and documents.

    Each parameter arrives as a label and is swapped for the matching payload below.
    """
    import random
    import string

    # Label -> payload: short variant for the "...1" label, 500-item variant otherwise
    if instruction == 'instruction1':
        instruction = 'Who are you?'
    else:
        instruction = ' '.join(['foo_%s ' % x for x in range(0, 500)])

    if iinput == 'iinput1':
        iinput = 'Extra instruction info'
    else:
        iinput = ' '.join(['iinput_%s ' % x for x in range(0, 500)])

    if context == 'context1':
        context = 'context'
    else:
        context = ' '.join(['context_%s ' % x for x in range(0, 500)])

    if chat_conversation == 'chat_conversation1':
        chat_conversation = []
    else:
        chat_conversation = [['user_conv_%s ' % x, 'bot_conv_%s ' % x] for x in range(0, 500)]

    alphabet = string.ascii_letters + string.digits
    document_sets = {
        'text_context_list1': [],
        'text_context_list2': ['doc_%s ' % x for x in range(0, 500)],
        'text_context_list3': ['doc_%s ' % x for x in range(0, 10)],
        'text_context_list4': ['documentmany_%s ' % x for x in range(0, 10000)],
        'text_context_list5': ['documentlong_%s_%s' % (x, ''.join(random.choices(alphabet, k=300)))
                               for x in range(0, 20)],
        'text_context_list6': ['documentlong_%s_%s' % (x, ''.join(random.choices(alphabet, k=4000)))
                               for x in range(0, 1)],
    }
    if text_context_list not in document_sets:
        raise ValueError("No such %s" % text_context_list)
    text_context_list = document_sets[text_context_list]

    from transformers import AutoTokenizer
    tokenizer = AutoTokenizer.from_pretrained('h2oai/h2ogpt-4096-llama2-7b-chat')

    from src.prompter import Prompter
    prompter = Prompter('llama2', None,
                        debug=False,
                        stream_output=True,
                        system_prompt=system_prompt,
                        tokenizer=tokenizer)

    min_max_new_tokens = 512  # like in get_limited_prompt()
    max_input_tokens = -1
    max_new_tokens = 1024
    model_max_length = 4096

    from src.gen import get_limited_prompt
    (estimated_full_prompt,
     instruction, iinput, context,
     num_prompt_tokens, max_new_tokens,
     num_prompt_tokens0, num_prompt_tokens_actual,
     history_to_use_final, external_handle_chat_conversation,
     top_k_docs_trial, one_doc_size, truncation_generation, system_prompt, _, _) = get_limited_prompt(
        instruction, iinput, tokenizer,
        prompter=prompter,
        max_new_tokens=max_new_tokens,
        context=context,
        chat_conversation=chat_conversation,
        text_context_list=text_context_list,
        model_max_length=model_max_length,
        min_max_new_tokens=min_max_new_tokens,
        max_input_tokens=max_input_tokens,
        verbose=True)

    summary = (num_prompt_tokens0, num_prompt_tokens, num_prompt_tokens_actual,
               len(history_to_use_final), top_k_docs_trial, one_doc_size)
    print('%s -> %s or %s: len(history_to_use_final): %s top_k_docs_trial=%s one_doc_size: %s' % summary,
          flush=True, file=sys.stderr)

    assert num_prompt_tokens <= model_max_length + min_max_new_tokens
    # actual might be less due to token merging for characters across parts, but not more
    assert num_prompt_tokens >= num_prompt_tokens_actual
    assert num_prompt_tokens_actual <= model_max_length

    # Rebuild the document list that the returned limits describe
    if top_k_docs_trial > 0:
        kept_docs = text_context_list[:top_k_docs_trial]
    elif one_doc_size is not None:
        kept_docs = [text_context_list[0][:one_doc_size]]
    else:
        kept_docs = []
    assert sum(get_token_count(doc, tokenizer) for doc in kept_docs) <= model_max_length
@wrap_test_forked
def test_reverse_ucurve():
    """reverse_ucurve_list and undo_reverse_ucurve_list must be inverses on the sample pairs."""
    cases = [
        ([1, 2, 3, 4, 5, 6, 7, 8], [2, 4, 6, 8, 7, 5, 3, 1]),
        ([1], [1]),
        ([1, 2], [2, 1]),
        ([1, 2, 3], [2, 3, 1]),
        ([1, 2, 3, 4], [2, 4, 3, 1]),
    ]
    for plain, curved in cases:
        assert reverse_ucurve_list(plain) == curved
        assert undo_reverse_ucurve_list(curved) == plain


@wrap_test_forked
def check_gradio():
    """Assert that the imported gradio module carries a truthy __h2oai__ attribute."""
    import gradio as gr
    assert gr.__h2oai__


@wrap_test_forked
def test_is_uuid4():
    """is_uuid4 must accept only the genuine version-4 UUID among the samples."""
    samples = [
        ("f47ac10b-58cc-4372-a567-0e02b2c3d479", True),  # valid UUID v4
        ("not-a-uuid", False),  # not a UUID at all
        # UUID-shaped, but does not follow the version 4 pattern
        ("12345678-1234-1234-1234-123456789abc", False),
        ("xyz", False),  # not a UUID at all
    ]
    candidates = [text for text, _ in samples]
    verdicts = [is_valid for _, is_valid in samples]
    assert [is_uuid4(s) for s in candidates] == verdicts


@wrap_test_forked
def test_is_git_hash():
    """is_full_git_hash must accept only the 40-character hexadecimal sample."""
    expectations = {
        "1a3b5c7d9e1a3b5c7d9e1a3b5c7d9e1a3b5c7d9e": True,  # 40 hex characters
        "1G3b5c7d9e1a3b5c7d9e1a3b5c7d9e1a3b5c7d9e": False,  # contains a non-hex 'G'
        "1a3b5c7d": False,  # abbreviated hash
    }
    assert [is_full_git_hash(h) for h in expectations] == list(expectations.values())
@wrap_test_forked
def test_chat_template():
    """For each listed base model, the rendered chat prompt must contain the
    system prompt, the instruction and both sides of the prior exchange."""
    from transformers import AutoTokenizer

    instruction = "Who are you?"
    system_prompt = "Be kind"
    history_to_use = [('Are you awesome?', "Yes I'm awesome.")]
    image_file = []

    supports_system_prompt = ['meta-llama/Llama-2-7b-chat-hf',
                              'openchat/openchat-3.5-1210',
                              'SeaLLMs/SeaLLM-7B-v2',
                              'h2oai/h2ogpt-gm-experimental']
    other_base_models = ['h2oai/mixtral-gm-rag-experimental-v2']

    for base_model in supports_system_prompt + other_base_models:
        tokenizer = AutoTokenizer.from_pretrained(base_model)
        prompt = apply_chat_template(instruction, system_prompt, history_to_use, image_file,
                                     tokenizer,
                                     user_prompt_for_fake_system_prompt=user_prompt_for_fake_system_prompt0,
                                     verbose=True)

        past_user, past_bot = history_to_use[0]
        assert 'Be kind' in prompt  # put into pre-conversation if no actual system prompt
        assert instruction in prompt
        assert past_user in prompt
        assert past_bot in prompt
@wrap_test_forked
def test_chat_template_images():
    """Render a prompt for the InternVL chat model, first with the tokenizer's
    own chat template and then through apply_chat_template with image files."""
    from transformers import AutoTokenizer

    history_to_use = [('Are you awesome?', "Yes I'm awesome.")]
    base_model = 'OpenGVLab/InternVL-Chat-V1-5'
    tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True)

    instruction = 'What is the name of the tower in one of the images?'
    system_prompt = 'You are h2oGPTe, an expert question-answering AI system created by H2O.ai that performs like GPT-4 by OpenAI.'

    # Tokenizer's native template: only needs to produce something
    messages = [{'role': 'system', 'content': system_prompt},
                {'role': 'user', 'content': instruction}]
    prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    assert prompt is not None

    chat_conversation = []
    image_file = ['/tmp/image_file_0f5f011d-c907-4836-9f38-0ba579b45ffc.jpeg',
                  '/tmp/image_file_60dce245-af39-4f8c-9651-df9ae0bd0afa.jpeg',
                  '/tmp/image_file_e0b32625-9de3-40d7-98fb-c2e6368d6d73.jpeg']
    user_prompt_for_fake_system_prompt = None
    test_only = False
    verbose = False

    prompt = apply_chat_template(instruction, system_prompt, history_to_use, image_file,
                                 tokenizer,
                                 user_prompt_for_fake_system_prompt=user_prompt_for_fake_system_prompt0,
                                 test_only=test_only,
                                 verbose=verbose)

    past_user, past_bot = history_to_use[0]
    assert 'h2oGPTe' in prompt  # put into pre-conversation if no actual system prompt
    assert instruction in prompt
    assert past_user in prompt
    assert past_bot in prompt
@wrap_test_forked
def test_partial_codeblock():
    """Exercise has_starting_code_block, extract_code_block_content,
    looks_like_json and get_json (with fixup=False) on complete and partial
    (still-streaming) responses."""
    # Raises if the sentinel is not JSON-serializable
    json.dumps(invalid_json_str)

    # Example usages:
    example_1 = "```code block starts immediately"
    example_2 = "\n ```code block after newline and spaces"
    example_3 = "<br>```code block after HTML line break"
    example_4 = "This is a regular text without a code block."

    assert has_starting_code_block(example_1)
    assert has_starting_code_block(example_2)
    assert has_starting_code_block(example_3)
    assert not has_starting_code_block(example_4)

    # Example usages:
    example_stream_1 = "```code block content here```more text"
    example_stream_2 = "```code block content with no end yet..."
    example_stream_3 = "```\ncode block content here\n```\nmore text"
    example_stream_4 = "```\ncode block content \nwith no end yet..."
    example_stream_5 = "\n ```\ncode block content here\n```\nmore text"
    example_stream_6 = "\n ```\ncode block content \nwith no end yet..."
    example_stream_7 = "more text"

    assert extract_code_block_content(example_stream_1) == "block content here"
    assert extract_code_block_content(example_stream_2) == "block content with no end yet..."
    assert extract_code_block_content(example_stream_3) == "code block content here"
    assert extract_code_block_content(example_stream_4) == "code block content \nwith no end yet..."
    assert extract_code_block_content(example_stream_5) == "code block content here"
    assert extract_code_block_content(example_stream_6) == "code block content \nwith no end yet..."
    assert extract_code_block_content(example_stream_7) == ""

    # Strings are compared with ==, never with `is`: identity of equal strings
    # is an interpreter detail, and `is ''` also triggers a SyntaxWarning.

    # Test case 1: Empty string
    assert extract_code_block_content("") == '', "Test 1 Failed: Should return an empty string for empty string"

    # Test case 2: No starting code block
    assert extract_code_block_content(
        "No code block here") == '', "Test 2 Failed: Should return an empty string if there's no starting code block"

    # Test case 3: Code block at the start without ending
    assert extract_code_block_content(
        "```text\nStarting without end") == "Starting without end", "Test 3 Failed: Should return the content of code block starting at the beginning"

    # Test case 4: Code block at the end without starting
    assert extract_code_block_content(
        "Text before code block```text\nEnding without start") == "Ending without start", "Test 4 Failed: Should extract text following starting delimiter regardless of position"

    # Test case 5: Code block in the middle with proper closing
    assert extract_code_block_content(
        "Text before ```text\ncode block``` text after") == "code block", "Test 5 Failed: Should extract the code block in the middle"

    # Test case 6: Multiple code blocks, only extracts the first one
    assert extract_code_block_content(
        "```text\nFirst code block``` Text in between ```Second code block```") == "First code block", "Test 6 Failed: Should only extract the first code block"

    # Test case 7: Code block with only whitespace inside
    assert extract_code_block_content(
        "``` ```") == "", "Test 7 Failed: Should return an empty string for a code block with only whitespace"

    # Test case 8: Newline characters inside code block
    assert extract_code_block_content(
        "```\nLine 1\nLine 2\n```") == "Line 1\nLine 2", "Test 8 Failed: Should preserve newline characters within code block but not leading/trailing newlines due to .strip()"

    # Test case 9: Code block with special characters
    special_characters = "```text\nSpecial characters !@#$%^&*()```"
    assert extract_code_block_content(
        special_characters) == "Special characters !@#$%^&*()", "Test 9 Failed: Should correctly handle special characters"

    # Test case 10: No starting code block but with ending delimiter
    assert extract_code_block_content(
        "Text with ending code block delimiter```") == '', "Test 10 Failed: Should return an empty string if there's no starting code block but with an ending delimiter"

    # Test cases
    assert looks_like_json('{ "key": "value" }'), "Failed: JSON object"
    assert looks_like_json('[1, 2, 3]'), "Failed: JSON array"
    assert looks_like_json(' "string" '), "Failed: JSON string"
    assert looks_like_json('null'), "Failed: JSON null"
    assert looks_like_json(' true '), "Failed: JSON true"
    assert looks_like_json('123'), "Failed: JSON number"
    assert not looks_like_json('Just a plain text'), "Failed: Not JSON"
    assert not looks_like_json('```code block```'), "Failed: Code block"

    # Test cases
    get_json_nofixup = functools.partial(get_json, fixup=False)
    assert get_json_nofixup(
        '{"key": "value"}') == '{"key": "value"}', "Failed: Valid JSON object should be returned as is."
    assert get_json_nofixup('[1, 2, 3]') == '[1, 2, 3]', "Failed: Valid JSON array should be returned as is."
    assert get_json_nofixup('```text\nSome code```') == 'Some code', "Failed: Code block content should be returned."
    assert get_json_nofixup(
        'Some random text') == invalid_json_str, "Failed: Random text should lead to 'invalid json' return."
    assert get_json_nofixup(
        '```{"key": "value in code block"}```') == '{"key": "value in code block"}', "Failed: JSON in code block should be correctly extracted and returned."
    assert get_json_nofixup(
        '```code\nmore code```') == 'more code', "Failed: Multi-line code block content should be returned."
    assert get_json_nofixup(
        '```\n{"key": "value"}\n```') == '{"key": "value"}', "Failed: JSON object in code block with new lines should be correctly extracted and returned."
    assert get_json_nofixup('') == invalid_json_str, "Failed: Empty string should lead to 'invalid json' return."
    assert get_json_nofixup(
        'True') == invalid_json_str, "Failed: Non-JSON 'True' value should lead to 'invalid json' return."
    assert get_json_nofixup(
        '{"incomplete": true,') == '{"incomplete": true,', "Failed: Incomplete JSON should still be considered as JSON and returned as is."

    # The JSON payload is written once and embedded in the model-style answer,
    # so the expected value cannot drift from the input.
    profile_json = """{
  "name": "John Doe",
  "age": 30,
  "skills": ["Java", "Python", "JavaScript"],
  "work history": [
    {
      "company": "ABC Corp",
      "duration": "2018-2020",
      "position": "Software Engineer"
    },
    {
      "company": "XYZ Inc",
      "position": "Senior Software Engineer",
      "duration": "2020-Present"
    }
  ]
}"""
    answer = ("Here is an example JSON that fits the provided schema:\n"
              "```json\n" + profile_json + "\n```\n"
              "Note that the `work history` array contains two objects, each with a `company`, `duration`, and `position` property. The `skills` array contains three string elements, each with a maximum length of 10 characters. The `name` and `age` properties are also present and are of the correct data types.")
    assert get_json_nofixup(answer) == profile_json

    # JSON within a code block
    json_in_code_block = """
Here is an example JSON:
```json
{"key": "value"}
```
"""

    # Plain JSON response
    plain_json_response = '{"key": "value"}'

    # Invalid JSON or non-JSON response
    non_json_response = "This is just some text."

    # Tests
    assert get_json_nofixup(
        json_in_code_block).strip() == '{"key": "value"}', "Should extract and return JSON from a code block."
    assert get_json_nofixup(plain_json_response) == '{"key": "value"}', "Should return plain JSON as is."
    assert get_json_nofixup(
        non_json_response) == invalid_json_str, "Should return 'invalid json' for non-JSON response."

    # Test with the provided example: bare JSON followed by a dangling closing fence
    employee_json = (
        '{\n'
        ' "name": "John Doe",\n'
        ' "email": "john.doe@example.com",\n'
        ' "jobTitle": "Software Developer",\n'
        ' "department": "Technology",\n'
        ' "hireDate": "2020-01-01",\n'
        ' "employeeId": 123456,\n'
        ' "manager": {\n'
        ' "name": "Jane Smith",\n'
        ' "email": "jane.smith@example.com",\n'
        ' "jobTitle": "Senior Software Developer"\n'
        ' },\n'
        ' "skills": [\n'
        ' "Java",\n'
        ' "Python",\n'
        ' "JavaScript",\n'
        ' "React",\n'
        ' "Spring"\n'
        ' ],\n'
        ' "education": {\n'
        ' "degree": "Bachelor\'s Degree",\n'
        ' "field": "Computer Science",\n'
        ' "institution": "Example University",\n'
        ' "graduationYear": 2018\n'
        ' },\n'
        ' "awards": [\n'
        ' {\n'
        ' "awardName": "Best Developer of the Year",\n'
        ' "year": 2021\n'
        ' },\n'
        ' {\n'
        ' "awardName": "Most Valuable Team Player",\n'
        ' "year": 2020\n'
        ' }\n'
        ' ],\n'
        ' "performanceRatings": {\n'
        ' "communication": 4.5,\n'
        ' "teamwork": 4.8,\n'
        ' "creativity": 4.2,\n'
        ' "problem-solving": 4.6,\n'
        ' "technical skills": 4.7\n'
        ' }\n'
        '}'
    )
    stream_content = " " + employee_json + "\n```"
    extracted_content = get_json_nofixup(stream_content)
    assert extracted_content == employee_json
def test_partial_codeblock2():
    """has_starting_code_block must fire for a fence at the start, after a
    newline plus spaces, and after an HTML line break, but not for plain text."""
    with_fence = (
        "```code block starts immediately",
        "\n ```code block after newline and spaces",
        "<br>```code block after HTML line break",
    )
    without_fence = "This is a regular text without a code block."

    for text in with_fence:
        assert has_starting_code_block(text)
    assert not has_starting_code_block(without_fence)
def test_extract_code_block_content():
    """extract_code_block_content on closed, unclosed, indented and nested
    (```markdown wrapping ```json) fenced blocks."""
    expected8 = """{
  "Employee": {
    "Name": "Henry",
    "Title": "AI Scientist",
    "Department": "AI",
    "Location": "San Francisco",
    "Contact": {
      "Email": "henryai@gmail.com",
      "Phone": "+1-234-567-8901"
    },
    "Profile": {
      "Education": [
        {
          "Institution": "Stanford University",
          "Degree": "Ph.D.",
          "Field": "Computer Science"
        },
        {
          "Institution": "University of California, Berkeley",
          "Degree": "M.S.",
          "Field": "Artificial Intelligence"
        }
      ],
      "Experience": [
        {
          "Company": "Google",
          "Role": "Senior AI Engineer",
          "Duration": "5 years"
        },
        {
          "Company": "Facebook",
          "Role": "Principal AI Engineer",
          "Duration": "3 years"
        }
      ],
      "Skills": [
        "Python",
        "TensorFlow",
        "PyTorch",
        "Natural Language Processing",
        "Machine Learning"
      ],
      "Languages": [
        "English",
        "French",
        "Spanish"
      ],
      "Certifications": [
        {
          "Name": "Certified AI Professional",
          "Issuing Body": "AI Professional Association"
        },
        {
          "Name": "Advanced AI Course Certificate",
          "Issuing Body": "AI Institute"
        }
      ]
    }
  }
}"""
    # Nested fences: an outer ```markdown immediately followed by an inner ```json
    example_stream_8 = "```markdown\n```json\n" + expected8 + "\n```\n"

    # (input, expected extraction), checked in order
    cases = [
        ("```code block content here```more text", "block content here"),
        ("```code block content with no end yet...", "block content with no end yet..."),
        ("```\ncode block content here\n```\nmore text", "code block content here"),
        ("```\ncode block content \nwith no end yet...", "code block content \nwith no end yet..."),
        ("\n ```\ncode block content here\n```\nmore text", "code block content here"),
        ("\n ```\ncode block content \nwith no end yet...", "code block content \nwith no end yet..."),
        ("more text", ""),
        (example_stream_8, expected8),
    ]
    for stream, wanted in cases:
        assert extract_code_block_content(stream) == wanted
@pytest.mark.parametrize("method", ['repair_json', 'get_json'])
@wrap_test_forked
def test_repair_json(method):
    """Every prefix of a JSON-plus-trailing-note document must be repaired into
    parseable JSON within a 5 ms budget.

    :param method: 'repair_json' calls json_repair.repair_json directly; any
        other value goes through get_json.
    """
    a = """{
    "Supplementary Leverage Ratio": [7.0, 5.8, 5.7],
    "Liquidity Metrics": {
        "End of Period Liabilities and Equity": [2260, 2362, 2291],
        "Liquidity Coverage Ratio": [118, 115, 115],
        "Trading-Related Liabilities(7)": [84, 72, 72],
        "Total Available Liquidty Resources": [972, 994, 961],
        "Deposits Balance Sheet": [140, 166, 164],
        "Other Liabilities(7)": {},
        "LTD": {},
        "Equity": {
            "Book Value per share": [86.43, 92.16, 92.21],
            "Tangible Book Value per share": [73.67, 79.07, 79.16]
        }
    },
    "Capital and Balance Sheet ($ in B)": {
        "Risk-based Capital Metrics(1)": {
            "End of Period Assets": [2260, 2362, 2291],
            "CET1 Capital": [147, 150, 150],
            "Standardized RWAs": [1222, 1284, 1224],
            "Investments, net": {},
            "CET1 Capital Ratio - Standardized": [12.1, 11.7, 12.2],
            "Advanced RWAs": [1255, 1265, 1212],
            "Trading-Related Assets(5)": [670, 681, 659],
            "CET1 Capital Ratio - Advanced": [11.7, 11.8, 12.4],
            "Loans, net(6)": {},
            "Other(5)": [182, 210, 206]
        }
    }
}

Note: Totals may not sum due to rounding. LTD: Long-term debt. All information for 4Q21 is preliminary. All footnotes are presented on Slide 26."""

    from json_repair import repair_json

    for i in range(len(a)):
        text = a[:i]
        # perf_counter is monotonic and high-resolution; wall-clock time.time()
        # can jump or be too coarse to judge a 5 ms budget reliably.
        t0 = time.perf_counter()
        if method == 'repair_json':
            good_json_string = repair_json(text)
        else:
            good_json_string = get_json(text)
        if i > 50:
            assert len(good_json_string) > 5
        tdelta = time.perf_counter() - t0
        assert tdelta < 0.005, "Too slow: %s" % tdelta
        print("%s : %s : %s" % (i, tdelta, good_json_string))
        # The repaired text must always parse
        json.loads(good_json_string)


def test_json_repair_more():
    """Repair JSON wrapped in nested or duplicated code fences."""
    from json_repair import repair_json
    from src.utils import repair_json_by_type

    # Outer ```markdown fence directly followed by an inner ```json fence
    response0 = """```markdown
```json
{
  "Employee": {
    "Name": "Henry",
    "Title": "AI Scientist",
    "Department": "AI",
    "Location": "San Francisco",
    "Contact": {
      "Email": "henryai@gmail.com",
      "Phone": "+1-234-567-8901"
    },
    "Profile": {
      "Education": [
        {
          "Institution": "Stanford University",
          "Degree": "Ph.D.",
          "Field": "Computer Science"
        },
        {
          "Institution": "University of California, Berkeley",
          "Degree": "M.S.",
          "Field": "Artificial Intelligence"
        }
      ],
      "Experience": [
        {
          "Company": "Google",
          "Role": "Senior AI Engineer",
          "Duration": "5 years"
        },
        {
          "Company": "Facebook",
          "Role": "Principal AI Engineer",
          "Duration": "3 years"
        }
      ],
      "Skills": [
        "Python",
        "TensorFlow",
        "PyTorch",
        "Natural Language Processing",
        "Machine Learning"
      ],
      "Languages": [
        "English",
        "French",
        "Spanish"
      ],
      "Certifications": [
        {
          "Name": "Certified AI Professional",
          "Issuing Body": "AI Professional Association"
        },
        {
          "Name": "Advanced AI Course Certificate",
          "Issuing Body": "AI Institute"
        }
      ]
    }
  }
}
```
"""
    response = repair_json(response0)
    assert response.startswith('{')

    # Prose around two copies of the same object, the second behind a doubled fence
    response0 = """
Here is an example employee profile in JSON format, with keys that are less than 64 characters and made of only alphanumerics, underscores, or hyphens:
```json
{
  "employee_id": 1234,
  "name": "John Doe",
  "email": "johndoe@example.com",
  "job_title": "Software Engineer",
  "department": "Engineering",
  "hire_date": "2020-01-01",
  "salary": 100000,
  "manager_id": 5678
}
```
In Markdown, you can display this JSON code block like this:
```json
```
{
  "employee_id": 1234,
  "name": "John Doe",
  "email": "johndoe@example.com",
  "job_title": "Software Engineer",
  "department": "Engineering",
  "hire_date": "2020-01-01",
  "salary": 100000,
  "manager_id": 5678
}
```
This will display the JSON code block with proper formatting and highlighting.
"""
    response = repair_json_by_type(response0)
    assert json.loads(response)['employee_id'] == 1234
    print(response)

    response = get_json(response0, json_schema_type='object')
    assert json.loads(response)['employee_id'] == 1234
    print(response)


@wrap_test_forked
def test_dedup():
    """deduplicate_names appends _1, _2, ... to repeated names, keeping order."""
    # Example usage:
    names_list = ['Alice', 'Bob', 'Alice', 'Charlie', 'Bob', 'Alice']
    assert deduplicate_names(names_list) == ['Alice', 'Bob', 'Alice_1', 'Charlie', 'Bob_1', 'Alice_2']


# Test cases
def test_handle_json_normal():
    """handle_json returns plain (non-schema) JSON data equal to its input."""
    normal_json = {
        "name": "Henry",
        "age": 35,
        "skills": ["AI", "Machine Learning", "Data Science"],
        "workhistory": [
            {"company": "TechCorp", "duration": "2015-2020", "position": "Senior AI Scientist"},
            {"company": "AI Solutions", "duration": "2010-2015", "position": "AI Scientist"}
        ]
    }
    assert handle_json(normal_json) == normal_json
def test_handle_json_schema():
    """handle_json must flatten a fully schema-annotated document to its plain values."""

    def typed(type_name, value, **extra):
        # One schema-style leaf: {"type": ..., "value": ..., <extra keys>}
        return {"type": type_name, "value": value, **extra}

    def job_schema(company, duration, position):
        # One schema-style work-history object
        return {"type": "object",
                "properties": {
                    "company": typed("string", company),
                    "duration": typed("string", duration),
                    "position": typed("string", position)
                },
                "required": ["company", "position"]}

    skill_names = ["AI", "Machine Learning", "Data Science"]
    schema_json = {
        "name": typed("string", "Henry"),
        "age": typed("integer", 35),
        "skills": {"type": "array",
                   "items": [typed("string", skill, maxLength=10) for skill in skill_names],
                   "minItems": 3},
        "workhistory": {"type": "array",
                        "items": [
                            job_schema("TechCorp", "2015-2020", "Senior AI Scientist"),
                            job_schema("AI Solutions", "2010-2015", "AI Scientist"),
                        ]}
    }
    expected_result = {
        "name": "Henry",
        "age": 35,
        "skills": ["AI", "Machine Learning", "Data Science"],
        "workhistory": [
            {"company": "TechCorp", "duration": "2015-2020", "position": "Senior AI Scientist"},
            {"company": "AI Solutions", "duration": "2010-2015", "position": "AI Scientist"}
        ]
    }
    assert handle_json(schema_json) == expected_result


def test_handle_json_mixed():
    """handle_json must flatten a document mixing plain values with schema-annotated ones."""
    techcorp_schema = {"type": "object",
                       "properties": {
                           "company": {"type": "string", "value": "TechCorp"},
                           "duration": {"type": "string", "value": "2015-2020"},
                           "position": {"type": "string", "value": "Senior AI Scientist"}
                       },
                       "required": ["company", "position"]}
    ai_solutions_plain = {"company": "AI Solutions", "duration": "2010-2015", "position": "AI Scientist"}

    mixed_json = {
        "name": "Henry",
        "age": {"type": "integer", "value": 35},
        "skills": ["AI", {"type": "string", "value": "Machine Learning"}, "Data Science"],
        "workhistory": {"type": "array", "items": [techcorp_schema, ai_solutions_plain]}
    }
    expected_result = {
        "name": "Henry",
        "age": 35,
        "skills": ["AI", "Machine Learning", "Data Science"],
        "workhistory": [
            {"company": "TechCorp", "duration": "2015-2020", "position": "Senior AI Scientist"},
            {"company": "AI Solutions", "duration": "2010-2015", "position": "AI Scientist"}
        ]
    }
    assert handle_json(mixed_json) == expected_result
def test_handle_json_empty():
    """handle_json on an empty dict returns something equal to that dict."""
    payload = {}
    assert handle_json(payload) == payload


def test_handle_json_no_schema():
    """Nested dicts without type/value schema keys come back equal to the input."""
    payload = {
        "name": {"first": "Henry", "last": "Smith"},
        "age": 35,
        "skills": ["AI", "Machine Learning", "Data Science"]
    }
    assert handle_json(payload) == payload


def test_json_repair_on_string():
    """A plain sentence yields an empty object from get_json (object schema)
    and an empty-ish string from repair_json."""
    from json_repair import repair_json

    response0 = 'According to the information provided, the best safety assessment enum label is "Safe".'

    parsed = json.loads(get_json(response0, json_schema_type='object'))
    assert isinstance(parsed, dict) and not parsed

    repaired = repair_json(response0)
    assert isinstance(repaired, str) and repaired in ['""', """''""", '', None]


# Example usage converted to pytest test cases
def test_check_input_type():
    """check_input_type classifies URLs, existing files and base64-looking
    strings, and reports 'unknown' for everything else."""
    cases = [
        # Valid URL
        ("https://example.com", 'url'),
        # Valid file path (Note: Adjust the path to match an actual file on your system for the test to pass)
        ("tests/receipt.jpg", 'file'),
        # Valid base64 encoded image
        ("b'...", 'base64'),
        # Non-string inputs
        (b"bytes data", 'unknown'),
        (12345, 'unknown'),
        (["list", "of", "strings"], 'unknown'),
        # Invalid URL
        ("invalid://example.com", 'unknown'),
        # Invalid file path
        ("/path/to/invalid/file.txt", 'unknown'),
        # Plain string
        ("just a string", 'unknown'),
    ]
    for candidate, wanted in cases:
        assert check_input_type(candidate) == wanted
process_file_list(test_files, output_dir, resolution=(640, 480), image_format="jpg", verbose=True) # Print the resulting list of image files print("Processed files:") for file in processed_files: print(file, file=sys.stderr) assert os.path.isfile(file) assert len(processed_files) == len( test_files) - 1 + 17 + 4 # 17 is the number of images generated from the video file def test_process_file_list_extract_frames(): # Create a list of test files test_files = [ "tests/videotest.mp4", "tests/dental.png", "tests/fastfood.jpg", "tests/ocr2.png", "tests/receipt.jpg", "tests/revenue.png", "tests/jon.png", "tests/ocr1.png", "tests/ocr3.png", "tests/screenshot.png", ] output_dir = os.path.join(tempfile.gettempdir(), 'image_path_%s' % str(uuid.uuid4())) print(output_dir, file=sys.stderr) # Process the files processed_files = process_file_list(test_files, output_dir, resolution=(640, 480), image_format="jpg", video_frame_period=0, extract_frames=10, verbose=True) # Print the resulting list of image files print("Processed files:") for file in processed_files: print(file, file=sys.stderr) assert os.path.isfile(file) assert len(processed_files) == len(test_files) - 1 + 10 # 10 is the number of images generated from the video file def test_process_youtube(): # Create a list of test files test_files = [ "https://www.youtube.com/shorts/fRkZCriQQNU", "tests/screenshot.png" ] output_dir = os.path.join(tempfile.gettempdir(), 'image_path_%s' % str(uuid.uuid4())) print(output_dir, file=sys.stderr) # Process the files processed_files = process_file_list(test_files, output_dir, resolution=(640, 480), image_format="jpg", video_frame_period=0, extract_frames=10, verbose=True) # Print the resulting list of image files print("Processed files:") for file in processed_files: print(file, file=sys.stderr) assert os.path.isfile(file) assert len(processed_files) == len(test_files) - 1 + 10 # 10 is the number of images generated from the video file def test_process_animated_gif(): # Create a list of 
test files test_files = [ "tests/test_animated_gif.gif", "tests/screenshot.png", ] output_dir = os.path.join(tempfile.gettempdir(), 'image_path_%s' % str(uuid.uuid4())) print(output_dir, file=sys.stderr) # Process the files processed_files = process_file_list(test_files, output_dir, resolution=(640, 480), image_format="jpg", video_frame_period=0, extract_frames=10, verbose=True) # Print the resulting list of image files print("Processed files:") for file in processed_files: print(file, file=sys.stderr) assert os.path.isfile(file) assert len(processed_files) == len(test_files) - 1 + 3 # 3 is the number of images generated from the animated gif def test_process_animated_gif2(): # Create a list of test files test_files = [ "tests/test_animated_gif.gif", "tests/screenshot.png" ] output_dir = os.path.join(tempfile.gettempdir(), 'image_path_%s' % str(uuid.uuid4())) print(output_dir, file=sys.stderr) # Process the files processed_files = process_file_list(test_files, output_dir, verbose=True) # Print the resulting list of image files print("Processed files:") for file in processed_files: print(file, file=sys.stderr) assert os.path.isfile(file) assert len(processed_files) == len(test_files) - 1 + 3 # 3 is the number of images generated from the animated gif def test_process_animated_gif3(): # Create a list of test files test_files = [ "tests/test_animated_gif.gif", "tests/screenshot.png" ] output_dir = os.path.join(tempfile.gettempdir(), 'image_path_%s' % str(uuid.uuid4())) print(output_dir, file=sys.stderr) # Process the files processed_files = process_file_list(test_files, output_dir, video_frame_period=1, verbose=True) # Print the resulting list of image files print("Processed files:") for file in processed_files: print(file, file=sys.stderr) assert os.path.isfile(file) assert len(processed_files) == len( test_files) - 1 + 60 # 60 is the number of images generated from the animated gif def test_process_mixed(): # Create a list of test files test_files = [ 
"tests/videotest.mp4", "https://www.youtube.com/shorts/fRkZCriQQNU", "tests/screenshot.png", "tests/test_animated_gif.gif", ] output_dir = os.path.join(tempfile.gettempdir(), 'image_path_%s' % str(uuid.uuid4())) print(output_dir, file=sys.stderr) # Process the files processed_files = process_file_list(test_files, output_dir, resolution=(640, 480), image_format="jpg", video_frame_period=0, extract_frames=10, verbose=True) # Print the resulting list of image files print("Processed files:") for file in processed_files: print(file, file=sys.stderr) assert os.path.isfile(file) assert len(processed_files) == len(test_files) - 1 + 29 # 28 is the number of images generated from the video files def test_update_db(): auth_filename = "test.db" remove(auth_filename) from src.db_utils import fetch_user assert fetch_user(auth_filename, '', verbose=True) == {} username = "jon" updates = { "selection_docs_state": { "langchain_modes": ["NewMode1"], "langchain_mode_paths": {"NewMode1": "new_mode_path1"}, "langchain_mode_types": {"NewMode1": "shared"} } } from src.db_utils import append_to_user_data append_to_user_data(auth_filename, username, updates, verbose=True) auth_dict = fetch_user(auth_filename, username, verbose=True) assert auth_dict == {'jon': {'selection_docs_state': {'langchain_mode_paths': {'NewMode1': 'new_mode_path1'}, 'langchain_mode_types': {'NewMode1': 'shared'}, 'langchain_modes': ['NewMode1']}}} updates = { "selection_docs_state": { "langchain_modes": ["NewMode"], "langchain_mode_paths": {"NewMode": "new_mode_path"}, "langchain_mode_types": {"NewMode": "shared"} } } from src.db_utils import append_to_users_data append_to_users_data(auth_filename, updates, verbose=True) auth_dict = fetch_user(auth_filename, username, verbose=True) assert auth_dict == {'jon': {'selection_docs_state': {'langchain_mode_paths': {'NewMode1': 'new_mode_path1', "NewMode": "new_mode_path"}, 'langchain_mode_types': {'NewMode1': 'shared', "NewMode": "shared"}, 'langchain_modes': 
['NewMode1', 'NewMode']}}} def test_encode_chat_template(): jinja_template = """ {{ bos_token }} {%- if messages[0]['role'] == 'system' -%} {% set system_message = messages[0]['content'].strip() %} {% set loop_messages = messages[1:] %} {%- else -%} {% set system_message = 'This is a chat between a user and an artificial intelligence assistant. The assistant gives helpful, detailed, and polite answers to the user\'s questions based on the context. The assistant should also indicate when the answer cannot be found in the context.' %} {% set loop_messages = messages %} {%- endif -%} System: {{ system_message }} {% for message in loop_messages %} {%- if message['role'] == 'user' -%} User: {{ message['content'].strip() + '\n' }} {%- else -%} Assistant: {{ message['content'].strip() + '\n' }} {%- endif %} {% if loop.last and message['role'] == 'user' %} Assistant: {% endif %} {% endfor %} """ encoded_template = base64_encode_jinja_template(jinja_template) print("\nEncoded Template:", encoded_template) model_lock_option = f"""--model_lock="[{{'inference_server': 'vllm_chat:149.130.210.116', 'base_model': 'nvidia/Llama3-ChatQA-1.5-70B', 'visible_models': 'nvidia/Llama3-ChatQA-1.5-70B', 'h2ogpt_key': '62224bfb-c832-4452-81e7-8a4bdabbe164', 'chat_template': '{encoded_template}'}}]" """ print("Command-Line Option:") print(model_lock_option) # Example of decoding back from the command-line option command_line_option = model_lock_option.strip('--model_lock=') # double ast.literal_eval due to quoted quote for model_lock_option parsed_model_lock_option = ast.literal_eval(ast.literal_eval(command_line_option)) encoded_template_from_option = parsed_model_lock_option[0]['chat_template'] decoded_template = base64_decode_jinja_template(encoded_template_from_option) print("Decoded Template:") print(decoded_template) assert jinja_template == decoded_template def test_depth(): example_list = [[['Dog', ['/tmp/gradio/image_Dog_d2b19221_6f70_4987_bda8_09be952eae93.png']], ['Who are you?', 
['/tmp/gradio/image_Wh_480bd8318d01b570b61e77a9306aef87_c41f.png']], ['Who ar eyou?', "I apologize for the confusion earlier!\n\nI am LLaMA, an AI assistant developed by Meta AI that can understand and respond to human input in a conversational manner. I'm not a human, but a computer program designed to simulate conversation, answer questions, and even generate text based on the input I receive.\n\nI can assist with a wide range of topics, from general knowledge to entertainment, and even create stories or dialogues. I'm constantly learning and improving my responses based on the interactions I have with users like you.\n\nSo, feel free to ask me anything, and I'll do my best to help!"]], [], [], [], [], [], [], [], [], [], [], []] assert get_gradio_depth(example_list) == 3 example_list = [[[['Dog'], ['/tmp/gradio/image_Dog_d2b19221_6f70_4987_bda8_09be952eae93.png']], ['Who are you?', ['/tmp/gradio/image_Wh_480bd8318d01b570b61e77a9306aef87_c41f.png']], ['Who ar eyou?', "I apologize for the confusion earlier!\n\nI am LLaMA, an AI assistant developed by Meta AI that can understand and respond to human input in a conversational manner. I'm not a human, but a computer program designed to simulate conversation, answer questions, and even generate text based on the input I receive.\n\nI can assist with a wide range of topics, from general knowledge to entertainment, and even create stories or dialogues. I'm constantly learning and improving my responses based on the interactions I have with users like you.\n\nSo, feel free to ask me anything, and I'll do my best to help!"]], [], [], [], [], [], [], [], [], [], [], []] assert get_gradio_depth(example_list) == 3 example_list = [[['Dog', "Bad Dog"], ['Who are you?', "Image"], ['Who ar eyou?', "I apologize for the confusion earlier!\n\nI am LLaMA, an AI assistant developed by Meta AI that can understand and respond to human input in a conversational manner. 
I'm not a human, but a computer program designed to simulate conversation, answer questions, and even generate text based on the input I receive.\n\nI can assist with a wide range of topics, from general knowledge to entertainment, and even create stories or dialogues. I'm constantly learning and improving my responses based on the interactions I have with users like you.\n\nSo, feel free to ask me anything, and I'll do my best to help!"]], [], [], [], [], [], [], [], [], [], [], []] assert get_gradio_depth(example_list) == 3 example_list = [[[['Dog', "Bad Dog"], ['Who are you?', "Image"], ['Who ar eyou?', "I apologize for the confusion earlier!\n\nI am LLaMA, an AI assistant developed by Meta AI that can understand and respond to human input in a conversational manner. I'm not a human, but a computer program designed to simulate conversation, answer questions, and even generate text based on the input I receive.\n\nI can assist with a wide range of topics, from general knowledge to entertainment, and even create stories or dialogues. 
I'm constantly learning and improving my responses based on the interactions I have with users like you.\n\nSo, feel free to ask me anything, and I'll do my best to help!"]], [], [], [], [], [], [], [], [], [], [], []]] assert get_gradio_depth(example_list) == 4 example_list = [['Dog', "Bad Dog"], ['Who are you?', "Image"]] assert get_gradio_depth(example_list) == 2 # more cases example_list = [] assert get_gradio_depth(example_list) == 0 example_list = [1, 2, 3] assert get_gradio_depth(example_list) == 1 example_list = [[1], [2], [3]] assert get_gradio_depth(example_list) == 1 example_list = [[[1]], [[2]], [[3]]] assert get_gradio_depth(example_list) == 2 example_list = [[[[1]]], [[[2]]], [[[3]]]] assert get_gradio_depth(example_list) == 3 example_list = [[[[[1]]]], [[[[2]]]], [[[[3]]]]] assert get_gradio_depth(example_list) == 4 example_list = [[], [1], [2, [3]], [[[4]]]] assert get_gradio_depth(example_list) == 3 example_list = [[], [[[[1]]]], [2, [3]], [[[4]]]] assert get_gradio_depth(example_list) == 4 example_list = [[], [[[[[1]]]]], [2, [3]], [[[4]]]] assert get_gradio_depth(example_list) == 5 example_list = [[[[[1]]]], [[[[2]]]], [[[3]]], [[4]], [5]] assert get_gradio_depth(example_list) == 4 example_list = [[[[[1]]]], [[[[2]]]], [[[3]]], [[4]], [5], []] assert get_gradio_depth(example_list) == 4 def test_schema_to_typed(): TEST_SCHEMA = { "type": "object", "properties": { "name": {"type": "string"}, "age": {"type": "integer"}, "skills": { "type": "array", "items": {"type": "string", "maxLength": 10}, "minItems": 3 }, "work history": { "type": "array", "items": { "type": "object", "properties": { "company": {"type": "string"}, "duration": {"type": "string"}, "position": {"type": "string"} }, "required": ["company", "position"] } } }, "required": ["name", "age", "skills", "work history"] } Schema = create_typed_dict(TEST_SCHEMA) # Example usage of the generated TypedDict person: Schema = { "name": "John Doe", "age": 30, "skills": ["Python", "TypeScript", 
"Docker"], "work history": [ {"company": "TechCorp", "position": "Developer", "duration": "2 years"}, {"company": "DataInc", "position": "Data Scientist"} ] } print(person) def test_genai_schema(): # Usage example TEST_SCHEMA = { "type": "object", "properties": { "name": {"type": "string"}, "age": {"type": "integer"}, "skills": { "type": "array", "items": {"type": "string", "maxLength": 10}, "minItems": 3 }, "work history": { "type": "array", "items": { "type": "object", "properties": { "company": {"type": "string"}, "duration": {"type": "string"}, "position": {"type": "string"} }, "required": ["company", "position"] } }, "status": { "type": "string", "enum": ["active", "inactive", "on leave"] } }, "required": ["name", "age", "skills", "work history", "status"] } from src.utils_langchain import convert_to_genai_schema genai_schema = convert_to_genai_schema(TEST_SCHEMA) # Print the schema (this will show the structure, but not all details) print(genai_schema) # You can now use this schema with the Gemini API # For example: # response = model.generate_content(prompt, response_schema=genai_schema) def test_genai_schema_more(): # Test cases TEST_SCHEMAS = [ # Object schema { "type": "object", "properties": { "name": {"type": "string", "description": "The person's name"}, "age": {"type": "integer", "description": "The person's age"}, "height": {"type": "number", "format": "float", "description": "Height in meters"}, "is_student": {"type": "boolean", "description": "Whether the person is a student"}, "skills": { "type": "array", "items": {"type": "string"}, "description": "List of skills" }, "address": { "type": "object", "properties": { "street": {"type": "string"}, "city": {"type": "string"}, "country": {"type": "string"} }, "required": ["street", "city"], "description": "Address details" }, "status": { "type": "string", "enum": ["active", "inactive", "on leave"], "description": "Current status" } }, "required": ["name", "age", "is_student"], "description": "A person's 
profile" }, # Array schema { "type": "array", "items": { "type": "object", "properties": { "id": {"type": "integer"}, "name": {"type": "string"} }, "required": ["id"] }, "description": "List of items" }, # String schema { "type": "string", "format": "email", "description": "Email address" }, # Number schema { "type": "number", "format": "double", "description": "A floating-point number" }, # Boolean schema { "type": "boolean", "description": "A true/false value" } ] from src.utils_langchain import convert_to_genai_schema # Test the conversion for i, schema in enumerate(TEST_SCHEMAS, 1): print(f"\nTest Schema {i}:") genai_schema = convert_to_genai_schema(schema) print(genai_schema) def test_pymupdf4llm(): from langchain_community.document_loaders import PyMuPDFLoader from src.utils_langchain import PyMuPDF4LLMLoader times_pymupdf = [] times_pymupdf4llm = [] files = [os.path.join('tests', x) for x in os.listdir('tests')] files += [os.path.join('/home/jon/Downloads/', x) for x in os.listdir('/home/jon/Downloads/')] files = ['/home/jon/Downloads/Tabasco_Ingredients_Products_Guide.pdf'] for file in files: if not file.endswith('.pdf'): continue t0 = time.time() doc = PyMuPDFLoader(file).load() assert doc is not None print('pymupdf: %s %s %s' % (file, len(doc), time.time() - t0)) times_pymupdf.append((time.time() - t0)/len(doc)) for page in doc: print(page) t0 = time.time() doc = PyMuPDF4LLMLoader(file).load() assert doc is not None print('pymupdf4llm: %s %s %s' % (file, len(doc), time.time() - t0)) times_pymupdf4llm.append((time.time() - t0)/len(doc)) for page in doc: print(page) if len(times_pymupdf) > 30: break print("pymupdf stats:") compute_stats(times_pymupdf) print("pymupdf4llm stats:") compute_stats(times_pymupdf4llm) def compute_stats(times_in_seconds): # Compute statistics min_time = min(times_in_seconds) max_time = max(times_in_seconds) average_time = sum(times_in_seconds) / len(times_in_seconds) # Print the results print(f"Min time: {min_time} seconds") 
print(f"Max time: {max_time} seconds") print(f"Average time: {average_time} seconds")