import boto3 from botocore.exceptions import ClientError import json import os import pandas as pd import ipaddress from constructs import Construct from dotenv import set_key from typing import List, Tuple, Optional, Dict, Any from aws_cdk import ( App, CfnTag, aws_ec2 as ec2, aws_wafv2 as wafv2, aws_elasticloadbalancingv2 as elb, aws_elasticloadbalancingv2_actions as elb_act, aws_certificatemanager as acm, # You might need this if you were looking up a cert, but not strictly for ARN aws_cognito as cognito, aws_iam as iam, CfnOutput, Tags ) from cdk_config import PUBLIC_SUBNETS_TO_USE, PRIVATE_SUBNETS_TO_USE, PUBLIC_SUBNET_CIDR_BLOCKS, PRIVATE_SUBNET_CIDR_BLOCKS, PUBLIC_SUBNET_AVAILABILITY_ZONES, PRIVATE_SUBNET_AVAILABILITY_ZONES, POLICY_FILE_LOCATIONS, NAT_GATEWAY_EIP_NAME, S3_LOG_CONFIG_BUCKET_NAME, S3_OUTPUT_BUCKET_NAME, ACCESS_LOG_DYNAMODB_TABLE_NAME, FEEDBACK_LOG_DYNAMODB_TABLE_NAME, USAGE_LOG_DYNAMODB_TABLE_NAME, AWS_REGION # --- Function to load context from file --- def load_context_from_file(app: App, file_path: str): if os.path.exists(file_path): with open(file_path, 'r') as f: context_data = json.load(f) for key, value in context_data.items(): app.node.set_context(key, value) print(f"Loaded context from {file_path}") else: print(f"Context file not found: {file_path}") # --- Helper to parse environment variables into lists --- def _get_env_list(env_var_name: str) -> List[str]: """Parses a comma-separated environment variable into a list of strings.""" value = env_var_name[1:-1].strip().replace('\"', '').replace("\'","") if not value: return [] # Split by comma and filter out any empty strings that might result from extra commas return [s.strip() for s in value.split(',') if s.strip()] # 1. Try to load CIDR/AZs from environment variables if PUBLIC_SUBNETS_TO_USE: PUBLIC_SUBNETS_TO_USE = _get_env_list(PUBLIC_SUBNETS_TO_USE) if PRIVATE_SUBNETS_TO_USE: PRIVATE_SUBNETS_TO_USE = _get_env_list(PRIVATE_SUBNETS_TO_USE) if PUBLIC_SUBNET_CIDR_BLOCKS: PUBLIC_SUBNET_CIDR_BLOCKS = _get_env_list("PUBLIC_SUBNET_CIDR_BLOCKS") if PUBLIC_SUBNET_AVAILABILITY_ZONES: PUBLIC_SUBNET_AVAILABILITY_ZONES = _get_env_list("PUBLIC_SUBNET_AVAILABILITY_ZONES") if PRIVATE_SUBNET_CIDR_BLOCKS: PRIVATE_SUBNET_CIDR_BLOCKS = _get_env_list("PRIVATE_SUBNET_CIDR_BLOCKS") if PRIVATE_SUBNET_AVAILABILITY_ZONES: PRIVATE_SUBNET_AVAILABILITY_ZONES = _get_env_list("PRIVATE_SUBNET_AVAILABILITY_ZONES") if POLICY_FILE_LOCATIONS: POLICY_FILE_LOCATIONS = _get_env_list(POLICY_FILE_LOCATIONS) def check_for_existing_role(role_name:str): try: iam = boto3.client('iam') #iam.get_role(RoleName=role_name) response = iam.get_role(RoleName=role_name) role = response['Role']['Arn'] print("Response Role:", role) return True, role, "" except iam.exceptions.NoSuchEntityException: return False, "", "" except Exception as e: raise Exception("Getting information on IAM role failed due to:", e) import json from typing import List, Dict, Any, Union, Optional from aws_cdk import ( aws_iam as iam, ) from constructs import Construct # Assume POLICY_FILE_LOCATIONS is defined globally or passed as a default # For example: # POLICY_FILE_LOCATIONS = ["./policies/my_read_policy.json", "./policies/my_write_policy.json"] def add_statement_to_policy(role: iam.IRole, policy_document: Dict[str, Any]): """ Adds individual policy statements from a parsed policy document to a CDK Role. Args: role: The CDK Role construct to attach policies to. policy_document: A Python dictionary representing an IAM policy document. """ # Ensure the loaded JSON is a valid policy document structure if 'Statement' not in policy_document or not isinstance(policy_document['Statement'], list): print(f"Warning: Policy document does not contain a 'Statement' list. Skipping.") return # Do not return role, just log and exit for statement_dict in policy_document['Statement']: try: # Create a CDK PolicyStatement from the dictionary cdk_policy_statement = iam.PolicyStatement.from_json(statement_dict) # Add the policy statement to the role role.add_to_policy(cdk_policy_statement) print(f" - Added statement: {statement_dict.get('Sid', 'No Sid')}") except Exception as e: print(f"Warning: Could not process policy statement: {statement_dict}. Error: {e}") def add_custom_policies( scope: Construct, # Not strictly used here, but good practice if you expand to ManagedPolicies role: iam.IRole, policy_file_locations: Optional[List[str]] = None, custom_policy_text: Optional[str] = None ) -> iam.IRole: """ Loads custom policies from JSON files or a string and attaches them to a CDK Role. Args: scope: The scope in which to define constructs (if needed, e.g., for iam.ManagedPolicy). role: The CDK Role construct to attach policies to. policy_file_locations: List of file paths to JSON policy documents. custom_policy_text: A JSON string representing a policy document. Returns: The modified CDK Role construct. """ if policy_file_locations is None: policy_file_locations = [] current_source = "unknown source" # For error messages try: if policy_file_locations: print(f"Attempting to add policies from files to role {role.node.id}...") for path in policy_file_locations: current_source = f"file: {path}" try: with open(path, 'r') as f: policy_document = json.load(f) print(f"Processing policy from {current_source}...") add_statement_to_policy(role, policy_document) except FileNotFoundError: print(f"Warning: Policy file not found at {path}. Skipping.") except json.JSONDecodeError as e: print(f"Warning: Invalid JSON in policy file {path}: {e}. Skipping.") except Exception as e: print(f"An unexpected error occurred processing policy from {path}: {e}. Skipping.") if custom_policy_text: current_source = "custom policy text string" print(f"Attempting to add policy from custom text to role {role.node.id}...") try: # *** FIX: Parse the JSON string into a Python dictionary *** policy_document = json.loads(custom_policy_text) print(f"Processing policy from {current_source}...") add_statement_to_policy(role, policy_document) except json.JSONDecodeError as e: print(f"Warning: Invalid JSON in custom_policy_text: {e}. Skipping.") except Exception as e: print(f"An unexpected error occurred processing policy from custom_policy_text: {e}. Skipping.") # You might want a final success message, but individual processing messages are also good. print(f"Finished processing custom policies for role {role.node.id}.") except Exception as e: print(f"An unhandled error occurred during policy addition for {current_source}: {e}") return role # Import the S3 Bucket class if you intend to return a CDK object later # from aws_cdk import aws_s3 as s3 def check_s3_bucket_exists(bucket_name: str): # Return type hint depends on what you return """ Checks if an S3 bucket with the given name exists and is accessible. Args: bucket_name: The name of the S3 bucket to check. Returns: A tuple: (bool indicating existence, optional S3 Bucket object or None) Note: Returning a Boto3 S3 Bucket object from here is NOT ideal for direct use in CDK. You'll likely only need the boolean result or the bucket name for CDK lookups/creations. For this example, let's return the boolean and the name. """ s3_client = boto3.client('s3') try: # Use head_bucket to check for existence and access s3_client.head_bucket(Bucket=bucket_name) print(f"Bucket '{bucket_name}' exists and is accessible.") return True, bucket_name # Return True and the bucket name except ClientError as e: # If a ClientError occurs, check the error code. # '404' means the bucket does not exist. # '403' means the bucket exists but you don't have permission. error_code = e.response['Error']['Code'] if error_code == '404': print(f"Bucket '{bucket_name}' does not exist.") return False, None elif error_code == '403': # The bucket exists, but you can't access it. # Depending on your requirements, this might be treated as "exists" # or "not accessible for our purpose". For checking existence, # we'll say it exists here, but note the permission issue. # NOTE - when I tested this, it was returning 403 even for buckets that don't exist. So I will return False instead print(f"Bucket '{bucket_name}' returned 403, which indicates it may exist but is not accessible due to permissions, or that it doesn't exist. Returning False for existence just in case.") return False, bucket_name # It exists, even if not accessible else: # For other errors, it's better to raise the exception # to indicate something unexpected happened. print(f"An unexpected AWS ClientError occurred checking bucket '{bucket_name}': {e}") # Decide how to handle other errors - raising might be safer raise # Re-raise the original exception except Exception as e: print(f"An unexpected non-ClientError occurred checking bucket '{bucket_name}': {e}") # Decide how to handle other errors raise # Re-raise the original exception # Example usage in your check_resources.py: # exists, bucket_name_if_exists = check_s3_bucket_exists(log_bucket_name) # context_data[f"exists:{log_bucket_name}"] = exists # # You don't necessarily need to store the name in context if using from_bucket_name # Delete an S3 bucket def delete_s3_bucket(bucket_name:str): s3 = boto3.client('s3') try: # List and delete all objects response = s3.list_object_versions(Bucket=bucket_name) versions = response.get('Versions', []) + response.get('DeleteMarkers', []) for version in versions: s3.delete_object(Bucket=bucket_name, Key=version['Key'], VersionId=version['VersionId']) # Delete the bucket s3.delete_bucket(Bucket=bucket_name) return {'Status': 'SUCCESS'} except Exception as e: return {'Status': 'FAILED', 'Reason': str(e)} # Function to get subnet ID from subnet name def get_subnet_id(vpc:str, ec2_client:str, subnet_name:str): response = ec2_client.describe_subnets(Filters=[{'Name': 'vpc-id', 'Values': [vpc.vpc_id]}]) for subnet in response['Subnets']: if subnet['Tags'] and any(tag['Key'] == 'Name' and tag['Value'] == subnet_name for tag in subnet['Tags']): return subnet['SubnetId'] return None def check_ecr_repo_exists(repo_name: str) -> tuple[bool, dict]: """ Checks if an ECR repository with the given name exists. Args: repo_name: The name of the ECR repository to check. Returns: True if the repository exists, False otherwise. """ ecr_client = boto3.client('ecr') try: print("ecr repo_name to check:", repo_name) response = ecr_client.describe_repositories(repositoryNames=[repo_name]) # If describe_repositories succeeds and returns a list of repositories, # and the list is not empty, the repository exists. return len(response['repositories']) > 0, response['repositories'][0] except ClientError as e: # Check for the specific error code indicating the repository doesn't exist if e.response['Error']['Code'] == 'RepositoryNotFoundException': return False, {} else: # Re-raise other exceptions to handle unexpected errors raise except Exception as e: print(f"An unexpected error occurred: {e}") return False, {} def check_codebuild_project_exists(project_name: str): # Adjust return type hint as needed """ Checks if a CodeBuild project with the given name exists. Args: project_name: The name of the CodeBuild project to check. Returns: A tuple: - The first element is True if the project exists, False otherwise. - The second element is the project object (dictionary) if found, None otherwise. """ codebuild_client = boto3.client('codebuild') try: # Use batch_get_projects with a list containing the single project name response = codebuild_client.batch_get_projects(names=[project_name]) # The response for batch_get_projects includes 'projects' (found) # and 'projectsNotFound' (not found). if response['projects']: # If the project is found in the 'projects' list print(f"CodeBuild project '{project_name}' found.") return True, response['projects'][0]['arn'] # Return True and the project details dict elif response['projectsNotFound'] and project_name in response['projectsNotFound']: # If the project name is explicitly in the 'projectsNotFound' list print(f"CodeBuild project '{project_name}' not found.") return False, None else: # This case is less expected for a single name lookup, # but could happen if there's an internal issue or the response # structure is slightly different than expected for an error. # It's safer to assume it wasn't found if not in 'projects'. print(f"CodeBuild project '{project_name}' not found (not in 'projects' list).") return False, None except ClientError as e: # Catch specific ClientErrors. batch_get_projects might not throw # 'InvalidInputException' for a non-existent project name if the # name format is valid. It typically just lists it in projectsNotFound. # However, other ClientErrors are possible (e.g., permissions). print(f"An AWS ClientError occurred checking CodeBuild project '{project_name}': {e}") # Decide how to handle other ClientErrors - raising might be safer raise # Re-raise the original exception except Exception as e: print(f"An unexpected non-ClientError occurred checking CodeBuild project '{project_name}': {e}") # Decide how to handle other errors raise # Re-raise the original exception def get_vpc_id_by_name(vpc_name: str) -> Optional[str]: """ Finds a VPC ID by its 'Name' tag. """ ec2_client = boto3.client('ec2') try: response = ec2_client.describe_vpcs( Filters=[ {'Name': 'tag:Name', 'Values': [vpc_name]} ] ) if response and response['Vpcs']: vpc_id = response['Vpcs'][0]['VpcId'] print(f"VPC '{vpc_name}' found with ID: {vpc_id}") # In get_vpc_id_by_name, after finding VPC ID: # Look for NAT Gateways in this VPC ec2_client = boto3.client('ec2') nat_gateways = [] try: response = ec2_client.describe_nat_gateways( Filters=[ {'Name': 'vpc-id', 'Values': [vpc_id]}, # Optional: Add a tag filter if you consistently tag your NATs # {'Name': 'tag:Name', 'Values': [f"{prefix}-nat-gateway"]} ] ) nat_gateways = response.get('NatGateways', []) except Exception as e: print(f"Warning: Could not describe NAT Gateways in VPC '{vpc_id}': {e}") # Decide how to handle this error - proceed or raise? # Decide how to identify the specific NAT Gateway you want to check for. return vpc_id, nat_gateways else: print(f"VPC '{vpc_name}' not found.") return None except Exception as e: print(f"An unexpected error occurred finding VPC '{vpc_name}': {e}") raise # --- Helper to fetch all existing subnets in a VPC once --- def _get_existing_subnets_in_vpc(vpc_id: str) -> Dict[str, Any]: """ Fetches all subnets in a given VPC. Returns a dictionary with 'by_name' (map of name to subnet data), 'by_id' (map of id to subnet data), and 'cidr_networks' (list of ipaddress.IPv4Network). """ ec2_client = boto3.client('ec2') existing_subnets_data = { "by_name": {}, # {subnet_name: {'id': 'subnet-id', 'cidr': 'x.x.x.x/x'}} "by_id": {}, # {subnet_id: {'name': 'subnet-name', 'cidr': 'x.x.x.x/x'}} "cidr_networks": [] # List of ipaddress.IPv4Network objects } try: response = ec2_client.describe_subnets(Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]) for s in response.get('Subnets', []): subnet_id = s['SubnetId'] cidr_block = s.get('CidrBlock') # Extract 'Name' tag, which is crucial for lookup by name name_tag = next((tag['Value'] for tag in s.get('Tags', []) if tag['Key'] == 'Name'), None) subnet_info = {'id': subnet_id, 'cidr': cidr_block, 'name': name_tag} if name_tag: existing_subnets_data["by_name"][name_tag] = subnet_info existing_subnets_data["by_id"][subnet_id] = subnet_info if cidr_block: try: existing_subnets_data["cidr_networks"].append(ipaddress.ip_network(cidr_block, strict=False)) except ValueError: print(f"Warning: Existing subnet {subnet_id} has an invalid CIDR: {cidr_block}. Skipping for overlap check.") print(f"Fetched {len(response.get('Subnets', []))} existing subnets from VPC '{vpc_id}'.") except Exception as e: print(f"Error describing existing subnets in VPC '{vpc_id}': {e}. Cannot perform full validation.") raise # Re-raise if this essential step fails return existing_subnets_data # --- Modified validate_subnet_creation_parameters to take pre-fetched data --- def validate_subnet_creation_parameters( vpc_id: str, proposed_subnets_data: List[Dict[str, str]], # e.g., [{'name': 'my-public-subnet', 'cidr': '10.0.0.0/24', 'az': 'us-east-1a'}] existing_aws_subnets_data: Dict[str, Any] # Pre-fetched data from _get_existing_subnets_in_vpc ) -> None: """ Validates proposed subnet names and CIDR blocks against existing AWS subnets in the specified VPC and against each other. This function uses pre-fetched AWS subnet data. Args: vpc_id: The ID of the VPC (for logging/error messages). proposed_subnets_data: A list of dictionaries, where each dict represents a proposed subnet with 'name', 'cidr', and 'az'. existing_aws_subnets_data: Dictionary containing existing AWS subnet data (e.g., from _get_existing_subnets_in_vpc). Raises: ValueError: If any proposed subnet name or CIDR block conflicts with existing AWS resources or other proposed resources. """ if not proposed_subnets_data: print("No proposed subnet data provided for validation. Skipping.") return print(f"--- Starting pre-synth validation for VPC '{vpc_id}' with proposed subnets ---") print("Existing subnet data:", pd.DataFrame(existing_aws_subnets_data['by_name'])) existing_aws_subnet_names = set(existing_aws_subnets_data["by_name"].keys()) existing_aws_cidr_networks = existing_aws_subnets_data["cidr_networks"] # Sets to track names and list to track networks for internal batch consistency proposed_names_seen: set[str] = set() proposed_cidr_networks_seen: List[ipaddress.IPv4Network] = [] for i, proposed_subnet in enumerate(proposed_subnets_data): subnet_name = proposed_subnet.get('name') cidr_block_str = proposed_subnet.get('cidr') availability_zone = proposed_subnet.get('az') if not all([subnet_name, cidr_block_str, availability_zone]): raise ValueError(f"Proposed subnet at index {i} is incomplete. Requires 'name', 'cidr', and 'az'.") # 1. Check for duplicate names within the proposed batch if subnet_name in proposed_names_seen: raise ValueError(f"Proposed subnet name '{subnet_name}' is duplicated within the input list.") proposed_names_seen.add(subnet_name) # 2. Check for duplicate names against existing AWS subnets if subnet_name in existing_aws_subnet_names: print(f"Proposed subnet name '{subnet_name}' already exists in VPC '{vpc_id}'.") # Parse proposed CIDR try: proposed_net = ipaddress.ip_network(cidr_block_str, strict=False) except ValueError as e: raise ValueError(f"Invalid CIDR format '{cidr_block_str}' for proposed subnet '{subnet_name}': {e}") # 3. Check for overlapping CIDRs within the proposed batch for existing_proposed_net in proposed_cidr_networks_seen: if proposed_net.overlaps(existing_proposed_net): raise ValueError( f"Proposed CIDR '{cidr_block_str}' for subnet '{subnet_name}' " f"overlaps with another proposed CIDR '{str(existing_proposed_net)}' " f"within the same batch." ) # 4. Check for overlapping CIDRs against existing AWS subnets for existing_aws_net in existing_aws_cidr_networks: if proposed_net.overlaps(existing_aws_net): raise ValueError( f"Proposed CIDR '{cidr_block_str}' for subnet '{subnet_name}' " f"overlaps with an existing AWS subnet CIDR '{str(existing_aws_net)}' " f"in VPC '{vpc_id}'." ) # If all checks pass for this subnet, add its network to the list for subsequent checks proposed_cidr_networks_seen.append(proposed_net) print(f"Validation successful for proposed subnet '{subnet_name}' with CIDR '{cidr_block_str}'.") print(f"--- All proposed subnets passed pre-synth validation checks for VPC '{vpc_id}'. ---") # --- Modified check_subnet_exists_by_name (Uses pre-fetched data) --- def check_subnet_exists_by_name( subnet_name: str, existing_aws_subnets_data: Dict[str, Any] ) -> Tuple[bool, Optional[str]]: """ Checks if a subnet with the given name exists within the pre-fetched data. Args: subnet_name: The 'Name' tag value of the subnet to check. existing_aws_subnets_data: Dictionary containing existing AWS subnet data (e.g., from _get_existing_subnets_in_vpc). Returns: A tuple: - The first element is True if the subnet exists, False otherwise. - The second element is the Subnet ID if found, None otherwise. """ subnet_info = existing_aws_subnets_data["by_name"].get(subnet_name) if subnet_info: print(f"Subnet '{subnet_name}' found with ID: {subnet_info['id']}") return True, subnet_info['id'] else: print(f"Subnet '{subnet_name}' not found.") return False, None def create_nat_gateway( scope: Construct, public_subnet_for_nat: ec2.ISubnet, # Expects a proper ISubnet nat_gateway_name: str, nat_gateway_id_context_key: str ) -> str: """ Creates a single NAT Gateway in the specified public subnet. It does not handle lookup from context; the calling stack should do that. Returns the CloudFormation Ref of the NAT Gateway ID. """ print(f"Defining a new NAT Gateway '{nat_gateway_name}' in subnet '{public_subnet_for_nat.subnet_id}'.") # Create an Elastic IP for the NAT Gateway eip = ec2.CfnEIP(scope, NAT_GATEWAY_EIP_NAME, tags=[CfnTag(key="Name", value=NAT_GATEWAY_EIP_NAME)] ) # Create the NAT Gateway nat_gateway_logical_id = nat_gateway_name.replace('-', '') + "NatGateway" nat_gateway = ec2.CfnNatGateway(scope, nat_gateway_logical_id, subnet_id=public_subnet_for_nat.subnet_id, # Associate with the public subnet allocation_id=eip.attr_allocation_id, # Associate with the EIP tags=[CfnTag(key="Name", value=nat_gateway_name)] ) # The NAT GW depends on the EIP. The dependency on the subnet is implicit via subnet_id. nat_gateway.add_dependency(eip) # *** CRUCIAL: Use CfnOutput to export the ID after deployment *** # This is how you will get the ID to put into cdk.context.json CfnOutput(scope, "SingleNatGatewayIdOutput", value=nat_gateway.ref, description=f"Physical ID of the Single NAT Gateway. Add this to cdk.context.json under the key '{nat_gateway_id_context_key}'.", export_name=f"{scope.stack_name}-NatGatewayId" # Make export name unique ) print(f"CDK: Defined new NAT Gateway '{nat_gateway.ref}'. Its physical ID will be available in the stack outputs after deployment.") # Return the tokenised reference for use within this synthesis return nat_gateway.ref def create_subnets( scope: Construct, vpc: ec2.IVpc, prefix: str, subnet_names: List[str], cidr_blocks: List[str], availability_zones: List[str], is_public: bool, internet_gateway_id: Optional[str] = None, single_nat_gateway_id: Optional[str] = None ) -> Tuple[List[ec2.CfnSubnet], List[ec2.CfnRouteTable]]: """ Creates subnets using L2 constructs but returns the underlying L1 Cfn objects for backward compatibility. """ # --- Validations remain the same --- if not (len(subnet_names) == len(cidr_blocks) == len(availability_zones) > 0): raise ValueError("Subnet names, CIDR blocks, and Availability Zones lists must be non-empty and match in length.") if is_public and not internet_gateway_id: raise ValueError("internet_gateway_id must be provided for public subnets.") if not is_public and not single_nat_gateway_id: raise ValueError("single_nat_gateway_id must be provided for private subnets when using a single NAT Gateway.") # --- We will populate these lists with the L1 objects to return --- created_subnets: List[ec2.CfnSubnet] = [] created_route_tables: List[ec2.CfnRouteTable] = [] subnet_type_tag = "public" if is_public else "private" for i, subnet_name in enumerate(subnet_names): logical_id = f"{prefix}{subnet_type_tag.capitalize()}Subnet{i+1}" # 1. Create the L2 Subnet (this is the easy part) subnet = ec2.Subnet( scope, logical_id, vpc_id=vpc.vpc_id, cidr_block=cidr_blocks[i], availability_zone=availability_zones[i], map_public_ip_on_launch=is_public ) Tags.of(subnet).add("Name", subnet_name) Tags.of(subnet).add("Type", subnet_type_tag) if is_public: # The subnet's route_table is automatically created by the L2 Subnet construct try: subnet.add_route( "DefaultInternetRoute", # A logical ID for the CfnRoute resource router_id=internet_gateway_id, router_type=ec2.RouterType.GATEWAY, # destination_cidr_block="0.0.0.0/0" is the default for this method ) except Exception as e: print("Could not create IGW route for public subnet due to:", e) print(f"CDK: Defined public L2 subnet '{subnet_name}' and added IGW route.") else: try: # Using .add_route() for private subnets as well for consistency subnet.add_route( "DefaultNatRoute", # A logical ID for the CfnRoute resource router_id=single_nat_gateway_id, router_type=ec2.RouterType.NAT_GATEWAY, ) except Exception as e: print("Could not create NAT gateway route for public subnet due to:", e) print(f"CDK: Defined private L2 subnet '{subnet_name}' and added NAT GW route.") route_table = subnet.route_table created_subnets.append(subnet) created_route_tables.append(route_table) return created_subnets, created_route_tables def ingress_rule_exists(security_group:str, peer:str, port:str): for rule in security_group.connections.security_groups: if port: if rule.peer == peer and rule.connection == port: return True else: if rule.peer == peer: return True return False def check_for_existing_user_pool(user_pool_name:str): cognito_client = boto3.client("cognito-idp") list_pools_response = cognito_client.list_user_pools(MaxResults=60) # MaxResults up to 60 # ListUserPools might require pagination if you have more than 60 pools # This simple example doesn't handle pagination, which could miss your pool existing_user_pool_id = "" for pool in list_pools_response.get('UserPools', []): if pool.get('Name') == user_pool_name: existing_user_pool_id = pool['Id'] print(f"Found existing user pool by name '{user_pool_name}' with ID: {existing_user_pool_id}") break # Found the one we're looking for if existing_user_pool_id: return True, existing_user_pool_id, pool else: return False, "", "" def check_for_existing_user_pool_client(user_pool_id: str, user_pool_client_name: str): """ Checks if a Cognito User Pool Client with the given name exists in the specified User Pool. Args: user_pool_id: The ID of the Cognito User Pool. user_pool_client_name: The name of the User Pool Client to check for. Returns: A tuple: - True, client_id, client_details if the client exists. - False, "", {} otherwise. """ cognito_client = boto3.client("cognito-idp") next_token = 'string' while True: try: response = cognito_client.list_user_pool_clients( UserPoolId=user_pool_id, MaxResults=60, NextToken=next_token ) except cognito_client.exceptions.ResourceNotFoundException: print(f"Error: User pool with ID '{user_pool_id}' not found.") return False, "", {} except cognito_client.exceptions.InvalidParameterException: print(f"Error: No app clients for '{user_pool_id}' found.") return False, "", {} except Exception as e: print("Could not check User Pool clients due to:", e) for client in response.get('UserPoolClients', []): if client.get('ClientName') == user_pool_client_name: print(f"Found existing user pool client '{user_pool_client_name}' with ID: {client['ClientId']}") return True, client['ClientId'], client next_token = response.get('NextToken') if not next_token: break return False, "", {} def check_for_secret(secret_name: str, secret_value: dict=""): """ Checks if a Secrets Manager secret with the given name exists. If it doesn't exist, it creates the secret. Args: secret_name: The name of the Secrets Manager secret. secret_value: A dictionary containing the key-value pairs for the secret. Returns: True if the secret existed or was created, False otherwise (due to other errors). """ secretsmanager_client = boto3.client("secretsmanager") try: # Try to get the secret. If it doesn't exist, a ResourceNotFoundException will be raised. secret_value = secretsmanager_client.get_secret_value(SecretId=secret_name) print(f"Secret '{secret_name}' already exists.") return True, secret_value except secretsmanager_client.exceptions.ResourceNotFoundException: print("Secret not found") return False, {} except Exception as e: # Handle other potential exceptions during the get operation print(f"Error checking for secret '{secret_name}': {e}") return False, {} def check_alb_exists(load_balancer_name: str, region_name: str = None) -> tuple[bool, dict]: """ Checks if an Application Load Balancer (ALB) with the given name exists. Args: load_balancer_name: The name of the ALB to check. region_name: The AWS region to check in. If None, uses the default session region. Returns: A tuple: - The first element is True if the ALB exists, False otherwise. - The second element is the ALB object (dictionary) if found, None otherwise. Specifically, it returns the first element of the LoadBalancers list from the describe_load_balancers response. """ if region_name: elbv2_client = boto3.client('elbv2', region_name=region_name) else: elbv2_client = boto3.client('elbv2') try: response = elbv2_client.describe_load_balancers(Names=[load_balancer_name]) if response['LoadBalancers']: return True, response['LoadBalancers'][0] # Return True and the first ALB object else: return False, {} except ClientError as e: # If the error indicates the ALB doesn't exist, return False if e.response['Error']['Code'] == 'LoadBalancerNotFound': return False, {} else: # Re-raise other exceptions raise except Exception as e: print(f"An unexpected error occurred: {e}") return False, {} def check_fargate_task_definition_exists(task_definition_name: str, region_name: str = None) -> tuple[bool, dict]: """ Checks if a Fargate task definition with the given name exists. Args: task_definition_name: The name or ARN of the task definition to check. region_name: The AWS region to check in. If None, uses the default session region. Returns: A tuple: - The first element is True if the task definition exists, False otherwise. - The second element is the task definition object (dictionary) if found, None otherwise. Specifically, it returns the first element of the taskDefinitions list from the describe_task_definition response. """ if region_name: ecs_client = boto3.client('ecs', region_name=region_name) else: ecs_client = boto3.client('ecs') try: response = ecs_client.describe_task_definition(taskDefinition=task_definition_name) # If describe_task_definition succeeds, it returns the task definition. # We can directly return True and the task definition. return True, response['taskDefinition'] except ClientError as e: # Check for the error code indicating the task definition doesn't exist. if e.response['Error']['Code'] == 'ClientException' and 'Task definition' in e.response['Message'] and 'does not exist' in e.response['Message']: return False, {} else: # Re-raise other exceptions. raise except Exception as e: print(f"An unexpected error occurred: {e}") return False, {} def check_ecs_service_exists(cluster_name: str, service_name: str, region_name: str = None) -> tuple[bool, dict]: """ Checks if an ECS service with the given name exists in the specified cluster. Args: cluster_name: The name or ARN of the ECS cluster. service_name: The name of the ECS service to check. region_name: The AWS region to check in. If None, uses the default session region. Returns: A tuple: - The first element is True if the service exists, False otherwise. - The second element is the service object (dictionary) if found, None otherwise. """ if region_name: ecs_client = boto3.client('ecs', region_name=region_name) else: ecs_client = boto3.client('ecs') try: response = ecs_client.describe_services(cluster=cluster_name, services=[service_name]) if response['services']: return True, response['services'][0] # Return True and the first service object else: return False, {} except ClientError as e: # Check for the error code indicating the service doesn't exist. if e.response['Error']['Code'] == 'ClusterNotFoundException': return False, {} elif e.response['Error']['Code'] == 'ServiceNotFoundException': return False, {} else: # Re-raise other exceptions. raise except Exception as e: print(f"An unexpected error occurred: {e}") return False, {} def check_cloudfront_distribution_exists(distribution_name: str, region_name: str = None) -> tuple[bool, dict | None]: """ Checks if a CloudFront distribution with the given name exists. Args: distribution_name: The name of the CloudFront distribution to check. region_name: The AWS region to check in. If None, uses the default session region. Note: CloudFront is a global service, so the region is usually 'us-east-1', but this parameter is included for completeness. Returns: A tuple: - The first element is True if the distribution exists, False otherwise. - The second element is the distribution object (dictionary) if found, None otherwise. Specifically, it returns the first element of the DistributionList from the ListDistributions response. """ if region_name: cf_client = boto3.client('cloudfront', region_name=region_name) else: cf_client = boto3.client('cloudfront') try: response = cf_client.list_distributions() if 'Items' in response['DistributionList']: for distribution in response['DistributionList']['Items']: # CloudFront doesn't directly filter by name, so we have to iterate. if distribution['AliasSet']['Items'] and distribution['AliasSet']['Items'][0] == distribution_name: return True, distribution return False, None else: return False, None except ClientError as e: # If the error indicates the Distribution doesn't exist, return False if e.response['Error']['Code'] == 'NoSuchDistribution': return False, None else: # Re-raise other exceptions raise except Exception as e: print(f"An unexpected error occurred: {e}") return False, None def create_web_acl_with_common_rules(scope:Construct, web_acl_name: str, waf_scope:str="CLOUDFRONT"): ''' Use CDK to create a web ACL based on an AWS common rule set with overrides. This function now expects a 'scope' argument, typically 'self' from your stack, as CfnWebACL requires a construct scope. ''' # Create full list of rules rules = [] aws_ruleset_names = [ "AWSManagedRulesCommonRuleSet", "AWSManagedRulesKnownBadInputsRuleSet", "AWSManagedRulesAmazonIpReputationList" ] # Use a separate counter to assign unique priorities sequentially priority_counter = 1 for aws_rule_name in aws_ruleset_names: current_rule_action_overrides = None # All managed rule groups need an override_action. # 'none' means use the managed rule group's default action. current_override_action = wafv2.CfnWebACL.OverrideActionProperty(none={}) current_priority = priority_counter priority_counter += 1 if aws_rule_name == "AWSManagedRulesCommonRuleSet": current_rule_action_overrides = [ wafv2.CfnWebACL.RuleActionOverrideProperty( name="SizeRestrictions_BODY", action_to_use=wafv2.CfnWebACL.RuleActionProperty( allow={} ) ) ] # No need to set current_override_action here, it's already set above. # If you wanted this specific rule to have a *fixed* priority, you'd handle it differently # For now, it will get priority 1 from the counter. rule_property = wafv2.CfnWebACL.RuleProperty( name=aws_rule_name, priority=current_priority, statement=wafv2.CfnWebACL.StatementProperty( managed_rule_group_statement=wafv2.CfnWebACL.ManagedRuleGroupStatementProperty( vendor_name="AWS", name=aws_rule_name, rule_action_overrides=current_rule_action_overrides ) ), visibility_config=wafv2.CfnWebACL.VisibilityConfigProperty( cloud_watch_metrics_enabled=True, metric_name=aws_rule_name, sampled_requests_enabled=True ), override_action=current_override_action # THIS IS THE CRUCIAL PART FOR ALL MANAGED RULES ) rules.append(rule_property) # Add the rate limit rule rate_limit_priority = priority_counter # Use the next available priority rules.append(wafv2.CfnWebACL.RuleProperty( name="RateLimitRule", priority=rate_limit_priority, statement=wafv2.CfnWebACL.StatementProperty( rate_based_statement=wafv2.CfnWebACL.RateBasedStatementProperty( limit=1000, aggregate_key_type="IP" ) ), visibility_config=wafv2.CfnWebACL.VisibilityConfigProperty( cloud_watch_metrics_enabled=True, metric_name="RateLimitRule", sampled_requests_enabled=True ), action=wafv2.CfnWebACL.RuleActionProperty( block={} ) )) web_acl = wafv2.CfnWebACL( scope, "WebACL", name=web_acl_name, default_action=wafv2.CfnWebACL.DefaultActionProperty(allow={}), scope=waf_scope, visibility_config=wafv2.CfnWebACL.VisibilityConfigProperty( cloud_watch_metrics_enabled=True, metric_name="webACL", sampled_requests_enabled=True ), rules=rules ) CfnOutput(scope, "WebACLArn", value=web_acl.attr_arn) return web_acl def check_web_acl_exists(web_acl_name: str, scope: str, region_name: str = None) -> tuple[bool, dict]: """ Checks if a Web ACL with the given name and scope exists. Args: web_acl_name: The name of the Web ACL to check. scope: The scope of the Web ACL ('CLOUDFRONT' or 'REGIONAL'). region_name: The AWS region to check in. Required for REGIONAL scope. If None, uses the default session region. For CLOUDFRONT, the region should be 'us-east-1'. Returns: A tuple: - The first element is True if the Web ACL exists, False otherwise. - The second element is the Web ACL object (dictionary) if found, None otherwise. """ if scope not in ['CLOUDFRONT', 'REGIONAL']: raise ValueError("Scope must be either 'CLOUDFRONT' or 'REGIONAL'") if scope == 'REGIONAL' and not region_name: raise ValueError("Region name is required for REGIONAL scope") if scope == 'CLOUDFRONT': region_name = 'us-east-1' # CloudFront scope requires us-east-1 if region_name: waf_client = boto3.client('wafv2', region_name=region_name) else: waf_client = boto3.client('wafv2') try: response = waf_client.list_web_acls(Scope=scope) if 'WebACLs' in response: for web_acl in response['WebACLs']: if web_acl['Name'] == web_acl_name: # Describe the Web ACL to get the full object. describe_response = waf_client.describe_web_acl(Name=web_acl_name, Scope=scope) return True, describe_response['WebACL'] return False, {} else: return False, {} except ClientError as e: # Check for the error code indicating the web ACL doesn't exist. if e.response['Error']['Code'] == 'ResourceNotFoundException': return False, {} else: # Re-raise other exceptions. raise except Exception as e: print(f"An unexpected error occurred: {e}") return False, {} def add_alb_https_listener_with_cert( scope: Construct, logical_id: str, # A unique ID for this listener construct alb: elb.ApplicationLoadBalancer, acm_certificate_arn: Optional[str], # Optional: If None, no HTTPS listener will be created default_target_group: elb.ITargetGroup, # Mandatory: The target group to forward traffic to listener_port_https: int = 443, listener_open_to_internet: bool = False, # Be cautious with True, ensure ALB security group restricts access # --- Cognito Authentication Parameters --- enable_cognito_auth: bool = False, cognito_user_pool: Optional[cognito.IUserPool] = None, cognito_user_pool_client: Optional[cognito.IUserPoolClient] = None, cognito_user_pool_domain: Optional[str] = None, # E.g., "my-app-domain" for "my-app-domain.auth.region.amazoncognito.com" cognito_auth_scope: Optional[str] = "openid profile email", # Default recommended scope cognito_auth_on_unauthenticated_request: elb.UnauthenticatedAction = elb.UnauthenticatedAction.AUTHENTICATE, stickiness_cookie_duration=None # --- End Cognito Parameters --- ) -> Optional[elb.ApplicationListener]: """ Conditionally adds an HTTPS listener to an ALB with an ACM certificate, and optionally enables Cognito User Pool authentication. Args: scope (Construct): The scope in which to define this construct (e.g., your CDK Stack). logical_id (str): A unique logical ID for the listener construct within the stack. alb (elb.ApplicationLoadBalancer): The Application Load Balancer to add the listener to. acm_certificate_arn (Optional[str]): The ARN of the ACM certificate to attach. If None, the HTTPS listener will NOT be created. default_target_group (elb.ITargetGroup): The default target group for the listener to forward traffic to. This is mandatory for a functional listener. listener_port_https (int): The HTTPS port to listen on (default: 443). listener_open_to_internet (bool): Whether the listener should allow connections from all sources. If False (recommended), ensure your ALB's security group allows inbound traffic on this port from desired sources. enable_cognito_auth (bool): Set to True to enable Cognito User Pool authentication. cognito_user_pool (Optional[cognito.IUserPool]): The Cognito User Pool object. Required if enable_cognito_auth is True. cognito_user_pool_client (Optional[cognito.IUserPoolClient]): The Cognito User Pool App Client object. Required if enable_cognito_auth is True. cognito_user_pool_domain (Optional[str]): The domain prefix for your Cognito User Pool. Required if enable_cognito_auth is True. cognito_auth_scope (Optional[str]): The scope for the Cognito authentication. cognito_auth_on_unauthenticated_request (elb.UnauthenticatedAction): Action for unauthenticated requests. Defaults to AUTHENTICATE (redirect to login). Returns: Optional[elb.ApplicationListener]: The created ApplicationListener if successful, None if no ACM certificate ARN was provided. """ https_listener = None if acm_certificate_arn: certificates_list = [elb.ListenerCertificate.from_arn(acm_certificate_arn)] print(f"Attempting to add ALB HTTPS listener on port {listener_port_https} with ACM certificate: {acm_certificate_arn}") # Determine the default action based on whether Cognito auth is enabled default_action = None if enable_cognito_auth == True: if not all([cognito_user_pool, cognito_user_pool_client, cognito_user_pool_domain]): raise ValueError( "Cognito User Pool, Client, and Domain must be provided if enable_cognito_auth is True." ) print(f"Enabling Cognito authentication with User Pool: {cognito_user_pool.user_pool_id}") default_action = elb_act.AuthenticateCognitoAction( next=elb.ListenerAction.forward([default_target_group]), # After successful auth, forward to TG user_pool=cognito_user_pool, user_pool_client=cognito_user_pool_client, user_pool_domain=cognito_user_pool_domain, scope=cognito_auth_scope, on_unauthenticated_request=cognito_auth_on_unauthenticated_request, session_timeout=stickiness_cookie_duration # Additional options you might want to configure: # session_cookie_name="AWSELBCookies" ) else: default_action = elb.ListenerAction.forward([default_target_group]) print("Cognito authentication is NOT enabled for this listener.") # Add the HTTPS listener https_listener = alb.add_listener( logical_id, port=listener_port_https, open=listener_open_to_internet, certificates=certificates_list, default_action=default_action # Use the determined default action ) print(f"ALB HTTPS listener on port {listener_port_https} defined.") else: print("ACM_CERTIFICATE_ARN is not provided. Skipping HTTPS listener creation.") return https_listener def ensure_folder_exists(output_folder:str): """Checks if the specified folder exists, creates it if not.""" if not os.path.exists(output_folder): # Create the folder if it doesn't exist os.makedirs(output_folder, exist_ok=True) print(f"Created the {output_folder} folder.") else: print(f"The {output_folder} folder already exists.") def create_basic_config_env(out_dir:str="config", S3_LOG_CONFIG_BUCKET_NAME=S3_LOG_CONFIG_BUCKET_NAME, S3_OUTPUT_BUCKET_NAME=S3_OUTPUT_BUCKET_NAME, ACCESS_LOG_DYNAMODB_TABLE_NAME=ACCESS_LOG_DYNAMODB_TABLE_NAME, FEEDBACK_LOG_DYNAMODB_TABLE_NAME=FEEDBACK_LOG_DYNAMODB_TABLE_NAME, USAGE_LOG_DYNAMODB_TABLE_NAME=USAGE_LOG_DYNAMODB_TABLE_NAME): ''' Create a basic config.env file for the user to use with their newly deployed redaction app. ''' variables = { 'COGNITO_AUTH':'1', 'RUN_AWS_FUNCTIONS':'1', 'DISPLAY_FILE_NAMES_IN_LOGS':'False', 'SESSION_OUTPUT_FOLDER':'True', 'SAVE_LOGS_TO_DYNAMODB':'True', 'SHOW_COSTS':'True', 'SHOW_WHOLE_DOCUMENT_TEXTRACT_CALL_OPTIONS':'True', 'LOAD_PREVIOUS_TEXTRACT_JOBS_S3':'True', 'DOCUMENT_REDACTION_BUCKET':S3_LOG_CONFIG_BUCKET_NAME, 'TEXTRACT_WHOLE_DOCUMENT_ANALYSIS_BUCKET':S3_OUTPUT_BUCKET_NAME, 'ACCESS_LOG_DYNAMODB_TABLE_NAME':ACCESS_LOG_DYNAMODB_TABLE_NAME, 'FEEDBACK_LOG_DYNAMODB_TABLE_NAME':FEEDBACK_LOG_DYNAMODB_TABLE_NAME, 'USAGE_LOG_DYNAMODB_TABLE_NAME':USAGE_LOG_DYNAMODB_TABLE_NAME, 'DISPLAY_FILE_NAMES_IN_LOGS':'False' } # Write variables to .env file ensure_folder_exists(out_dir + "/") env_file_path = os.path.abspath(os.path.join(out_dir, 'config.env')) # It's good practice to ensure the file exists before calling set_key repeatedly. # set_key will create it, but for a loop, it might be cleaner to ensure it's empty/exists once. if not os.path.exists(env_file_path): with open(env_file_path, 'w') as f: pass # Create empty file for key, value in variables.items(): set_key(env_file_path, key, str(value), quote_mode="never") return variables def start_codebuild_build(PROJECT_NAME:str, AWS_REGION:str = AWS_REGION): ''' Start an existing Codebuild project build ''' # --- Initialize CodeBuild client --- client = boto3.client('codebuild', region_name=AWS_REGION) try: print(f"Attempting to start build for project: {PROJECT_NAME}") response = client.start_build( projectName=PROJECT_NAME ) build_id = response['build']['id'] print(f"Successfully started build with ID: {build_id}") print(f"Build ARN: {response['build']['arn']}") print(f"Build URL (approximate - construct based on region and ID):") print(f"https://{AWS_REGION}.console.aws.amazon.com/codesuite/codebuild/projects/{PROJECT_NAME}/build/{build_id.split(':')[-1]}/detail") # You can inspect the full response if needed # print("\nFull response:") # import json # print(json.dumps(response, indent=2)) except client.exceptions.ResourceNotFoundException: print(f"Error: Project '{PROJECT_NAME}' not found in region '{AWS_REGION}'.") except Exception as e: print(f"An unexpected error occurred: {e}") def upload_file_to_s3(local_file_paths:List[str], s3_key:str, s3_bucket:str, RUN_AWS_FUNCTIONS:str = "1"): """ Uploads a file from local machine to Amazon S3. Args: - local_file_path: Local file path(s) of the file(s) to upload. - s3_key: Key (path) to the file in the S3 bucket. - s3_bucket: Name of the S3 bucket. Returns: - Message as variable/printed to console """ final_out_message = [] final_out_message_str = "" if RUN_AWS_FUNCTIONS == "1": try: if s3_bucket and local_file_paths: s3_client = boto3.client('s3', region_name=AWS_REGION) if isinstance(local_file_paths, str): local_file_paths = [local_file_paths] for file in local_file_paths: if s3_client: #print(s3_client) try: # Get file name off file path file_name = os.path.basename(file) s3_key_full = s3_key + file_name print("S3 key: ", s3_key_full) s3_client.upload_file(file, s3_bucket, s3_key_full) out_message = "File " + file_name + " uploaded successfully!" print(out_message) except Exception as e: out_message = f"Error uploading file(s): {e}" print(out_message) final_out_message.append(out_message) final_out_message_str = '\n'.join(final_out_message) else: final_out_message_str = "Could not connect to AWS." else: final_out_message_str = "At least one essential variable is empty, could not upload to S3" except Exception as e: final_out_message_str = "Could not upload files to S3 due to: " + str(e) print(final_out_message_str) else: final_out_message_str = "App not set to run AWS functions" return final_out_message_str # Initialize ECS client def start_ecs_task(cluster_name, service_name): ecs_client = boto3.client('ecs') try: # Update the service to set the desired count to 1 response = ecs_client.update_service( cluster=cluster_name, service=service_name, desiredCount=1 ) return { "statusCode": 200, "body": f"Service {service_name} in cluster {cluster_name} has been updated to 1 task." } except Exception as e: return { "statusCode": 500, "body": f"Error updating service: {str(e)}" }