document_redaction / load_dynamo_logs.py
import csv
import datetime
from decimal import Decimal
import boto3
from tools.config import (
AWS_REGION,
OUTPUT_FOLDER,
USAGE_LOG_DYNAMODB_TABLE_NAME,
)

# Table name, region, and output location are taken from tools.config
TABLE_NAME = USAGE_LOG_DYNAMODB_TABLE_NAME
REGION = AWS_REGION
CSV_OUTPUT = OUTPUT_FOLDER + "dynamodb_logs_export.csv"

# Create the DynamoDB resource and table handle
dynamodb = boto3.resource("dynamodb", region_name=REGION)
table = dynamodb.Table(TABLE_NAME)


# Convert DynamoDB types for CSV output: Decimals become int/float, and
# ISO 8601 date strings are reformatted as "YYYY-MM-DD HH:MM:SS.mmm"
def convert_types(item: dict) -> dict:
new_item = {}
for key, value in item.items():
# Handle Decimals first
if isinstance(value, Decimal):
new_item[key] = int(value) if value % 1 == 0 else float(value)
# Handle Strings that might be dates
elif isinstance(value, str):
try:
# Attempt to parse a common ISO 8601 format.
# The .replace() handles the 'Z' for Zulu/UTC time.
dt_obj = datetime.datetime.fromisoformat(value.replace("Z", "+00:00"))
# Now that we have a datetime object, format it as desired
new_item[key] = dt_obj.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
except (ValueError, TypeError):
# If it fails to parse, it's just a regular string
new_item[key] = value
# Handle all other types
else:
new_item[key] = value
return new_item
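
# Illustration (hypothetical values): convert_types turns one raw DynamoDB item
# into CSV-friendly types, e.g.
#   convert_types({"count": Decimal("3"), "score": Decimal("0.5"),
#                  "timestamp": "2024-01-01T12:00:00Z"})
#   -> {"count": 3, "score": 0.5, "timestamp": "2024-01-01 12:00:00.000"}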


# Scan the entire table, following LastEvaluatedKey to page through results
def scan_table():
items = []
response = table.scan()
items.extend(response["Items"])
while "LastEvaluatedKey" in response:
response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
items.extend(response["Items"])
return items
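
# A minimal sketch (not called below) of the same pagination with a server-side
# filter, for exporting only a subset of rows. The attribute name "timestamp"
# is an assumption and may not match your table's schema:
#
# from boto3.dynamodb.conditions import Attr
#
# def scan_table_since(cutoff_iso: str):
#     kwargs = {"FilterExpression": Attr("timestamp").gte(cutoff_iso)}
#     items = []
#     response = table.scan(**kwargs)
#     items.extend(response["Items"])
#     while "LastEvaluatedKey" in response:
#         kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
#         response = table.scan(**kwargs)
#         items.extend(response["Items"])
#     return items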


# Export items to CSV, optionally dropping the named fields
def export_to_csv(items, output_path, fields_to_drop: list | None = None):
if not items:
print("No items found.")
return
# Use a set for efficient lookup
drop_set = set(fields_to_drop or [])
# Get a comprehensive list of all possible headers from all items
all_keys = set()
for item in items:
all_keys.update(item.keys())
# Determine the final fieldnames by subtracting the ones to drop
    fieldnames = sorted(all_keys - drop_set)
print("Final CSV columns will be:", fieldnames)
with open(output_path, "w", newline="", encoding="utf-8-sig") as csvfile:
        # extrasaction="ignore" skips keys not listed in fieldnames;
        # restval="" fills in rows that are missing a key
writer = csv.DictWriter(
csvfile, fieldnames=fieldnames, extrasaction="ignore", restval=""
)
writer.writeheader()
for item in items:
            # convert_types may return extra fields; the writer ignores them
writer.writerow(convert_types(item))
print(f"Exported {len(items)} items to {output_path}")


# Run export when executed as a script
if __name__ == "__main__":
    items = scan_table()
    export_to_csv(items, CSV_OUTPUT, fields_to_drop=[])