|
"""
|
|
Helper class to simplify common read-only BigQuery tasks.
|
|
"""
|
|
|
|
|
|
import pandas as pd
|
|
import time
|
|
|
|
from google.cloud import bigquery
|
|
|
|
|
|
class BigQueryHelper(object):
    """
    Helper class to simplify common read-only BigQuery tasks like executing
    queries and showing table schemas, without worrying about table or
    dataset pointers.

    See the BigQuery docs for details of the steps this class lets you skip:
    https://googlecloudplatform.github.io/google-cloud-python/latest/bigquery/reference.html
    """

    def __init__(self, active_project, dataset_name, max_wait_seconds=180):
        """
        Args:
            active_project: name of the GCP project that owns the dataset.
            dataset_name: name of the BigQuery dataset to read from.
            max_wait_seconds: how long `query_to_pandas` polls for a query
                to finish before cancelling it.
        """
        self.project_name = active_project
        self.dataset_name = dataset_name
        self.max_wait_seconds = max_wait_seconds
        self.client = bigquery.Client()
        self.__dataset_ref = self.client.dataset(self.dataset_name, project=self.project_name)
        self.dataset = None        # fetched lazily by __fetch_dataset
        self.tables = {}           # cache: table_name -> Table, filled lazily
        self.__table_refs = {}     # cache: table_name -> TableReference
        self.total_gb_used_net_cache = 0  # running total of billed GB across queries
        self.BYTES_PER_GB = 2 ** 30

    def __fetch_dataset(self):
        """
        Lazy loading of dataset. For example,
        if the user only calls `self.query_to_pandas` then the
        dataset never has to be fetched.
        """
        if self.dataset is None:
            self.dataset = self.client.get_dataset(self.__dataset_ref)

    def __fetch_table(self, table_name):
        """Lazy loading of a table (and of the dataset containing it)."""
        self.__fetch_dataset()
        if table_name not in self.__table_refs:
            self.__table_refs[table_name] = self.dataset.table(table_name)
        if table_name not in self.tables:
            self.tables[table_name] = self.client.get_table(self.__table_refs[table_name])

    def __handle_record_field(self, row, schema_details, top_level_name=''):
        """
        Unpack a single schema row, recursing into nested RECORD fields.

        Args:
            row: dict-like with 'name', 'type', 'mode', optionally
                'description', and optionally 'fields' (a list for
                RECORD columns, NaN/absent for leaf columns).
            schema_details: list accumulator; one single-item row list is
                appended per field encountered.
            top_level_name: dotted prefix of the enclosing record;
                '' at the top level.
        """
        name = row['name']
        if top_level_name != '':
            name = top_level_name + '.' + name
        schema_details.append([{
            'name': name,
            'type': row['type'],
            'mode': row['mode'],
            # Placeholder so every row has the same columns; the 'fields'
            # column is dropped before the schema is returned.
            # (float('nan') replaces pd.np.nan, which was removed in pandas 2.0.)
            'fields': float('nan'),
            # to_api_repr() may omit 'description'; .get avoids a KeyError.
            'description': row.get('description'),
        }])

        # A missing or NaN 'fields' entry means this is a leaf column,
        # not a RECORD — nothing further to unpack.
        fields = row.get('fields', float('nan'))
        if isinstance(fields, float):
            return None
        for entry in fields:
            self.__handle_record_field(entry, schema_details, name)

    def __unpack_all_schema_fields(self, schema):
        """
        Unrolls nested schemas. Returns dataframe with one row per field,
        and the field names in the format accepted by the API.
        Results will look similar to the website schema, such as:
        https://bigquery.cloud.google.com/table/bigquery-public-data:github_repos.commits?pli=1

        Args:
            schema: DataFrame derived from api repr of raw table.schema
        Returns:
            Dataframe of the unrolled schema.
        """
        schema_details = []
        schema.apply(
            lambda row: self.__handle_record_field(row, schema_details), axis=1)
        result = pd.concat([pd.DataFrame.from_dict(x) for x in schema_details])
        result.reset_index(drop=True, inplace=True)
        del result['fields']
        return result

    def table_schema(self, table_name):
        """
        Get the schema for a specific table from a dataset.
        Unrolls nested field names into the format that can be copied
        directly into queries. For example, for the `github.commits` table,
        this will return `committer.name`.

        This is a very different return signature than BigQuery's table.schema.
        """
        self.__fetch_table(table_name)
        raw_schema = self.tables[table_name].schema
        schema = pd.DataFrame.from_dict([x.to_api_repr() for x in raw_schema])

        # A 'fields' column only appears when the table has RECORD columns.
        if 'fields' in schema.columns:
            schema = self.__unpack_all_schema_fields(schema)

        return schema[['name', 'type', 'mode', 'description']]

    def list_tables(self):
        """List the names of the tables in the dataset."""
        self.__fetch_dataset()
        return [x.table_id for x in self.client.list_tables(self.dataset)]

    def estimate_query_size(self, query):
        """
        Estimate gigabytes scanned by query.
        Does not consider if there is a cached query table.
        See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.dryRun
        """
        my_job_config = bigquery.job.QueryJobConfig()
        my_job_config.dry_run = True  # dry run: validates + estimates, scans nothing
        my_job = self.client.query(query, job_config=my_job_config)
        return my_job.total_bytes_processed / self.BYTES_PER_GB

    def query_to_pandas(self, query):
        """
        Execute a SQL query and return the result as a pandas DataFrame.

        Returns None (after cancelling the job) if the query does not
        finish within `self.max_wait_seconds`.
        """
        my_job = self.client.query(query)
        start_time = time.time()
        while not my_job.done():
            if (time.time() - start_time) > self.max_wait_seconds:
                print("Max wait time elapsed, query cancelled.")
                self.client.cancel_job(my_job.job_id)
                return None
            time.sleep(0.1)

        # total_bytes_billed is falsy when the result came entirely from cache.
        if my_job.total_bytes_billed:
            self.total_gb_used_net_cache += my_job.total_bytes_billed / self.BYTES_PER_GB
        return my_job.to_dataframe()

    def query_to_pandas_safe(self, query, max_gb_scanned=1):
        """
        Execute a query, but only if the query would scan less than `max_gb_scanned` of data.
        Returns None (with a message) when the dry-run estimate exceeds the limit.
        """
        query_size = self.estimate_query_size(query)
        if query_size <= max_gb_scanned:
            return self.query_to_pandas(query)
        msg = "Query cancelled; estimated size of {0} exceeds limit of {1} GB"
        print(msg.format(query_size, max_gb_scanned))

    def head(self, table_name, num_rows=5, start_index=None, selected_columns=None):
        """
        Get the first n rows of a table as a DataFrame.
        Does not perform a full table scan; should use a trivial amount of data as long as n is small.

        Args:
            table_name: table to read from.
            num_rows: maximum number of rows to return.
            start_index: zero-based row offset to start from (None = beginning).
            selected_columns: optional iterable of column names to restrict to.
        """
        self.__fetch_table(table_name)
        active_table = self.tables[table_name]
        schema_subset = None
        if selected_columns:
            schema_subset = [col for col in active_table.schema if col.name in selected_columns]
        results = self.client.list_rows(active_table, selected_fields=schema_subset,
                                        max_results=num_rows, start_index=start_index)
        results = list(results)
        if not results:
            # Empty table (or start_index past the end): avoid IndexError on results[0].
            return pd.DataFrame()
        return pd.DataFrame(
            data=[list(x.values()) for x in results], columns=list(results[0].keys()))
|
|
|