AWS Python


Fetch the Execution Roles & Policies of All AWS Lambda functions in an AWS Account and Export Excel Sheet & JSON to S3

Output CSV


Output Policy.JSON


import boto3 import csv import json import urllib.parse lambda_client = boto3.client('lambda') iam_client = boto3.client('iam') s3_client = boto3.client('s3') BUCKET_NAME = 'ai-kishoreweb-dist-new' CSV_S3_KEY = 'reports/lambda_execution_roles_with_policies.csv' JSON_S3_KEY = 'reports/policy.json' def get_policy_document(policy_arn): # Get current/default version policy = iam_client.get_policy( PolicyArn=policy_arn ) default_version_id = policy['Policy']['DefaultVersionId'] # Get policy version/document version = iam_client.get_policy_version( PolicyArn=policy_arn, VersionId=default_version_id ) return version['PolicyVersion']['Document'] def get_role_details(role_name): role_data = { 'aws_managed_policies': [], 'customer_managed_policies': [], 'inline_policies': [] } # ----------------------------- # Attached Managed Policies # ----------------------------- attached_policies_paginator = iam_client.get_paginator( 'list_attached_role_policies' ) for page in attached_policies_paginator.paginate(RoleName=role_name): for policy in page['AttachedPolicies']: policy_name = policy['PolicyName'] policy_arn = policy['PolicyArn'] # Get Full Policy Document try: policy_document = get_policy_document(policy_arn) except Exception as e: policy_document = { 'error': str(e) } policy_entry = { 'PolicyName': policy_name, 'PolicyArn': policy_arn, 'PolicyDocument': policy_document } # AWS Managed if policy_arn.startswith( 'arn:aws:iam::aws:policy/' ): role_data['aws_managed_policies'].append( policy_entry ) # Customer Managed else: role_data['customer_managed_policies'].append( policy_entry ) # ----------------------------- # Inline Policies # ----------------------------- inline_policies_paginator = iam_client.get_paginator( 'list_role_policies' ) for page in inline_policies_paginator.paginate( RoleName=role_name ): for policy_name in page['PolicyNames']: try: inline_policy = iam_client.get_role_policy( RoleName=role_name, PolicyName=policy_name ) policy_document = inline_policy[ 'PolicyDocument' ] except Exception as e: policy_document = { 'error': str(e) } role_data['inline_policies'].append({ 'PolicyName': policy_name, 'PolicyDocument': policy_document }) return role_data def lambda_handler(event, context): functions = [] # Final JSON Output all_policy_documents = [] # Fetch all Lambdas paginator = lambda_client.get_paginator( 'list_functions' ) for page in paginator.paginate(): for fn in page['Functions']: role_arn = fn.get('Role', '') role_name = role_arn.split('/')[-1] \ if role_arn else '' # Fetch Role Policies role_details = get_role_details(role_name) # CSV Row functions.append({ 'FunctionName': fn.get( 'FunctionName', '' ), 'Runtime': fn.get( 'Runtime', '' ), 'RoleArn': role_arn, 'RoleName': role_name, 'AWSManagedPolicies': ', '.join([ p['PolicyName'] for p in role_details[ 'aws_managed_policies' ] ]), 'CustomerManagedPolicies': ', '.join([ p['PolicyName'] for p in role_details[ 'customer_managed_policies' ] ]), 'InlinePolicies': ', '.join([ p['PolicyName'] for p in role_details[ 'inline_policies' ] ]) }) # JSON Policy Output all_policy_documents.append({ 'FunctionName': fn.get( 'FunctionName', '' ), 'RoleName': role_name, 'RoleArn': role_arn, 'AWSManagedPolicies': role_details[ 'aws_managed_policies' ], 'CustomerManagedPolicies': role_details[ 'customer_managed_policies' ], 'InlinePolicies': role_details[ 'inline_policies' ] }) # ----------------------------- # Create CSV # ----------------------------- csv_file = '/tmp/lambda_roles.csv' with open( csv_file, 'w', newline='', encoding='utf-8' ) as file: fieldnames = [ 'FunctionName', 'Runtime', 'RoleArn', 'RoleName', 'AWSManagedPolicies', 'CustomerManagedPolicies', 'InlinePolicies' ] writer = csv.DictWriter( file, fieldnames=fieldnames ) writer.writeheader() writer.writerows(functions) # ----------------------------- # Create JSON # ----------------------------- json_file = '/tmp/policy.json' with open( json_file, 'w', encoding='utf-8' ) as file: json.dump( all_policy_documents, file, indent=4 ) # ----------------------------- # Upload CSV to S3 # ----------------------------- s3_client.upload_file( csv_file, BUCKET_NAME, CSV_S3_KEY ) # ----------------------------- # Upload JSON to S3 # ----------------------------- s3_client.upload_file( json_file, BUCKET_NAME, JSON_S3_KEY ) return { 'statusCode': 200, 'body': json.dumps({ 'message': 'CSV and Policy JSON uploaded successfully', 'total_functions': len(functions), 'csv_location': f's3://{BUCKET_NAME}/{CSV_S3_KEY}', 'json_location': f's3://{BUCKET_NAME}/{JSON_S3_KEY}' }) }

Fetch Role Policies
try:
    role_details = get_role_details(role_name)

except Exception as e:

    role_details = {
        'aws_managed_policies': [],
        'customer_managed_policies': [],
        'inline_policies': [],
        'error': str(e)
    }
	
	
	
--------------------
all_policy_documents.append({

    'FunctionName': fn.get(
        'FunctionName', ''
    ),

    'RoleName': role_name,

    'RoleArn': role_arn,

    'AWSManagedPolicies':
        role_details.get(
            'aws_managed_policies',
            []
        ),

    'CustomerManagedPolicies':
        role_details.get(
            'customer_managed_policies',
            []
        ),

    'InlinePolicies':
        role_details.get(
            'inline_policies',
            []
        ),

    'Error':
        role_details.get(
            'error',
            ''
        )
})


------------------------------
import boto3
import csv
import json

# AWS Clients
lambda_client = boto3.client('lambda')
iam_client = boto3.client('iam')
s3_client = boto3.client('s3')

# =========================
# UPDATE THESE VALUES
# =========================
BUCKET_NAME = 'your-bucket-name'

CSV_S3_KEY = 'reports/lambda_execution_roles_with_policies.csv'
JSON_S3_KEY = 'reports/policy.json'


# =========================
# GET MANAGED POLICY DOCUMENT
# =========================
def get_policy_document(policy_arn):

    policy = iam_client.get_policy(
        PolicyArn=policy_arn
    )

    default_version_id = policy[
        'Policy'
    ][
        'DefaultVersionId'
    ]

    version = iam_client.get_policy_version(
        PolicyArn=policy_arn,
        VersionId=default_version_id
    )

    return version[
        'PolicyVersion'
    ][
        'Document'
    ]


# =========================
# GET ROLE POLICY DETAILS
# =========================
def get_role_details(role_name):

    role_data = {
        'aws_managed_policies': [],
        'customer_managed_policies': [],
        'inline_policies': []
    }

    # -----------------------------------
    # ATTACHED MANAGED POLICIES
    # -----------------------------------
    attached_policies_paginator = iam_client.get_paginator(
        'list_attached_role_policies'
    )

    for page in attached_policies_paginator.paginate(
        RoleName=role_name
    ):

        for policy in page[
            'AttachedPolicies'
        ]:

            policy_name = policy[
                'PolicyName'
            ]

            policy_arn = policy[
                'PolicyArn'
            ]

            # Get Full Policy Document
            try:

                policy_document = get_policy_document(
                    policy_arn
                )

            except Exception as e:

                policy_document = {
                    'error': str(e)
                }

            policy_entry = {
                'PolicyName': policy_name,
                'PolicyArn': policy_arn,
                'PolicyDocument': policy_document
            }

            # AWS Managed Policy
            if policy_arn.startswith(
                'arn:aws:iam::aws:policy/'
            ):

                role_data[
                    'aws_managed_policies'
                ].append(policy_entry)

            # Customer Managed Policy
            else:

                role_data[
                    'customer_managed_policies'
                ].append(policy_entry)

    # -----------------------------------
    # INLINE POLICIES
    # -----------------------------------
    inline_policies_paginator = iam_client.get_paginator(
        'list_role_policies'
    )

    for page in inline_policies_paginator.paginate(
        RoleName=role_name
    ):

        for policy_name in page[
            'PolicyNames'
        ]:

            try:

                inline_policy = iam_client.get_role_policy(
                    RoleName=role_name,
                    PolicyName=policy_name
                )

                policy_document = inline_policy[
                    'PolicyDocument'
                ]

            except Exception as e:

                policy_document = {
                    'error': str(e)
                }

            role_data[
                'inline_policies'
            ].append({

                'PolicyName': policy_name,

                'PolicyDocument': policy_document
            })

    return role_data


# =========================
# MAIN LAMBDA HANDLER
# =========================
def lambda_handler(event, context):

    functions = []

    all_policy_documents = []

    # -----------------------------------
    # FETCH ALL LAMBDAS
    # -----------------------------------
    paginator = lambda_client.get_paginator(
        'list_functions'
    )

    for page in paginator.paginate():

        for fn in page['Functions']:

            function_name = fn.get(
                'FunctionName',
                ''
            )

            runtime = fn.get(
                'Runtime',
                ''
            )

            role_arn = fn.get(
                'Role',
                ''
            )

            role_name = (
                role_arn.split('/')[-1]
                if role_arn else ''
            )

            # -----------------------------------
            # FETCH ROLE DETAILS
            # -----------------------------------
            try:

                role_details = get_role_details(
                    role_name
                )

                error_message = ''

            except Exception as e:

                role_details = {
                    'aws_managed_policies': [],
                    'customer_managed_policies': [],
                    'inline_policies': []
                }

                error_message = str(e)

            # -----------------------------------
            # CSV DATA
            # -----------------------------------
            functions.append({

                'FunctionName': function_name,

                'Runtime': runtime,

                'RoleArn': role_arn,

                'RoleName': role_name,

                'AWSManagedPolicies':
                    ', '.join([
                        p['PolicyName']
                        for p in role_details[
                            'aws_managed_policies'
                        ]
                    ]),

                'CustomerManagedPolicies':
                    ', '.join([
                        p['PolicyName']
                        for p in role_details[
                            'customer_managed_policies'
                        ]
                    ]),

                'InlinePolicies':
                    ', '.join([
                        p['PolicyName']
                        for p in role_details[
                            'inline_policies'
                        ]
                    ]),

                'Error': error_message
            })

            # -----------------------------------
            # JSON POLICY OUTPUT
            # -----------------------------------
            all_policy_documents.append({

                'FunctionName': function_name,

                'Runtime': runtime,

                'RoleArn': role_arn,

                'RoleName': role_name,

                'AWSManagedPolicies':
                    role_details.get(
                        'aws_managed_policies',
                        []
                    ),

                'CustomerManagedPolicies':
                    role_details.get(
                        'customer_managed_policies',
                        []
                    ),

                'InlinePolicies':
                    role_details.get(
                        'inline_policies',
                        []
                    ),

                'Error': error_message
            })

    # -----------------------------------
    # CREATE CSV
    # -----------------------------------
    csv_file = '/tmp/lambda_roles.csv'

    with open(
        csv_file,
        'w',
        newline='',
        encoding='utf-8'
    ) as file:

        fieldnames = [

            'FunctionName',

            'Runtime',

            'RoleArn',

            'RoleName',

            'AWSManagedPolicies',

            'CustomerManagedPolicies',

            'InlinePolicies',

            'Error'
        ]

        writer = csv.DictWriter(
            file,
            fieldnames=fieldnames
        )

        writer.writeheader()

        writer.writerows(functions)

    # -----------------------------------
    # CREATE JSON FILE
    # -----------------------------------
    json_file = '/tmp/policy.json'

    with open(
        json_file,
        'w',
        encoding='utf-8'
    ) as file:

        json.dump(
            all_policy_documents,
            file,
            indent=4
        )

    # -----------------------------------
    # UPLOAD CSV TO S3
    # -----------------------------------
    s3_client.upload_file(
        csv_file,
        BUCKET_NAME,
        CSV_S3_KEY
    )

    # -----------------------------------
    # UPLOAD JSON TO S3
    # -----------------------------------
    s3_client.upload_file(
        json_file,
        BUCKET_NAME,
        JSON_S3_KEY
    )

    # -----------------------------------
    # RESPONSE
    # -----------------------------------
    return {
        'statusCode': 200,
        'body': json.dumps({

            'message':
                'CSV and policy JSON uploaded successfully',

            'total_functions':
                len(functions),

            'csv_location':
                f's3://{BUCKET_NAME}/{CSV_S3_KEY}',

            'json_location':
                f's3://{BUCKET_NAME}/{JSON_S3_KEY}'
        })
    }

----------------------------------------
New-->
import boto3
import csv
import json

# AWS Clients
lambda_client = boto3.client('lambda')
iam_client = boto3.client('iam')
s3_client = boto3.client('s3')

# ======================================
# S3 CONFIGURATION
# ======================================
BUCKET_NAME = 'your-bucket-name'

CSV_S3_KEY = 'reports/lambda_execution_roles_with_policies.csv'

JSON_S3_KEY = 'reports/policy.json'


# ======================================
# GET MANAGED POLICY DOCUMENT
# ======================================
def get_policy_document(policy_arn):

    try:

        # Get Policy
        policy = iam_client.get_policy(
            PolicyArn=policy_arn
        )

        # Default Version
        default_version_id = policy[
            'Policy'
        ][
            'DefaultVersionId'
        ]

        # Get Policy Version
        version = iam_client.get_policy_version(
            PolicyArn=policy_arn,
            VersionId=default_version_id
        )

        return version[
            'PolicyVersion'
        ][
            'Document'
        ]

    except Exception as e:

        return {
            'Error': str(e)
        }


# ======================================
# GET ROLE DETAILS
# ======================================
def get_role_details(role_name):

    role_data = {

        'aws_managed_policies': [],

        'customer_managed_policies': [],

        'inline_policies': []
    }

    # ======================================
    # ATTACHED MANAGED POLICIES
    # ======================================
    attached_policies_paginator = iam_client.get_paginator(
        'list_attached_role_policies'
    )

    for page in attached_policies_paginator.paginate(
        RoleName=role_name
    ):

        for policy in page[
            'AttachedPolicies'
        ]:

            policy_name = policy[
                'PolicyName'
            ]

            policy_arn = policy[
                'PolicyArn'
            ]

            # Get Policy Document
            policy_document = get_policy_document(
                policy_arn
            )

            policy_entry = {

                'PolicyName': policy_name,

                'PolicyArn': policy_arn,

                'PolicyDocument': policy_document
            }

            # AWS Managed Policy
            if policy_arn.startswith(
                'arn:aws:iam::aws:policy/'
            ):

                role_data[
                    'aws_managed_policies'
                ].append(policy_entry)

            # Customer Managed Policy
            else:

                role_data[
                    'customer_managed_policies'
                ].append(policy_entry)

    # ======================================
    # INLINE POLICIES
    # ======================================
    inline_policies_paginator = iam_client.get_paginator(
        'list_role_policies'
    )

    for page in inline_policies_paginator.paginate(
        RoleName=role_name
    ):

        for policy_name in page[
            'PolicyNames'
        ]:

            try:

                inline_policy = iam_client.get_role_policy(
                    RoleName=role_name,
                    PolicyName=policy_name
                )

                policy_document = inline_policy[
                    'PolicyDocument'
                ]

            except Exception as e:

                policy_document = {
                    'Error': str(e)
                }

            role_data[
                'inline_policies'
            ].append({

                'PolicyName': policy_name,

                'PolicyDocument': policy_document
            })

    return role_data


# ======================================
# MAIN LAMBDA FUNCTION
# ======================================
def lambda_handler(event, context):

    functions = []

    all_policy_documents = []

    # ======================================
    # FETCH ALL LAMBDAS
    # ======================================
    paginator = lambda_client.get_paginator(
        'list_functions'
    )

    for page in paginator.paginate():

        for fn in page['Functions']:

            function_name = fn.get(
                'FunctionName',
                ''
            )

            runtime = fn.get(
                'Runtime',
                ''
            )

            role_arn = fn.get(
                'Role',
                ''
            )

            role_name = (
                role_arn.split('/')[-1]
                if role_arn else ''
            )

            # ======================================
            # HANDLE INVALID / DELETED ROLES
            # ======================================
            try:

                role_details = get_role_details(
                    role_name
                )

                error_message = ''

            except Exception as e:

                role_details = {

                    'aws_managed_policies': [],

                    'customer_managed_policies': [],

                    'inline_policies': []
                }

                error_message = str(e)

            # ======================================
            # CSV OUTPUT
            # ======================================
            functions.append({

                'FunctionName': function_name,

                'Runtime': runtime,

                'RoleArn': role_arn,

                'RoleName': role_name,

                'AWSManagedPolicies':
                    ', '.join([
                        p['PolicyName']
                        for p in role_details[
                            'aws_managed_policies'
                        ]
                    ]),

                'CustomerManagedPolicies':
                    ', '.join([
                        p['PolicyName']
                        for p in role_details[
                            'customer_managed_policies'
                        ]
                    ]),

                'InlinePolicies':
                    ', '.join([
                        p['PolicyName']
                        for p in role_details[
                            'inline_policies'
                        ]
                    ]),

                'Error': error_message
            })

            # ======================================
            # JSON OUTPUT
            # ======================================
            all_policy_documents.append({

                'FunctionName': function_name,

                'Runtime': runtime,

                'RoleArn': role_arn,

                'RoleName': role_name,

                'AWSManagedPolicies':
                    role_details.get(
                        'aws_managed_policies',
                        []
                    ),

                'CustomerManagedPolicies':
                    role_details.get(
                        'customer_managed_policies',
                        []
                    ),

                'InlinePolicies':
                    role_details.get(
                        'inline_policies',
                        []
                    ),

                'Error': error_message
            })

    # ======================================
    # CREATE CSV
    # ======================================
    csv_file = '/tmp/lambda_roles.csv'

    with open(
        csv_file,
        'w',
        newline='',
        encoding='utf-8'
    ) as file:

        fieldnames = [

            'FunctionName',

            'Runtime',

            'RoleArn',

            'RoleName',

            'AWSManagedPolicies',

            'CustomerManagedPolicies',

            'InlinePolicies',

            'Error'
        ]

        writer = csv.DictWriter(
            file,
            fieldnames=fieldnames
        )

        writer.writeheader()

        writer.writerows(functions)

    # ======================================
    # CREATE JSON FILE
    # ======================================
    json_file = '/tmp/policy.json'

    with open(
        json_file,
        'w',
        encoding='utf-8'
    ) as file:

        json.dump(
            all_policy_documents,
            file,
            indent=4
        )

    # ======================================
    # UPLOAD CSV TO S3
    # ======================================
    s3_client.upload_file(
        csv_file,
        BUCKET_NAME,
        CSV_S3_KEY
    )

    # ======================================
    # UPLOAD JSON TO S3
    # ======================================
    s3_client.upload_file(
        json_file,
        BUCKET_NAME,
        JSON_S3_KEY
    )

    # ======================================
    # RETURN RESPONSE
    # ======================================
    return {

        'statusCode': 200,

        'body': json.dumps({

            'message':
                'CSV and policy JSON uploaded successfully',

            'total_functions':
                len(functions),

            'csv_location':
                f's3://{BUCKET_NAME}/{CSV_S3_KEY}',

            'json_location':
                f's3://{BUCKET_NAME}/{JSON_S3_KEY}'
        })
    }