AWS Python
Fetch the Execution Roles & Policies of All AWS Lambda functions in an AWS Account and Export Excel Sheet & JSON to S3
Fetch Role PoliciesOutput CSV
Output Policy.JSON
import boto3 import csv import json import urllib.parse lambda_client = boto3.client('lambda') iam_client = boto3.client('iam') s3_client = boto3.client('s3') BUCKET_NAME ='ai-kishoreweb-dist-new' CSV_S3_KEY ='reports/lambda_execution_roles_with_policies.csv' JSON_S3_KEY ='reports/policy.json' def get_policy_document(policy_arn): # Get current/default version policy = iam_client.get_policy( PolicyArn=policy_arn ) default_version_id = policy['Policy']['DefaultVersionId'] # Get policy version/document version = iam_client.get_policy_version( PolicyArn=policy_arn, VersionId=default_version_id ) return version['PolicyVersion']['Document'] def get_role_details(role_name): role_data = { 'aws_managed_policies': [], 'customer_managed_policies': [], 'inline_policies': [] } # ----------------------------- # Attached Managed Policies # ----------------------------- attached_policies_paginator = iam_client.get_paginator( 'list_attached_role_policies' ) for page in attached_policies_paginator.paginate(RoleName=role_name): for policy in page['AttachedPolicies']: policy_name = policy['PolicyName'] policy_arn = policy['PolicyArn'] # Get Full Policy Document try: policy_document = get_policy_document(policy_arn) except Exception as e: policy_document = { 'error': str(e) } policy_entry = { 'PolicyName': policy_name, 'PolicyArn': policy_arn, 'PolicyDocument': policy_document } # AWS Managed if policy_arn.startswith( 'arn:aws:iam::aws:policy/' ): role_data['aws_managed_policies'].append( policy_entry ) # Customer Managed else: role_data['customer_managed_policies'].append( policy_entry ) # ----------------------------- # Inline Policies # ----------------------------- inline_policies_paginator = iam_client.get_paginator( 'list_role_policies' ) for page in inline_policies_paginator.paginate( RoleName=role_name ): for policy_name in page['PolicyNames']: try: inline_policy = iam_client.get_role_policy( RoleName=role_name, PolicyName=policy_name ) policy_document = inline_policy[ 'PolicyDocument' ] except Exception as e: policy_document = { 'error': str(e) } role_data['inline_policies'].append({ 'PolicyName': policy_name, 'PolicyDocument': policy_document }) return role_data def lambda_handler(event, context): functions = [] # Final JSON Output all_policy_documents = [] # Fetch all Lambdas paginator = lambda_client.get_paginator( 'list_functions' ) for page in paginator.paginate(): for fn in page['Functions']: role_arn = fn.get('Role', '') role_name = role_arn.split('/')[-1] \ if role_arn else '' # Fetch Role Policies role_details = get_role_details(role_name) # CSV Row functions.append({ 'FunctionName': fn.get( 'FunctionName', '' ), 'Runtime': fn.get( 'Runtime', '' ), 'RoleArn': role_arn, 'RoleName': role_name, 'AWSManagedPolicies': ', '.join([ p['PolicyName'] for p in role_details[ 'aws_managed_policies' ] ]), 'CustomerManagedPolicies': ', '.join([ p['PolicyName'] for p in role_details[ 'customer_managed_policies' ] ]), 'InlinePolicies': ', '.join([ p['PolicyName'] for p in role_details[ 'inline_policies' ] ]) }) # JSON Policy Output all_policy_documents.append({ 'FunctionName': fn.get( 'FunctionName', '' ), 'RoleName': role_name, 'RoleArn': role_arn, 'AWSManagedPolicies': role_details[ 'aws_managed_policies' ], 'CustomerManagedPolicies': role_details[ 'customer_managed_policies' ], 'InlinePolicies': role_details[ 'inline_policies' ] }) # ----------------------------- # Create CSV # ----------------------------- csv_file = '/tmp/lambda_roles.csv' with open( csv_file, 'w', newline='', encoding='utf-8' ) as file: fieldnames = [ 'FunctionName', 'Runtime', 'RoleArn', 'RoleName', 'AWSManagedPolicies', 'CustomerManagedPolicies', 'InlinePolicies' ] writer = csv.DictWriter( file, fieldnames=fieldnames ) writer.writeheader() writer.writerows(functions) # ----------------------------- # Create JSON # ----------------------------- json_file = '/tmp/policy.json' with open( json_file, 'w', encoding='utf-8' ) as file: json.dump( all_policy_documents, file, indent=4 ) # ----------------------------- # Upload CSV to S3 # ----------------------------- s3_client.upload_file( csv_file, BUCKET_NAME, CSV_S3_KEY ) # ----------------------------- # Upload JSON to S3 # ----------------------------- s3_client.upload_file( json_file, BUCKET_NAME, JSON_S3_KEY ) return { 'statusCode': 200, 'body': json.dumps({ 'message': 'CSV and Policy JSON uploaded successfully', 'total_functions': len(functions), 'csv_location': f's3://{BUCKET_NAME}/{CSV_S3_KEY}', 'json_location': f's3://{BUCKET_NAME}/{JSON_S3_KEY}' }) }
try:
role_details = get_role_details(role_name)
except Exception as e:
role_details = {
'aws_managed_policies': [],
'customer_managed_policies': [],
'inline_policies': [],
'error': str(e)
}
--------------------
all_policy_documents.append({
'FunctionName': fn.get(
'FunctionName', ''
),
'RoleName': role_name,
'RoleArn': role_arn,
'AWSManagedPolicies':
role_details.get(
'aws_managed_policies',
[]
),
'CustomerManagedPolicies':
role_details.get(
'customer_managed_policies',
[]
),
'InlinePolicies':
role_details.get(
'inline_policies',
[]
),
'Error':
role_details.get(
'error',
''
)
})
------------------------------
import boto3
import csv
import json
# AWS Clients
lambda_client = boto3.client('lambda')
iam_client = boto3.client('iam')
s3_client = boto3.client('s3')
# =========================
# UPDATE THESE VALUES
# =========================
BUCKET_NAME = 'your-bucket-name'
CSV_S3_KEY = 'reports/lambda_execution_roles_with_policies.csv'
JSON_S3_KEY = 'reports/policy.json'
# =========================
# GET MANAGED POLICY DOCUMENT
# =========================
def get_policy_document(policy_arn):
policy = iam_client.get_policy(
PolicyArn=policy_arn
)
default_version_id = policy[
'Policy'
][
'DefaultVersionId'
]
version = iam_client.get_policy_version(
PolicyArn=policy_arn,
VersionId=default_version_id
)
return version[
'PolicyVersion'
][
'Document'
]
# =========================
# GET ROLE POLICY DETAILS
# =========================
def get_role_details(role_name):
role_data = {
'aws_managed_policies': [],
'customer_managed_policies': [],
'inline_policies': []
}
# -----------------------------------
# ATTACHED MANAGED POLICIES
# -----------------------------------
attached_policies_paginator = iam_client.get_paginator(
'list_attached_role_policies'
)
for page in attached_policies_paginator.paginate(
RoleName=role_name
):
for policy in page[
'AttachedPolicies'
]:
policy_name = policy[
'PolicyName'
]
policy_arn = policy[
'PolicyArn'
]
# Get Full Policy Document
try:
policy_document = get_policy_document(
policy_arn
)
except Exception as e:
policy_document = {
'error': str(e)
}
policy_entry = {
'PolicyName': policy_name,
'PolicyArn': policy_arn,
'PolicyDocument': policy_document
}
# AWS Managed Policy
if policy_arn.startswith(
'arn:aws:iam::aws:policy/'
):
role_data[
'aws_managed_policies'
].append(policy_entry)
# Customer Managed Policy
else:
role_data[
'customer_managed_policies'
].append(policy_entry)
# -----------------------------------
# INLINE POLICIES
# -----------------------------------
inline_policies_paginator = iam_client.get_paginator(
'list_role_policies'
)
for page in inline_policies_paginator.paginate(
RoleName=role_name
):
for policy_name in page[
'PolicyNames'
]:
try:
inline_policy = iam_client.get_role_policy(
RoleName=role_name,
PolicyName=policy_name
)
policy_document = inline_policy[
'PolicyDocument'
]
except Exception as e:
policy_document = {
'error': str(e)
}
role_data[
'inline_policies'
].append({
'PolicyName': policy_name,
'PolicyDocument': policy_document
})
return role_data
# =========================
# MAIN LAMBDA HANDLER
# =========================
def lambda_handler(event, context):
functions = []
all_policy_documents = []
# -----------------------------------
# FETCH ALL LAMBDAS
# -----------------------------------
paginator = lambda_client.get_paginator(
'list_functions'
)
for page in paginator.paginate():
for fn in page['Functions']:
function_name = fn.get(
'FunctionName',
''
)
runtime = fn.get(
'Runtime',
''
)
role_arn = fn.get(
'Role',
''
)
role_name = (
role_arn.split('/')[-1]
if role_arn else ''
)
# -----------------------------------
# FETCH ROLE DETAILS
# -----------------------------------
try:
role_details = get_role_details(
role_name
)
error_message = ''
except Exception as e:
role_details = {
'aws_managed_policies': [],
'customer_managed_policies': [],
'inline_policies': []
}
error_message = str(e)
# -----------------------------------
# CSV DATA
# -----------------------------------
functions.append({
'FunctionName': function_name,
'Runtime': runtime,
'RoleArn': role_arn,
'RoleName': role_name,
'AWSManagedPolicies':
', '.join([
p['PolicyName']
for p in role_details[
'aws_managed_policies'
]
]),
'CustomerManagedPolicies':
', '.join([
p['PolicyName']
for p in role_details[
'customer_managed_policies'
]
]),
'InlinePolicies':
', '.join([
p['PolicyName']
for p in role_details[
'inline_policies'
]
]),
'Error': error_message
})
# -----------------------------------
# JSON POLICY OUTPUT
# -----------------------------------
all_policy_documents.append({
'FunctionName': function_name,
'Runtime': runtime,
'RoleArn': role_arn,
'RoleName': role_name,
'AWSManagedPolicies':
role_details.get(
'aws_managed_policies',
[]
),
'CustomerManagedPolicies':
role_details.get(
'customer_managed_policies',
[]
),
'InlinePolicies':
role_details.get(
'inline_policies',
[]
),
'Error': error_message
})
# -----------------------------------
# CREATE CSV
# -----------------------------------
csv_file = '/tmp/lambda_roles.csv'
with open(
csv_file,
'w',
newline='',
encoding='utf-8'
) as file:
fieldnames = [
'FunctionName',
'Runtime',
'RoleArn',
'RoleName',
'AWSManagedPolicies',
'CustomerManagedPolicies',
'InlinePolicies',
'Error'
]
writer = csv.DictWriter(
file,
fieldnames=fieldnames
)
writer.writeheader()
writer.writerows(functions)
# -----------------------------------
# CREATE JSON FILE
# -----------------------------------
json_file = '/tmp/policy.json'
with open(
json_file,
'w',
encoding='utf-8'
) as file:
json.dump(
all_policy_documents,
file,
indent=4
)
# -----------------------------------
# UPLOAD CSV TO S3
# -----------------------------------
s3_client.upload_file(
csv_file,
BUCKET_NAME,
CSV_S3_KEY
)
# -----------------------------------
# UPLOAD JSON TO S3
# -----------------------------------
s3_client.upload_file(
json_file,
BUCKET_NAME,
JSON_S3_KEY
)
# -----------------------------------
# RESPONSE
# -----------------------------------
return {
'statusCode': 200,
'body': json.dumps({
'message':
'CSV and policy JSON uploaded successfully',
'total_functions':
len(functions),
'csv_location':
f's3://{BUCKET_NAME}/{CSV_S3_KEY}',
'json_location':
f's3://{BUCKET_NAME}/{JSON_S3_KEY}'
})
}
----------------------------------------
New-->
import boto3
import csv
import json
# AWS Clients
lambda_client = boto3.client('lambda')
iam_client = boto3.client('iam')
s3_client = boto3.client('s3')
# ======================================
# S3 CONFIGURATION
# ======================================
BUCKET_NAME = 'your-bucket-name'
CSV_S3_KEY = 'reports/lambda_execution_roles_with_policies.csv'
JSON_S3_KEY = 'reports/policy.json'
# ======================================
# GET MANAGED POLICY DOCUMENT
# ======================================
def get_policy_document(policy_arn):
try:
# Get Policy
policy = iam_client.get_policy(
PolicyArn=policy_arn
)
# Default Version
default_version_id = policy[
'Policy'
][
'DefaultVersionId'
]
# Get Policy Version
version = iam_client.get_policy_version(
PolicyArn=policy_arn,
VersionId=default_version_id
)
return version[
'PolicyVersion'
][
'Document'
]
except Exception as e:
return {
'Error': str(e)
}
# ======================================
# GET ROLE DETAILS
# ======================================
def get_role_details(role_name):
role_data = {
'aws_managed_policies': [],
'customer_managed_policies': [],
'inline_policies': []
}
# ======================================
# ATTACHED MANAGED POLICIES
# ======================================
attached_policies_paginator = iam_client.get_paginator(
'list_attached_role_policies'
)
for page in attached_policies_paginator.paginate(
RoleName=role_name
):
for policy in page[
'AttachedPolicies'
]:
policy_name = policy[
'PolicyName'
]
policy_arn = policy[
'PolicyArn'
]
# Get Policy Document
policy_document = get_policy_document(
policy_arn
)
policy_entry = {
'PolicyName': policy_name,
'PolicyArn': policy_arn,
'PolicyDocument': policy_document
}
# AWS Managed Policy
if policy_arn.startswith(
'arn:aws:iam::aws:policy/'
):
role_data[
'aws_managed_policies'
].append(policy_entry)
# Customer Managed Policy
else:
role_data[
'customer_managed_policies'
].append(policy_entry)
# ======================================
# INLINE POLICIES
# ======================================
inline_policies_paginator = iam_client.get_paginator(
'list_role_policies'
)
for page in inline_policies_paginator.paginate(
RoleName=role_name
):
for policy_name in page[
'PolicyNames'
]:
try:
inline_policy = iam_client.get_role_policy(
RoleName=role_name,
PolicyName=policy_name
)
policy_document = inline_policy[
'PolicyDocument'
]
except Exception as e:
policy_document = {
'Error': str(e)
}
role_data[
'inline_policies'
].append({
'PolicyName': policy_name,
'PolicyDocument': policy_document
})
return role_data
# ======================================
# MAIN LAMBDA FUNCTION
# ======================================
def lambda_handler(event, context):
functions = []
all_policy_documents = []
# ======================================
# FETCH ALL LAMBDAS
# ======================================
paginator = lambda_client.get_paginator(
'list_functions'
)
for page in paginator.paginate():
for fn in page['Functions']:
function_name = fn.get(
'FunctionName',
''
)
runtime = fn.get(
'Runtime',
''
)
role_arn = fn.get(
'Role',
''
)
role_name = (
role_arn.split('/')[-1]
if role_arn else ''
)
# ======================================
# HANDLE INVALID / DELETED ROLES
# ======================================
try:
role_details = get_role_details(
role_name
)
error_message = ''
except Exception as e:
role_details = {
'aws_managed_policies': [],
'customer_managed_policies': [],
'inline_policies': []
}
error_message = str(e)
# ======================================
# CSV OUTPUT
# ======================================
functions.append({
'FunctionName': function_name,
'Runtime': runtime,
'RoleArn': role_arn,
'RoleName': role_name,
'AWSManagedPolicies':
', '.join([
p['PolicyName']
for p in role_details[
'aws_managed_policies'
]
]),
'CustomerManagedPolicies':
', '.join([
p['PolicyName']
for p in role_details[
'customer_managed_policies'
]
]),
'InlinePolicies':
', '.join([
p['PolicyName']
for p in role_details[
'inline_policies'
]
]),
'Error': error_message
})
# ======================================
# JSON OUTPUT
# ======================================
all_policy_documents.append({
'FunctionName': function_name,
'Runtime': runtime,
'RoleArn': role_arn,
'RoleName': role_name,
'AWSManagedPolicies':
role_details.get(
'aws_managed_policies',
[]
),
'CustomerManagedPolicies':
role_details.get(
'customer_managed_policies',
[]
),
'InlinePolicies':
role_details.get(
'inline_policies',
[]
),
'Error': error_message
})
# ======================================
# CREATE CSV
# ======================================
csv_file = '/tmp/lambda_roles.csv'
with open(
csv_file,
'w',
newline='',
encoding='utf-8'
) as file:
fieldnames = [
'FunctionName',
'Runtime',
'RoleArn',
'RoleName',
'AWSManagedPolicies',
'CustomerManagedPolicies',
'InlinePolicies',
'Error'
]
writer = csv.DictWriter(
file,
fieldnames=fieldnames
)
writer.writeheader()
writer.writerows(functions)
# ======================================
# CREATE JSON FILE
# ======================================
json_file = '/tmp/policy.json'
with open(
json_file,
'w',
encoding='utf-8'
) as file:
json.dump(
all_policy_documents,
file,
indent=4
)
# ======================================
# UPLOAD CSV TO S3
# ======================================
s3_client.upload_file(
csv_file,
BUCKET_NAME,
CSV_S3_KEY
)
# ======================================
# UPLOAD JSON TO S3
# ======================================
s3_client.upload_file(
json_file,
BUCKET_NAME,
JSON_S3_KEY
)
# ======================================
# RETURN RESPONSE
# ======================================
return {
'statusCode': 200,
'body': json.dumps({
'message':
'CSV and policy JSON uploaded successfully',
'total_functions':
len(functions),
'csv_location':
f's3://{BUCKET_NAME}/{CSV_S3_KEY}',
'json_location':
f's3://{BUCKET_NAME}/{JSON_S3_KEY}'
})
}

