File size: 4,768 Bytes
cfd3735
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""Test functionality related to combining documents."""

from typing import Any, List

import pytest

from langchain import PromptTemplate
from langchain.chains.combine_documents.base import format_document
from langchain.chains.combine_documents.map_reduce import (
    _collapse_docs,
    _split_list_of_docs,
)
from langchain.docstore.document import Document


def _fake_docs_len_func(docs: List[Document]) -> int:
    return len(_fake_combine_docs_func(docs))


def _fake_combine_docs_func(docs: List[Document], **kwargs: Any) -> str:
    return "".join([d.page_content for d in docs])


def test__split_list_long_single_doc() -> None:
    """Test splitting of a long single doc."""
    docs = [Document(page_content="foo" * 100)]
    with pytest.raises(ValueError):
        _split_list_of_docs(docs, _fake_docs_len_func, 100)


def test__split_list_long_pair_doc() -> None:
    """Test splitting of a list with two medium docs."""
    docs = [Document(page_content="foo" * 30)] * 2
    with pytest.raises(ValueError):
        _split_list_of_docs(docs, _fake_docs_len_func, 100)


def test__split_list_single_doc() -> None:
    """Test splitting works with just a single doc."""
    docs = [Document(page_content="foo")]
    doc_list = _split_list_of_docs(docs, _fake_docs_len_func, 100)
    assert doc_list == [docs]


def test__split_list_double_doc() -> None:
    """Test splitting works with just two docs."""
    docs = [Document(page_content="foo"), Document(page_content="bar")]
    doc_list = _split_list_of_docs(docs, _fake_docs_len_func, 100)
    assert doc_list == [docs]


def test__split_list_works_correctly() -> None:
    """Test splitting works correctly."""
    docs = [
        Document(page_content="foo"),
        Document(page_content="bar"),
        Document(page_content="baz"),
        Document(page_content="foo" * 2),
        Document(page_content="bar"),
        Document(page_content="baz"),
    ]
    doc_list = _split_list_of_docs(docs, _fake_docs_len_func, 10)
    expected_result = [
        # Test a group of three.
        [
            Document(page_content="foo"),
            Document(page_content="bar"),
            Document(page_content="baz"),
        ],
        # Test a group of two, where one is bigger.
        [Document(page_content="foo" * 2), Document(page_content="bar")],
        # Test no errors on last
        [Document(page_content="baz")],
    ]
    assert doc_list == expected_result


def test__collapse_docs_no_metadata() -> None:
    """Test collapse documents functionality when no metadata."""
    docs = [
        Document(page_content="foo"),
        Document(page_content="bar"),
        Document(page_content="baz"),
    ]
    output = _collapse_docs(docs, _fake_combine_docs_func)
    expected_output = Document(page_content="foobarbaz")
    assert output == expected_output


def test__collapse_docs_one_doc() -> None:
    """Test collapse documents functionality when only one document present."""
    # Test with no metadata.
    docs = [Document(page_content="foo")]
    output = _collapse_docs(docs, _fake_combine_docs_func)
    assert output == docs[0]

    # Test with metadata.
    docs = [Document(page_content="foo", metadata={"source": "a"})]
    output = _collapse_docs(docs, _fake_combine_docs_func)
    assert output == docs[0]


def test__collapse_docs_metadata() -> None:
    """Test collapse documents functionality when metadata exists."""
    metadata1 = {"source": "a", "foo": 2, "bar": "1", "extra1": "foo"}
    metadata2 = {"source": "b", "foo": "3", "bar": 2, "extra2": "bar"}
    docs = [
        Document(page_content="foo", metadata=metadata1),
        Document(page_content="bar", metadata=metadata2),
    ]
    output = _collapse_docs(docs, _fake_combine_docs_func)
    expected_metadata = {
        "source": "a, b",
        "foo": "2, 3",
        "bar": "1, 2",
        "extra1": "foo",
        "extra2": "bar",
    }
    expected_output = Document(page_content="foobar", metadata=expected_metadata)
    assert output == expected_output


def test_format_doc_with_metadata() -> None:
    """Test format doc on a valid document."""
    doc = Document(page_content="foo", metadata={"bar": "baz"})
    prompt = PromptTemplate(
        input_variables=["page_content", "bar"], template="{page_content}, {bar}"
    )
    expected_output = "foo, baz"
    output = format_document(doc, prompt)
    assert output == expected_output


def test_format_doc_missing_metadata() -> None:
    """Test format doc on a document with missing metadata."""
    doc = Document(page_content="foo")
    prompt = PromptTemplate(
        input_variables=["page_content", "bar"], template="{page_content}, {bar}"
    )
    with pytest.raises(ValueError):
        format_document(doc, prompt)