|
import boto3 |
|
import csv |
|
from decimal import Decimal |
|
import datetime |
|
from boto3.dynamodb.conditions import Key |
|
|
|
from tools.config import AWS_REGION, ACCESS_LOG_DYNAMODB_TABLE_NAME, FEEDBACK_LOG_DYNAMODB_TABLE_NAME, USAGE_LOG_DYNAMODB_TABLE_NAME, OUTPUT_FOLDER |
|
|
|
|
|
TABLE_NAME = USAGE_LOG_DYNAMODB_TABLE_NAME |
|
REGION = AWS_REGION |
|
CSV_OUTPUT = OUTPUT_FOLDER + 'dynamodb_logs_export.csv' |
|
|
|
|
|
dynamodb = boto3.resource('dynamodb', region_name=REGION) |
|
table = dynamodb.Table(TABLE_NAME) |
|
|
|
|
|
def convert_types(item): |
|
new_item = {} |
|
for key, value in item.items(): |
|
|
|
if isinstance(value, Decimal): |
|
new_item[key] = int(value) if value % 1 == 0 else float(value) |
|
|
|
elif isinstance(value, str): |
|
try: |
|
|
|
|
|
dt_obj = datetime.datetime.fromisoformat(value.replace('Z', '+00:00')) |
|
|
|
new_item[key] = dt_obj.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] |
|
except (ValueError, TypeError): |
|
|
|
new_item[key] = value |
|
|
|
else: |
|
new_item[key] = value |
|
return new_item |
|
|
|
|
|
def scan_table(): |
|
items = [] |
|
response = table.scan() |
|
items.extend(response['Items']) |
|
|
|
while 'LastEvaluatedKey' in response: |
|
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey']) |
|
items.extend(response['Items']) |
|
|
|
return items |
|
|
|
|
|
|
|
def export_to_csv(items, output_path, fields_to_drop: list = None): |
|
if not items: |
|
print("No items found.") |
|
return |
|
|
|
|
|
drop_set = set(fields_to_drop or []) |
|
|
|
|
|
all_keys = set() |
|
for item in items: |
|
all_keys.update(item.keys()) |
|
|
|
|
|
fieldnames = sorted(list(all_keys - drop_set)) |
|
|
|
print("Final CSV columns will be:", fieldnames) |
|
|
|
with open(output_path, 'w', newline='', encoding='utf-8-sig') as csvfile: |
|
|
|
|
|
writer = csv.DictWriter( |
|
csvfile, |
|
fieldnames=fieldnames, |
|
extrasaction='ignore', |
|
restval='' |
|
) |
|
writer.writeheader() |
|
|
|
for item in items: |
|
|
|
|
|
writer.writerow(convert_types(item)) |
|
|
|
print(f"Exported {len(items)} items to {output_path}") |
|
|
|
|
|
items = scan_table() |
|
export_to_csv(items, CSV_OUTPUT, fields_to_drop=[]) |