# document_redaction / load_dynamo_logs.py
import csv
import datetime
import os
from decimal import Decimal

import boto3

from tools.config import AWS_REGION, ACCESS_LOG_DYNAMODB_TABLE_NAME, FEEDBACK_LOG_DYNAMODB_TABLE_NAME, USAGE_LOG_DYNAMODB_TABLE_NAME, OUTPUT_FOLDER

# Replace with your actual table name and region
TABLE_NAME = USAGE_LOG_DYNAMODB_TABLE_NAME  # Choose the access, feedback, or usage log table as appropriate
REGION = AWS_REGION
CSV_OUTPUT = os.path.join(OUTPUT_FOLDER, 'dynamodb_logs_export.csv')

# Create DynamoDB resource
dynamodb = boto3.resource('dynamodb', region_name=REGION)
table = dynamodb.Table(TABLE_NAME)
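
# Note: no credentials are passed explicitly above; boto3 resolves them from the
# environment, the shared AWS config/credentials files, or an attached IAM role.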

# Helper function to convert Decimal values to int/float and normalise date strings
def convert_types(item):
    new_item = {}
    for key, value in item.items():
        # Handle Decimals first
        if isinstance(value, Decimal):
            new_item[key] = int(value) if value % 1 == 0 else float(value)
        # Handle strings that might be dates
        elif isinstance(value, str):
            try:
                # Attempt to parse a common ISO 8601 format.
                # The .replace() handles the 'Z' suffix for Zulu/UTC time.
                dt_obj = datetime.datetime.fromisoformat(value.replace('Z', '+00:00'))
                # Format the datetime, truncating microseconds to milliseconds
                new_item[key] = dt_obj.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
            except (ValueError, TypeError):
                # If it fails to parse, it's just a regular string
                new_item[key] = value
        # Handle all other types
        else:
            new_item[key] = value
    return new_item
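
# Example (illustrative values only, not the real log schema): a raw item such as
#   {'id': Decimal('7'), 'score': Decimal('0.5'), 'created': '2024-01-01T12:00:00Z'}
# comes back from convert_types as
#   {'id': 7, 'score': 0.5, 'created': '2024-01-01 12:00:00.000'}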

# Paginated scan: a single Scan call returns at most 1 MB of data, so follow
# LastEvaluatedKey until the whole table has been read
def scan_table():
    items = []
    response = table.scan()
    items.extend(response['Items'])
    while 'LastEvaluatedKey' in response:
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
        items.extend(response['Items'])
    return items
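
# A minimal sketch of the same loop fetching only selected attributes via a
# ProjectionExpression (the attribute names here are hypothetical, not from the
# real log schema). This shrinks the data transferred, though Scan still consumes
# read capacity for every item examined:
#
#     response = table.scan(
#         ProjectionExpression='#u, #t',
#         ExpressionAttributeNames={'#u': 'user', '#t': 'timestamp'},
#     )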

# Export to CSV
def export_to_csv(items, output_path, fields_to_drop: list = None):
    if not items:
        print("No items found.")
        return
    # Use a set for efficient lookup
    drop_set = set(fields_to_drop or [])
    # Gather a comprehensive set of all possible headers across all items
    all_keys = set()
    for item in items:
        all_keys.update(item.keys())
    # Determine the final fieldnames by subtracting the ones to drop
    fieldnames = sorted(all_keys - drop_set)
    print("Final CSV columns will be:", fieldnames)
    with open(output_path, 'w', newline='', encoding='utf-8-sig') as csvfile:
        # extrasaction='ignore' silently skips any dropped fields;
        # restval='' fills in rows that are missing a key
        writer = csv.DictWriter(
            csvfile,
            fieldnames=fieldnames,
            extrasaction='ignore',
            restval=''
        )
        writer.writeheader()
        for item in items:
            # convert_types can return the full dict; the writer
            # simply ignores the dropped fields
            writer.writerow(convert_types(item))
    print(f"Exported {len(items)} items to {output_path}")

# Run export
if __name__ == '__main__':
    items = scan_table()
    export_to_csv(items, CSV_OUTPUT, fields_to_drop=[])