# Fixed deprecated GitHub workflow functions. Applied linter and formatter
# to code throughout. Added tests for GUI load.
# bafcf39
import csv
import datetime
from decimal import Decimal

import boto3

from tools.config import (
    AWS_REGION,
    OUTPUT_FOLDER,
    USAGE_LOG_DYNAMODB_TABLE_NAME,
)
# DynamoDB source table, region, and CSV destination, taken from project config.
TABLE_NAME = USAGE_LOG_DYNAMODB_TABLE_NAME
REGION = AWS_REGION
# NOTE(review): assumes OUTPUT_FOLDER ends with a path separator — confirm.
CSV_OUTPUT = OUTPUT_FOLDER + "dynamodb_logs_export.csv"

# Module-level DynamoDB handles, created once at import time.
dynamodb = boto3.resource("dynamodb", region_name=REGION)
table = dynamodb.Table(TABLE_NAME)
# Helper function to normalize DynamoDB values for CSV output.
def convert_types(item):
    """Return a copy of *item* with DynamoDB value types normalized for CSV.

    Conversions applied per top-level key:
      - ``Decimal`` -> ``int`` when the value is whole, else ``float``
        (DynamoDB returns every number as ``Decimal``).
      - ``str`` that parses as ISO 8601 -> ``"YYYY-MM-DD HH:MM:SS.mmm"``
        (millisecond precision); strings that fail to parse are kept as-is.
      - Everything else passes through unchanged. Nested containers are
        NOT recursed into.

    Args:
        item: a single DynamoDB item as a dict.

    Returns:
        dict: a new dict with converted values; *item* is not mutated.
    """
    new_item = {}
    for key, value in item.items():
        # Handle Decimals first
        if isinstance(value, Decimal):
            new_item[key] = int(value) if value % 1 == 0 else float(value)
        # Handle strings that might be dates
        elif isinstance(value, str):
            try:
                # fromisoformat() cannot parse a trailing 'Z' on older
                # Pythons, so map Zulu/UTC 'Z' to an explicit offset first.
                dt_obj = datetime.datetime.fromisoformat(value.replace("Z", "+00:00"))
                # Reformat; [:-3] truncates microseconds to milliseconds.
                new_item[key] = dt_obj.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
            except (ValueError, TypeError):
                # Not a datetime — keep the original string untouched.
                new_item[key] = value
        # Handle all other types
        else:
            new_item[key] = value
    return new_item
# Paginated scan
def scan_table(table_resource=None):
    """Scan the DynamoDB table, following pagination, and return all items.

    Args:
        table_resource: optional object exposing ``scan(**kwargs)``; defaults
            to the module-level ``table``. Parameterized (backward-compatibly)
            so the pagination logic can be exercised against a stub.

    Returns:
        list: every item from every page of the scan.
    """
    if table_resource is None:
        table_resource = table
    items = []
    response = table_resource.scan()
    items.extend(response["Items"])
    # DynamoDB includes LastEvaluatedKey while more pages remain.
    while "LastEvaluatedKey" in response:
        response = table_resource.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
        items.extend(response["Items"])
    return items
# Export to CSV
def export_to_csv(items, output_path, fields_to_drop=None):
    """Write *items* (a list of item dicts) to a CSV file at *output_path*.

    Args:
        items: list of item dicts (e.g. the result of ``scan_table``).
        output_path: destination CSV path.
        fields_to_drop: optional list of column names to exclude from the CSV.

    Prints a message and returns without writing when *items* is empty.
    """
    if not items:
        print("No items found.")
        return
    # Use a set for efficient lookup
    drop_set = set(fields_to_drop or [])
    # Union of keys across all items: DynamoDB items are sparse, so no
    # single item is guaranteed to carry every column.
    all_keys = set()
    for item in items:
        all_keys.update(item.keys())
    # Determine the final fieldnames by subtracting the ones to drop
    fieldnames = sorted(all_keys - drop_set)
    print("Final CSV columns will be:", fieldnames)
    # utf-8-sig writes a BOM so spreadsheet tools detect the encoding.
    with open(output_path, "w", newline="", encoding="utf-8-sig") as csvfile:
        # extrasaction='ignore' silently drops the excluded fields;
        # restval='' fills rows that are missing a column.
        writer = csv.DictWriter(
            csvfile, fieldnames=fieldnames, extrasaction="ignore", restval=""
        )
        writer.writeheader()
        for item in items:
            # convert_types may return extra keys; the writer ignores them.
            writer.writerow(convert_types(item))
    print(f"Exported {len(items)} items to {output_path}")
# Run export at import/execution time (this module is a script).
items = scan_table()
export_to_csv(items, CSV_OUTPUT, fields_to_drop=[])