"""RAG pipeline"""

import asyncio

from pydantic import BaseModel

from metagpt.const import DATA_PATH, EXAMPLE_DATA_PATH
from metagpt.logs import logger
from metagpt.rag.engines import SimpleEngine
from metagpt.rag.schema import (
    ChromaIndexConfig,
    ChromaRetrieverConfig,
    ElasticsearchIndexConfig,
    ElasticsearchRetrieverConfig,
    ElasticsearchStoreConfig,
    FAISSRetrieverConfig,
    LLMRankerConfig,
)
from metagpt.utils.exceptions import handle_exception

LLM_TIP = "If you not sure, just answer I don't know."

DOC_PATH = EXAMPLE_DATA_PATH / "rag/writer.txt"
QUESTION = f"What are key qualities to be a good writer? {LLM_TIP}"

TRAVEL_DOC_PATH = EXAMPLE_DATA_PATH / "rag/travel.txt"
TRAVEL_QUESTION = f"What does Bob like? {LLM_TIP}"


class Player(BaseModel):
    """To demonstrate rag add objs."""

    name: str = ""
    goal: str = "Win The 100-meter Sprint."
    tool: str = "Red Bull Energy Drink."

    def rag_key(self) -> str:
        """For search"""
        return self.goal


class RAGExample:
    """Show how to use RAG."""

    def __init__(self, engine: SimpleEngine = None, use_llm_ranker: bool = True):
        self._engine = engine
        self._use_llm_ranker = use_llm_ranker

    @property
    def engine(self):
        if not self._engine:
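            # Lazily build the default engine: a FAISS retriever over DOC_PATH, optionally followed by an LLM reranker.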
            ranker_configs = [LLMRankerConfig()] if self._use_llm_ranker else None

            self._engine = SimpleEngine.from_docs(
                input_files=[DOC_PATH],
                retriever_configs=[FAISSRetrieverConfig()],
                ranker_configs=ranker_configs,
            )
        return self._engine

    @engine.setter
    def engine(self, value: SimpleEngine):
        self._engine = value

    @handle_exception
    async def run_pipeline(self, question=QUESTION, print_title=True):
        """This example run rag pipeline, use faiss retriever and llm ranker, will print something like:

        Retrieve Result:
        0. Productivi..., 10.0
        1. I wrote cu..., 7.0
        2. I highly r..., 5.0

        Query Result:
        Passion, adaptability, open-mindedness, creativity, discipline, and empathy are key qualities to be a good writer.
        """
        if print_title:
            self._print_title("Run Pipeline")

        nodes = await self.engine.aretrieve(question)
        self._print_retrieve_result(nodes)

        answer = await self.engine.aquery(question)
        self._print_query_result(answer)

    @handle_exception
    async def add_docs(self):
        """This example show how to add docs.

        Before add docs llm anwser I don't know.
        After add docs llm give the correct answer, will print something like:

        [Before add docs]
        Retrieve Result:

        Query Result:
        Empty Response

        [After add docs]
        Retrieve Result:
        0. Bob like..., 10.0

        Query Result:
        Bob likes traveling.
        """
        self._print_title("Add Docs")

        travel_question = f"{TRAVEL_QUESTION}"
        travel_filepath = TRAVEL_DOC_PATH

        logger.info("[Before add docs]")
        await self.run_pipeline(question=travel_question, print_title=False)

        logger.info("[After add docs]")
        self.engine.add_docs([travel_filepath])
        await self.run_pipeline(question=travel_question, print_title=False)

    @handle_exception
    async def add_objects(self, print_title=True):
        """This example show how to add objects.

        Before add docs, engine retrieve nothing.
        After add objects, engine give the correct answer, will print something like:

        [Before add objs]
        Retrieve Result:

        [After add objs]
        Retrieve Result:
        0. 100m Sprin..., 10.0

        [Object Detail]
        {'name': 'Mike', 'goal': 'Win The 100-meter Sprint', 'tool': 'Red Bull Energy Drink'}
        """
        if print_title:
            self._print_title("Add Objects")

        player = Player(name="Mike")
        question = f"{player.rag_key()}"

        logger.info("[Before add objs]")
        await self._retrieve_and_print(question)

        logger.info("[After add objs]")
        self.engine.add_objs([player])

        try:
            nodes = await self._retrieve_and_print(question)

            logger.info("[Object Detail]")
            player: Player = nodes[0].metadata["obj"]
            logger.info(player.name)
        except Exception as e:
            logger.error(f"nodes is empty, llm don't answer correctly, exception: {e}")

    @handle_exception
    async def init_objects(self):
        """This example show how to from objs, will print something like:

        Same as add_objects.
        """
        self._print_title("Init Objects")

        pre_engine = self.engine
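        # Build a fresh engine that indexes objects directly, then restore the original engine afterwards.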
        self.engine = SimpleEngine.from_objs(retriever_configs=[FAISSRetrieverConfig()])
        await self.add_objects(print_title=False)
        self.engine = pre_engine

    @handle_exception
    async def init_and_query_chromadb(self):
        """This example show how to use chromadb. how to save and load index. will print something like:

        Query Result:
        Bob likes traveling.
        """
        self._print_title("Init And Query ChromaDB")

        # 1. save index
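        # Chroma persists the index under persist_path, so it can be reloaded later without re-indexing the docs.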
        output_dir = DATA_PATH / "rag"
        SimpleEngine.from_docs(
            input_files=[TRAVEL_DOC_PATH],
            retriever_configs=[ChromaRetrieverConfig(persist_path=output_dir)],
        )

        # 2. load index
        engine = SimpleEngine.from_index(index_config=ChromaIndexConfig(persist_path=output_dir))

        # 3. query
        answer = await engine.aquery(TRAVEL_QUESTION)
        self._print_query_result(answer)

    @handle_exception
    async def init_and_query_es(self):
        """This example show how to use es. how to save and load index. will print something like:

        Query Result:
        Bob likes traveling.
        """
        self._print_title("Init And Query Elasticsearch")

        # 1. create es index and save docs
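        # NOTE: this assumes an Elasticsearch instance is reachable at es_url below.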
        store_config = ElasticsearchStoreConfig(index_name="travel", es_url="http://127.0.0.1:9200")
        engine = SimpleEngine.from_docs(
            input_files=[TRAVEL_DOC_PATH],
            retriever_configs=[ElasticsearchRetrieverConfig(store_config=store_config)],
        )

        # 2. load index
        engine = SimpleEngine.from_index(index_config=ElasticsearchIndexConfig(store_config=store_config))

        # 3. query
        answer = await engine.aquery(TRAVEL_QUESTION)
        self._print_query_result(answer)

    @staticmethod
    def _print_title(title):
        logger.info(f"{'#'*30} {title} {'#'*30}")

    @staticmethod
    def _print_retrieve_result(result):
        """Print retrieve result."""
        logger.info("Retrieve Result:")

        for i, node in enumerate(result):
            logger.info(f"{i}. {node.text[:10]}..., {node.score}")

        logger.info("")

    @staticmethod
    def _print_query_result(result):
        """Print query result."""
        logger.info("Query Result:")

        logger.info(f"{result}\n")

    async def _retrieve_and_print(self, question):
        nodes = await self.engine.aretrieve(question)
        self._print_retrieve_result(nodes)
        return nodes


async def main():
    """RAG pipeline.

    Note:
    1. If `use_llm_ranker` is True, the LLM reranker is used to get better results, but its output is not always guaranteed to be parseable for reranking;
       prefer `gpt-4-turbo`, otherwise you might encounter `IndexError: list index out of range` or `ValueError: invalid literal for int() with base 10`.
    """
    e = RAGExample(use_llm_ranker=False)

    await e.run_pipeline()
    await e.add_docs()
    await e.add_objects()
    await e.init_objects()
    await e.init_and_query_chromadb()
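    # init_and_query_es expects an Elasticsearch instance at http://127.0.0.1:9200 (see ElasticsearchStoreConfig).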
    await e.init_and_query_es()


if __name__ == "__main__":
    asyncio.run(main())