Spaces:
Sleeping
Sleeping
File size: 5,301 Bytes
0af2a8e 82815c9 0af2a8e 82815c9 0af2a8e 82815c9 0af2a8e 82815c9 0af2a8e 82815c9 0af2a8e 82815c9 0af2a8e 82815c9 0af2a8e 82815c9 0af2a8e 82815c9 0af2a8e 82815c9 0af2a8e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
import ast
import base64
import duckdb
import json
import re
import textwrap
from ulid import ULID
HISTORY_FILE = "history.json"
class SQLError(Exception):
pass
class NotFoundError(Exception):
pass
class Q(str):
UNSAFE = ["CREATE", "DELETE", "DROP", "INSERT", "UPDATE"]
rows=None
def __new__(cls, template: str, **kwargs):
"""Create a new Q-string."""
_template = textwrap.dedent(template).strip()
try:
instance = str.__new__(cls, _template.format(**kwargs))
except KeyError:
instance = str.__new__(cls, _template)
instance.id = str(ULID())
instance.alias = kwargs.pop("alias") if kwargs.get("alias") else None
instance.template = _template
instance.kwargs = kwargs
instance.definitions = "\n".join([f"{k} = {repr(v)}" for k, v in kwargs.items()])
for attr in ("rows", "cols", "source_id", "start", "end"):
setattr(instance, attr, None)
return instance
def __repr__(self):
"""Neat repr for inspecting Q objects."""
strings = []
for k, v in self.__dict__.items():
value_repr = "\n" + textwrap.indent(v, " ") if "\n" in str(v) else v
strings.append(f"{k}: {value_repr}")
return "\n".join(strings)
def run(self, sql_engine=None, save=False, _raise=False):
self.start = ULID()
try:
if sql_engine is None:
res = self.run_duckdb()
else:
res = self.run_sql(sql_engine)
self.rows, self.cols = res.shape
return res
except Exception as e:
if _raise:
raise e
return str(e)
finally:
self.end = ULID()
if save:
self.save()
def run_duckdb(self):
return duckdb.sql(self)
def df(self, sql_engine=None, save=False, _raise=False):
res = self.run(sql_engine=sql_engine, save=save, _raise=_raise)
if not getattr(self, "rows", None):
return
else:
result_df = res.df()
result_df.q = self
return result_df
def save(self, file=HISTORY_FILE):
with open(file, "a") as f:
f.write(self.json)
f.write("\n")
@property
def json(self):
serialized = {"id": self.id, "q": self}
serialized.update(self.__dict__)
return json.dumps(serialized, default=lambda x: x.datetime.strftime("%F %T.%f")[:-3])
@property
def is_safe(self):
return not any(cmd in self.template.upper() for cmd in self.UNSAFE)
@classmethod
def from_dict(cls, query_dict: dict):
q = query_dict.pop("q")
return cls(q, **query_dict)
@classmethod
def from_template_and_definitions(cls, template: str, definitions: str, alias: str|None = None):
query_dict = {"q": template, "alias": alias}
query_dict.update(parse_definitions(definitions))
instance = Q.from_dict(query_dict)
instance.definitions = definitions
return instance
@classmethod
def from_history(cls, query_id=None, alias=None):
search_query = Q(f"""
SELECT id, template, kwargs
FROM '{HISTORY_FILE}'
WHERE id='{query_id}' OR alias='{alias}'
LIMIT 1
""")
query = search_query.run()
if search_query.rows == 1:
source_id, template, kwargs = query.fetchall()[0]
kwargs = {k: v for k, v in kwargs.items() if v is not None}
instance = cls(template, **kwargs)
instance.source_id = source_id
return instance
elif search_query.rows == 0:
raise NotFoundError(f"id '{query_id}' / alias '{alias}' not found")
else:
raise SQLError(query)
# @property
# def definitions(self):
# return "\n".join([""]+[f"{k} = {v}" for k, v in self.kwargs.items()])
@property
def base64(self):
return base64.b64encode(self.encode()).decode()
@classmethod
def from_base64(cls, b64):
"""Initializing from base64-encoded URL paths."""
return cls(base64.b64decode(b64).decode())
def parse_definitions(definitions) -> dict:
"""Parse a string literal of "key=value" pairs, one per line, into kwargs."""
kwargs = {}
lines = definitions.split("\n")
for _line in lines:
line = re.sub("\s+", "", _line)
if line == "" or line.startswith("#"):
continue
if "=" in line:
key, value = line.split("=", maxsplit=1)
kwargs[key] = ast.literal_eval(value)
return kwargs
EX1 = Q.from_template_and_definitions(
template="SELECT {x} AS {colname}",
definitions="\n".join([
"# Define variables: one '=' per line",
"x=42",
"colname='answer'",
]),
alias="example1",
)
EX2 = Q(
"""
SELECT
Symbol,
Number,
Mass,
Abundance
FROM '{url}'
""",
url="https://raw.githubusercontent.com/ekwan/cctk/master/cctk/data/isotopes.csv",
alias="example2",
)
EX3 = Q(
"""
SELECT /
FROM 'history.json'
ORDER BY id DESC
""",
alias="example3",
)
|