Mustehson commited on
Commit
5f9d608
·
1 Parent(s): ded67a4

DLT Pipeline

Browse files
Files changed (2) hide show
  1. app.py +53 -5
  2. requirements.txt +2 -1
app.py CHANGED
@@ -10,6 +10,7 @@ from langchain_huggingface import HuggingFaceEndpoint, ChatHuggingFace
10
  from langsmith import traceable
11
  from langchain import hub
12
  import warnings
 
13
  warnings.filterwarnings("ignore", category=DeprecationWarning)
14
 
15
  # Height of the Tabs Text Area
@@ -63,9 +64,44 @@ def get_tables_names(schema_name):
63
  def update_table_names(schema_name):
64
  tables = get_tables_names(schema_name)
65
  return gr.update(choices=tables)
66
- def get_data_df(schema):
67
- print('Getting Dataframe from the Database')
68
- return conn.sql(f"SELECT * FROM {schema} LIMIT 1000").df()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69
 
70
  def df_summary(df):
71
  summary = []
@@ -205,7 +241,14 @@ def statistics(df):
205
  # Main Function
206
  def main(table):
207
  schema = get_table_schema(table)
208
- df = get_data_df(schema)
 
 
 
 
 
 
 
209
  df_statistics, df_alerts = statistics(df)
210
  describe_num, describe_cat = describe(df)
211
 
@@ -226,7 +269,12 @@ def main(table):
226
  def user_results(table, text_query):
227
 
228
  schema = get_table_schema(table)
229
- df = get_data_df(schema)
 
 
 
 
 
230
 
231
  messages = format_user_prompt(df=df, user_description=text_query)
232
  tests = run_llm(messages)
 
10
  from langsmith import traceable
11
  from langchain import hub
12
  import warnings
13
+ import dlt
14
  warnings.filterwarnings("ignore", category=DeprecationWarning)
15
 
16
  # Height of the Tabs Text Area
 
64
  def update_table_names(schema_name):
65
  tables = get_tables_names(schema_name)
66
  return gr.update(choices=tables)
67
+ # def get_data_df(schema):
68
+ # print('Getting Dataframe from the Database')
69
+ # return conn.sql(f"SELECT * FROM {schema} LIMIT 1000")
70
+
71
+ @dlt.resource
72
+ def fetch_data(schema):
73
+ result = conn.sql(f"SELECT * FROM {schema} LIMIT 1000")
74
+
75
+ while True:
76
+ chunk_df = result.fetch_df_chunk(2)
77
+
78
+ if chunk_df is None or len(chunk_df) == 0:
79
+ break
80
+ else:
81
+ yield chunk_df
82
+
83
+ def create_pipeline(schema):
84
+ dataset_name = schema.split('.')[1]
85
+ print("Dataset Name: ", dataset_name)
86
+
87
+ table_name = schema.split('.')[2]
88
+ print("Table Name: ", table_name)
89
+
90
+ pipeline =dlt.pipeline(
91
+ pipeline_name='duckdb_pipeline',
92
+ destination='duckdb',
93
+ dataset_name= dataset_name,
94
+ )
95
+
96
+ load_info = pipeline.run(fetch_data(schema), table_name = table_name,
97
+ write_disposition = "replace")
98
+
99
+ print(load_info)
100
+ return dataset_name + "." + table_name
101
+
102
+ def load_pipeline(table_name):
103
+ _conn = duckdb.connect("duckdb_pipeline.duckdb")
104
+ return _conn.sql(f"SELECT * FROM {table_name} LIMIT 1000").df()
105
 
106
  def df_summary(df):
107
  summary = []
 
241
  # Main Function
242
  def main(table):
243
  schema = get_table_schema(table)
244
+
245
+ # Create dlt pipeline
246
+ table_name = create_pipeline(schema)
247
+
248
+ # Load dlt pipeline
249
+ df = load_pipeline(table_name)
250
+
251
+ # df = get_data_df(schema)
252
  df_statistics, df_alerts = statistics(df)
253
  describe_num, describe_cat = describe(df)
254
 
 
269
  def user_results(table, text_query):
270
 
271
  schema = get_table_schema(table)
272
+
273
+ # Create dlt pipeline
274
+ table_name = create_pipeline(schema)
275
+
276
+ # Load dlt pipeline
277
+ df = load_pipeline(table_name)
278
 
279
  messages = format_user_prompt(df=df, user_description=text_query)
280
  tests = run_llm(messages)
requirements.txt CHANGED
@@ -7,4 +7,5 @@ pandera==0.20.4
7
  ydata-profiling==v4.11.0
8
  langchain-core==0.3.12
9
  langchain-huggingface
10
- langchain==0.3.4
 
 
7
  ydata-profiling==v4.11.0
8
  langchain-core==0.3.12
9
  langchain-huggingface
10
+ langchain==0.3.4
11
+ dlt==1.3.0