Architecture


IAM Policy of Lambda: ctr-bedrock-orchestrator
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "bedrock:InvokeModel",
      "Resource": [
        "arn:aws:bedrock:us-east-1::foundation-model/amazon.nova-lite-v1:0",
        "arn:aws:bedrock:us-west-2::foundation-model/anthropic.claude-3-5-sonnet-20241022-v2:0"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "lambda:InvokeFunction",
      "Resource": "arn:aws:lambda:us-west-2:435182297172:function:ctr-athena-executor"
    }
  ]
}
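The Resource ARN in the bedrock:InvokeModel statement has to name the same model and region that the function actually calls, as set by BEDROCK_MODEL_ID and BEDROCK_REGION. Below is a minimal sketch that derives the ARN from those two values so the policy and the configuration cannot drift apart. It assumes an on-demand foundation model. Cross-region inference profile IDs (for example IDs starting with `us.`) use a different ARN form and are not handled. The helper name is hypothetical.

```python
def foundation_model_arn(region: str, model_id: str) -> str:
    """Build the IAM Resource ARN for an on-demand Bedrock foundation model."""
    if not region or not model_id:
        raise ValueError("region and model_id are both required")
    # Foundation-model ARNs leave the account field empty ("::")
    return f"arn:aws:bedrock:{region}::foundation-model/{model_id}"


if __name__ == "__main__":
    # Values from the ctr-bedrock-orchestrator environment-variable table
    print(foundation_model_arn("us-east-1", "amazon.nova-lite-v1:0"))
    # arn:aws:bedrock:us-east-1::foundation-model/amazon.nova-lite-v1:0
```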

Environment variables of Lambda: ctr-bedrock-orchestrator
Key                   Value
ATHENA_DATABASE       call_analytics
ATHENA_LAMBDA_NAME    ctr-athena-executor
ATHENA_TABLE_NAME     connect_reporting_glue_table_data
BEDROCK_MODEL_ID      amazon.nova-lite-v1:0
BEDROCK_REGION        us-east-1
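The function reads these five variables at import time and silently falls back to hard-coded defaults when one is missing. Below is a minimal fail-fast alternative. It assumes you would rather have the cold start fail loudly than query the wrong table or model. `load_config` and `REQUIRED_VARS` are hypothetical names.

```python
import os
from typing import Mapping

# Hypothetical: the variables ctr-bedrock-orchestrator expects (see the table above)
REQUIRED_VARS = (
    "ATHENA_DATABASE",
    "ATHENA_LAMBDA_NAME",
    "ATHENA_TABLE_NAME",
    "BEDROCK_MODEL_ID",
    "BEDROCK_REGION",
)


def load_config(env: Mapping[str, str] = os.environ) -> dict:
    """Return the required settings, raising once with every missing name listed."""
    missing = [name for name in REQUIRED_VARS if not env.get(name, "").strip()]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name].strip() for name in REQUIRED_VARS}
```

Calling `load_config()` at module level makes a misconfigured deployment fail on its first invocation, and the log names every missing variable.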

Python Lambda: ctr-bedrock-orchestrator
"""
Lambda A — Bedrock Orchestrator (Text-to-SQL)
----------------------------------------------
Step 1 : Receives user's natural language question
Step 2 : Sends question + CTR schema to Bedrock → receives SQL
Step 3 : Invokes Lambda B (Athena Executor) with that SQL
Step 4 : Sends SQL results back to Bedrock → receives human-readable answer
Step 5 : Returns answer + SQL to the chatbot

Runtime : Python 3.12
Handler : lambda_function.lambda_handler
Memory  : 512 MB
Timeout : 120 seconds   (Bedrock + Athena can take ~30-60s total)

Required IAM permissions:
  - bedrock:InvokeModel
  - lambda:InvokeFunction  (for Lambda B)

Environment Variables (values as deployed):
  ATHENA_LAMBDA_NAME  : ctr-athena-executor                (Lambda B function name)
  BEDROCK_MODEL_ID    : amazon.nova-lite-v1:0               (_invoke_bedrock uses the Nova request/response format)
  BEDROCK_REGION      : us-east-1                           (must be a region where the model is available)
  ATHENA_TABLE_NAME   : connect_reporting_glue_table_data   (Glue/Athena table name)
  ATHENA_DATABASE     : call_analytics
"""

import json
import os
import boto3
from botocore.exceptions import ClientError

# --------------------------------------------------------------------------- #
# Config
# --------------------------------------------------------------------------- #
ATHENA_LAMBDA_NAME = os.environ.get("ATHENA_LAMBDA_NAME", "ctr-athena-executor")
# Default must be a Nova model: _invoke_bedrock below builds a Nova-format request body
BEDROCK_MODEL_ID   = os.environ.get("BEDROCK_MODEL_ID",   "amazon.nova-lite-v1:0")
BEDROCK_REGION     = os.environ.get("BEDROCK_REGION",     "us-east-1")
ATHENA_TABLE       = os.environ.get("ATHENA_TABLE_NAME",  "connect_reporting_glue_table_data")
ATHENA_DATABASE    = os.environ.get("ATHENA_DATABASE",    "call_analytics")

lambda_client  = boto3.client("lambda")
bedrock_client = boto3.client("bedrock-runtime", region_name=BEDROCK_REGION)


# --------------------------------------------------------------------------- #
# CTR Table Schema — used in Bedrock system prompt
# NOTE: Update this if your Glue crawler discovers additional nested columns
# --------------------------------------------------------------------------- #
CTR_SCHEMA = f"""
TABLE: {ATHENA_DATABASE}.{ATHENA_TABLE}

COLUMNS (nested CTR objects are STRUCT columns: reference their fields with dot notation,
e.g. agent.username or queue.name; Athena lowercases all column and field names):

  -- Contact Identifiers
  contactid                               STRING    -- Unique contact ID (UUID)
  initialcontactid                        STRING    -- For transfers/callback chains
  previouscontactid                       STRING
  contactassociationid                    STRING

  -- Timing (ISO-8601 UTC strings; filter with substr/LIKE or convert with from_iso8601_timestamp)
  initiationtimestamp                     STRING    -- Call start time
  disconnecttimestamp                     STRING    -- Call end time
  connectedtosystemtimestamp              STRING    -- When connected to IVR/system
  lastupdatetimestamp                     STRING

  -- Call Classification
  channel                                 STRING    -- 'VOICE', 'CHAT', 'TASK'
  initiationmethod                        STRING    -- 'INBOUND', 'OUTBOUND', 'TRANSFER', 'CALLBACK', 'API'
  disconnectreason                        STRING    -- 'CUSTOMER_DISCONNECT', 'AGENT_DISCONNECT', 'THIRD_PARTY_DISCONNECT', 'TELECOM_PROBLEM', 'BARGED'

  -- Agent fields (STRUCT column "agent")
  agent.username                          STRING    -- Agent's login username (e.g. 'kishore')
  agent.agentinteractionduration          INT       -- Seconds the agent was talking with the customer
  agent.aftercontactworkduration          INT       -- ACW/wrap-up time in seconds
  agent.customerholdduration              INT       -- Total customer hold time in seconds
  agent.numberofholds                     INT       -- Number of times the customer was put on hold
  agent.longestholdduration               INT       -- Longest single hold in seconds
  agent.agentinitiatedholdduration        INT       -- Hold time initiated by the agent, in seconds
  agent.connectedtoagenttimestamp         STRING    -- When the agent answered
  agent.aftercontactworkstarttimestamp    STRING
  agent.aftercontactworkendtimestamp      STRING

  -- Routing Profile (nested inside agent)
  agent.routingprofile.name               STRING    -- e.g. 'Basic Routing Profile'
  agent.routingprofile.arn                STRING

  -- Queue (STRUCT column "queue")
  queue.name                              STRING    -- e.g. 'BasicQueue'
  queue.duration                          INT       -- Seconds in queue (wait time)
  queue.enqueuetimestamp                  STRING    -- When the contact entered the queue
  queue.dequeuetimestamp                  STRING    -- When it left the queue (agent answered)

  -- Endpoints (STRUCT columns)
  customerendpoint.address                STRING    -- Customer phone number e.g. '+12024498037'
  customerendpoint.type                   STRING    -- 'TELEPHONE_NUMBER'
  systemendpoint.address                  STRING    -- Your Connect DID e.g. '+18447761576'

  -- Recording (STRUCT column "recording")
  recording.status                        STRING    -- 'AVAILABLE', 'DELETED', 'NULL'
  recording.location                      STRING    -- S3 path of recording

  -- Quality
  qualitymetrics.agent.audio.qualityscore DOUBLE    -- 1.0–5.0 scale

  -- Agent Connections
  agentconnectionattempts                 INT       -- How many times Connect tried to reach an agent

  -- AWS Account / Instance
  awsaccountid                            STRING
  instancearn                             STRING

NOTES FOR QUERY GENERATION:
1. Date filtering: initiationtimestamp is a VARCHAR column. Never compare it directly with
   current_date or any other DATE value; compare date-string prefixes instead:
   - Today:       substr(initiationtimestamp, 1, 10) = date_format(current_date, '%Y-%m-%d')
   - Today alt:   initiationtimestamp LIKE concat(date_format(current_date, '%Y-%m-%d'), '%')
   - Specific:    initiationtimestamp LIKE '2026-05-22%'
   - Range:       substr(initiationtimestamp, 1, 10) >= '2026-05-22'
                  AND substr(initiationtimestamp, 1, 10) < '2026-05-23'
   - Last 7 days: substr(initiationtimestamp, 1, 10) >= date_format(date_add('day', -7, current_date), '%Y-%m-%d')
   RULE: current_date returns a DATE. Compare it only with other DATE values, or convert it
   with date_format(..., '%Y-%m-%d') to get a VARCHAR string for comparison.
2. Duration math: total handle time = agent.agentinteractionduration + agent.customerholdduration
   + agent.aftercontactworkduration (talk + hold + after-contact work)
3. Always add LIMIT 100 unless the user asks for an aggregation (COUNT, AVG, SUM)
4. Agent usernames are stored in lowercase; compare LOWER(agent.username) with a lowercase name
5. Only use SELECT; never INSERT, UPDATE, DELETE, DROP, or CREATE
6. Alias nested fields that share a name (e.g. customerendpoint.address AS customer_address)
"""


# --------------------------------------------------------------------------- #
# System prompts
# --------------------------------------------------------------------------- #
SQL_GENERATION_PROMPT = f"""You are an expert SQL analyst for Amazon Connect Contact Trace Records (CTR).
Your job is to convert natural language questions into valid Athena SQL queries.

{CTR_SCHEMA}

RULES:
- Return ONLY the SQL query, nothing else — no explanation, no markdown, no backticks
- Use only columns listed in the schema above
- Always include a LIMIT clause (default 100) unless the query is purely aggregation
- Use correct Athena/Presto SQL syntax
- For timestamp comparisons, use string prefix matching or date_parse
- Never use INSERT, UPDATE, DELETE, DROP, ALTER, CREATE, or any DDL/DML
- If the question is unanswerable with this schema, return exactly: CANNOT_ANSWER
- CRITICAL: Return ONLY raw SQL. No markdown, no backticks, no ```sql fences, no explanation.
"""

ANSWER_FORMATTING_PROMPT = """You are a helpful assistant that explains Amazon Connect call center data clearly.
You will receive a user question, the SQL query that was run, and the results.
Provide a clear, concise answer in plain English.
If there are rows of data, summarize the key findings. For numeric results, highlight the important numbers.
If results are empty, say so clearly and suggest why (e.g., no calls matched the criteria).
Keep responses under 200 words unless the data requires more detail.
Do not mention SQL, Athena, or technical details unless explicitly asked.
"""


# --------------------------------------------------------------------------- #
# Helpers
# --------------------------------------------------------------------------- #
def _invoke_bedrock(system_prompt: str, user_message: str) -> str:
    """Call Bedrock (Nova Lite) and return the text response."""
    body = {
        "messages": [{"role": "user", "content": [{"text": user_message}]}],
        "system": [{"text": system_prompt}],
        "inferenceConfig": {
            "max_new_tokens": 1024,
            "temperature": 0.1,   # Low temp = more deterministic SQL
        },
    }
    response = bedrock_client.invoke_model(
        modelId=BEDROCK_MODEL_ID,
        body=json.dumps(body),
        contentType="application/json",
        accept="application/json",
    )
    result = json.loads(response["body"].read())
    return result["output"]["message"]["content"][0]["text"].strip()
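
# Alternative for Anthropic Claude models (Messages API request format). To use it, set
# BEDROCK_MODEL_ID to a Claude model ID, allow that model in the IAM policy, and swap it
# in for the Nova version above.
# def _invoke_bedrock(system_prompt: str, user_message: str) -> str: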
#     """Call Bedrock (Claude) and return the text response."""
#     body = {
#         "anthropic_version": "bedrock-2023-05-31",
#         "max_tokens": 1024,
#         "system": system_prompt,
#         "messages": [{"role": "user", "content": user_message}],
#     }
#     response = bedrock_client.invoke_model(
#         modelId=BEDROCK_MODEL_ID,
#         body=json.dumps(body),
#         contentType="application/json",
#         accept="application/json",
#     )
#     result = json.loads(response["body"].read())
#     return result["content"][0]["text"].strip()


def _invoke_athena_lambda(sql: str) -> dict:
    """Invoke Lambda B (Athena executor) and return parsed result."""
    payload = json.dumps({"sql": sql})
    response = lambda_client.invoke(
        FunctionName=ATHENA_LAMBDA_NAME,
        InvocationType="RequestResponse",
        Payload=payload.encode(),
    )
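    result_payload = json.loads(response["Payload"].read())

    # If Lambda B raises an unhandled exception, Invoke still succeeds but sets
    # FunctionError, and the payload holds errorMessage/errorType instead of a body
    if response.get("FunctionError"):
        raise ValueError(result_payload.get("errorMessage", "Athena executor raised an unhandled error"))

    # Lambda B wraps its response in statusCode + body (body is a JSON string)
    if isinstance(result_payload.get("body"), str):
        body = json.loads(result_payload["body"])
    else:
        body = result_payload.get("body", result_payload)

    if result_payload.get("statusCode", 200) != 200:
        raise ValueError(body.get("error", "Athena executor failed"))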

    return body


def _summarise_results_for_bedrock(columns: list, rows: list, row_count: int) -> str:
    """
    Build a compact text representation of query results to send to Bedrock.
    Truncates to first 20 rows to keep prompt size reasonable.
    """
    if not rows:
        return "The query returned 0 rows."

    display_rows = rows[:20]
    lines = [", ".join(columns)]
    for row in display_rows:
        lines.append(", ".join(str(row.get(c, "")) for c in columns))

    summary = "\n".join(lines)
    if row_count > 20:
        summary += f"\n... (showing 20 of {row_count} total rows)"
    return summary


# --------------------------------------------------------------------------- #
# Lambda Handler
# --------------------------------------------------------------------------- #
def lambda_handler(event, context):
    """
    Expected input: an API Gateway event (HTTP API payload format 2.0 or REST API
    proxy integration) whose JSON body is:
    {
        "question": "How many calls did agent kishore handle today?"
    }
    A direct invocation with that object as the event is handled the same way.

    Returns:
    {
        "statusCode": 200,
        "headers": { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" },
        "body": JSON string with:
            {
                "answer": "Agent kishore handled 3 calls today.",
                "sql":    "SELECT COUNT(*) ...",
                "row_count": 1,
                "columns": ["_col0"],
                "rows":   [{"_col0": "3"}]
            }
    }
    """
    print("Input Event", event)
    # ---- Parse event (API GW proxy integration) ----------------------------- #
    cors_headers = {
        "Content-Type": "application/json",
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Headers": "Content-Type",
        "Access-Control-Allow-Methods": "POST,OPTIONS",
    }

    # Handle preflight
    if event.get("requestContext", {}).get("http", {}).get("method") == "OPTIONS" \
       or event.get("httpMethod") == "OPTIONS":
        return {"statusCode": 200, "headers": cors_headers, "body": ""}

    # Parse body
    body_raw = event.get("body", event)
    if isinstance(body_raw, str):
        try:
            body = json.loads(body_raw)
        except json.JSONDecodeError:
            body = {}
    elif isinstance(body_raw, dict):
        body = body_raw
    else:
        body = {}

    question = body.get("question", "").strip()

    if not question:
        return {
            "statusCode": 400,
            "headers": cors_headers,
            "body": json.dumps({"error": "Missing 'question' field in request body"}),
        }

    # ---- Step 1: Generate SQL with Bedrock ---------------------------------- #
    try:
        sql = _invoke_bedrock(
            system_prompt=SQL_GENERATION_PROMPT,
            user_message=f"User question: {question}",
        )
        # Strip markdown code fences (``` or ```sql, in any case) that some models wrap SQL in
        import re
        sql = re.sub(r"```(?:sql)?", "", sql, flags=re.IGNORECASE).strip()
    except ClientError as e:
        return {
            "statusCode": 500,
            "headers": cors_headers,
            "body": json.dumps({"error": f"Bedrock SQL generation failed: {str(e)}"}),
        }

    if sql.strip().upper() == "CANNOT_ANSWER":
        return {
            "statusCode": 200,
            "headers": cors_headers,
            "body": json.dumps({
                "answer": "I'm sorry, I can't answer that question with the available call center data. "
                          "I can help with questions about calls, agents, queues, durations, and call outcomes.",
                "sql": None,
                "row_count": 0,
                "columns": [],
                "rows": [],
            }),
        }

    # ---- Step 2: Run SQL via Lambda B (Athena) ------------------------------ #
    try:
        athena_result = _invoke_athena_lambda(sql)
    except Exception as e:
        return {
            "statusCode": 500,
            "headers": cors_headers,
            "body": json.dumps({
                "error": f"Athena query failed: {str(e)}",
                "sql": sql,
            }),
        }

    columns   = athena_result.get("columns", [])
    rows      = athena_result.get("rows", [])
    row_count = athena_result.get("row_count", 0)

    # ---- Step 3: Format answer with Bedrock --------------------------------- #
    results_text = _summarise_results_for_bedrock(columns, rows, row_count)
    format_prompt = (
        f"User question: {question}\n\n"
        f"SQL query used:\n{sql}\n\n"
        f"Query results:\n{results_text}"
    )

    try:
        answer = _invoke_bedrock(
            system_prompt=ANSWER_FORMATTING_PROMPT,
            user_message=format_prompt,
        )
        print("Bedrock Answer", answer)
    except ClientError:
        # Fall back to a plain row count if answer formatting fails
        answer = f"Query returned {row_count} record(s)."

    # ---- Return ------------------------------------------------------------- #
    return {
        "statusCode": 200,
        "headers": cors_headers,
        "body": json.dumps({
            "answer": answer,
            "sql": sql,
            "row_count": row_count,
            "columns": columns,
            "rows": rows[:50],   # Cap rows in response payload
        }),
    }
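Steps 1 and 2 pass model output straight to Athena, so it helps to keep the fence stripping and the read-only check in one small, testable function. Below is a sketch of that. It assumes the model returns a single statement, possibly wrapped in ``` fences and possibly ending with a semicolon. `extract_sql` is hypothetical. Like Lambda B's prefix check, it is a coarse filter rather than a SQL parser. It also rejects any semicolon inside the statement, including one inside a string literal.

```python
import re

# Opening or closing markdown fence, with an optional "sql" language tag in any case
_FENCE = re.compile(r"```(?:sql)?", re.IGNORECASE)


def extract_sql(model_output: str) -> str:
    """Strip markdown fences, then accept exactly one SELECT/WITH statement."""
    sql = _FENCE.sub("", model_output).strip()
    sql = sql.rstrip(";").strip()  # tolerate trailing semicolons
    if ";" in sql:
        raise ValueError("Multiple statements are not allowed")
    first_word = sql.split(None, 1)[0].upper() if sql else ""
    if first_word not in ("SELECT", "WITH"):
        raise ValueError(f"Only SELECT / WITH queries are allowed, got: {first_word or 'empty'}")
    return sql
```

In the handler, the CANNOT_ANSWER check would still run first. This function would then replace the inline `re.sub` call.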

IAM Policy of Lambda: ctr-athena-executor
{
  "Version": "2012-10-17",
  "Statement": [
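    {
      "Effect": "Allow",
      "Action": [
        "athena:StartQueryExecution",
        "athena:GetQueryExecution",
        "athena:GetQueryResults",
        "athena:StopQueryExecution",
        "glue:GetDatabase",
        "glue:GetTable",
        "glue:GetPartitions"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetBucketLocation", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::kishore-architectural-files",
        "arn:aws:s3:::amazon-connect-3f80ca8b9124"
      ]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject"],
      "Resource": "arn:aws:s3:::kishore-architectural-files/Unsaved/*"
    },
    {
      "Effect": "Allow",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::amazon-connect-3f80ca8b9124/ctr/*"
    }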
  ]
}
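S3 permissions split by resource type. Bucket-level actions such as s3:GetBucketLocation and s3:ListBucket match the bucket ARN, while s3:GetObject and s3:PutObject match object ARNs. Below is a small sketch that derives both ARNs from an s3:// URI such as the ATHENA_OUTPUT_BUCKET value, so the policy and the environment variable stay in step. The helper is hypothetical.

```python
from typing import Tuple
from urllib.parse import urlparse


def s3_uri_to_arns(uri: str) -> Tuple[str, str]:
    """Return (bucket ARN, object-prefix ARN) for an s3://bucket/prefix/ URI."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an s3:// URI: {uri!r}")
    bucket = parsed.netloc
    prefix = parsed.path.lstrip("/")        # "" for a bucket-root URI
    bucket_arn = f"arn:aws:s3:::{bucket}"   # for GetBucketLocation / ListBucket
    object_arn = f"{bucket_arn}/{prefix}*"  # for GetObject / PutObject under the prefix
    return bucket_arn, object_arn
```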

Environment variables of Lambda: ctr-athena-executor
Key                     Value
ATHENA_DATABASE         call_analytics
ATHENA_OUTPUT_BUCKET    s3://kishore-architectural-files/Unsaved/
ATHENA_WORKGROUP        primary

Python Lambda: ctr-athena-executor
"""
Lambda B — Athena Query Executor
----------------------------------
Receives a SQL string, executes it on Athena,
polls until complete, and returns rows as JSON.

Runtime : Python 3.12
Handler : lambda_function.lambda_handler
Memory  : 256 MB
Timeout : 60 seconds

Required IAM permissions:
  - athena:StartQueryExecution
  - athena:GetQueryExecution
  - athena:GetQueryResults
  - s3:GetObject / s3:PutObject on the Athena output prefix
  - s3:GetObject on the CTR data prefix
  - s3:GetBucketLocation / s3:ListBucket on both buckets
  - glue:GetTable / glue:GetDatabase / glue:GetPartitions (Athena needs Glue catalog access)

Environment Variables (values as deployed):
  ATHENA_DATABASE      : call_analytics
  ATHENA_OUTPUT_BUCKET : s3://kishore-architectural-files/Unsaved/
  ATHENA_WORKGROUP     : primary  (or your custom workgroup)
"""

import json
import time
import os
import boto3
from botocore.exceptions import ClientError

# --------------------------------------------------------------------------- #
# Config from environment variables
# --------------------------------------------------------------------------- #
ATHENA_DATABASE     = os.environ.get("ATHENA_DATABASE", "call_analytics")
ATHENA_OUTPUT       = os.environ.get("ATHENA_OUTPUT_BUCKET", "s3://kishore-architectural-files/Unsaved/")
ATHENA_WORKGROUP    = os.environ.get("ATHENA_WORKGROUP", "primary")
MAX_POLL_ATTEMPTS   = 25          # 25 × 2s = 50s max wait, leaving headroom under the 60s Lambda timeout
POLL_INTERVAL_SEC   = 2
MAX_ROWS_RETURNED   = 500         # Safety limit


athena = boto3.client("athena")


# --------------------------------------------------------------------------- #
# Helpers
# --------------------------------------------------------------------------- #
def _start_query(sql: str) -> str:
    """Submit the query to Athena and return the QueryExecutionId."""
    response = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": ATHENA_DATABASE},
        ResultConfiguration={"OutputLocation": ATHENA_OUTPUT},
        WorkGroup=ATHENA_WORKGROUP,
    )
    print("response", response)
    return response["QueryExecutionId"]


def _wait_for_query(execution_id: str) -> dict:
    """Poll until the query completes or fails. Returns the final status dict."""
    for _ in range(MAX_POLL_ATTEMPTS):
        response = athena.get_query_execution(QueryExecutionId=execution_id)
        status   = response["QueryExecution"]["Status"]
        state    = status["State"]

        if state == "SUCCEEDED":
            return {"state": "SUCCEEDED", "execution_id": execution_id}

        if state in ("FAILED", "CANCELLED"):
            reason = status.get("StateChangeReason", "Unknown error")
            return {"state": state, "error": reason}

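        time.sleep(POLL_INTERVAL_SEC)

    # Give up: cancel the query so it does not keep running (and scanning data) in the background
    try:
        athena.stop_query_execution(QueryExecutionId=execution_id)
    except ClientError:
        pass  # Best effort; the caller still receives the TIMEOUT result
    return {"state": "TIMEOUT", "error": "Query exceeded maximum wait time"}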


def _fetch_results(execution_id: str) -> dict:
    """
    Page through Athena results and return columns + rows.
    Respects MAX_ROWS_RETURNED to keep Lambda response size reasonable.
    """
    columns = []
    rows    = []
    kwargs  = {
        "QueryExecutionId": execution_id,
        "MaxResults": min(MAX_ROWS_RETURNED + 1, 1000),   # first page
    }

    first_page = True
    while True:
        response       = athena.get_query_results(**kwargs)
        result_set     = response["ResultSet"]
        col_metadata   = result_set["ResultSetMetadata"]["ColumnInfo"]

        if first_page:
            columns = [c["Label"] for c in col_metadata]
            first_page = False

        for row in result_set["Rows"]:
            data = [d.get("VarCharValue", "") for d in row["Data"]]
            rows.append(data)

        next_token = response.get("NextToken")
        if not next_token or len(rows) >= MAX_ROWS_RETURNED:
            break
        kwargs["NextToken"] = next_token

    # The first row of a SELECT result set repeats the column names; skip it
    if rows and rows[0] == columns:
        rows = rows[1:]

    # Truncated if more rows were fetched than allowed, or Athena still has more pages
    truncated = len(rows) > MAX_ROWS_RETURNED or bool(next_token)
    rows = rows[:MAX_ROWS_RETURNED]

    # Convert to list-of-dicts for easier consumption
    records = [dict(zip(columns, row)) for row in rows]

    return {
        "columns": columns,
        "rows": records,
        "row_count": len(records),
        "truncated": truncated,
    }


# --------------------------------------------------------------------------- #
# Lambda Handler
# --------------------------------------------------------------------------- #
def lambda_handler(event, context):
    """
    Expected input event (direct invocation from Lambda A):
    {
        "sql": "SELECT ContactId, Agent, Channel FROM ctr_table LIMIT 10"
    }

    Returns (body is a JSON-encoded string, shown decoded here):
    {
        "statusCode": 200,
        "body": {
            "columns": ["ContactId", "Agent", "Channel"],
            "rows": [ {"ContactId": "...", "Agent": "...", "Channel": "VOICE"}, ... ],
            "row_count": 10,
            "truncated": false,
            "execution_id": "...",
            "sql": "SELECT ContactId, Agent, Channel FROM ctr_table LIMIT 10"
        }
    }
    """
    print("Input Event", event)
    sql = event.get("sql", "").strip()

    # ---- Validation -------------------------------------------------------- #
    if not sql:
        return {
            "statusCode": 400,
            "body": json.dumps({"error": "Missing 'sql' in event"}),
        }

    # Basic guard: only SELECT / WITH queries are allowed. This is a prefix check, not a
    # SQL parser; the read-only Glue permissions in the IAM policy are the backstop against DDL
    sql_upper = sql.upper().lstrip()
    if not sql_upper.startswith("SELECT") and not sql_upper.startswith("WITH"):
        return {
            "statusCode": 400,
            "body": json.dumps({"error": "Only SELECT / WITH queries are allowed"}),
        }

    # ---- Execute ----------------------------------------------------------- #
    try:
        execution_id = _start_query(sql)
        status       = _wait_for_query(execution_id)

        if status["state"] != "SUCCEEDED":
            return {
                "statusCode": 500,
                "body": json.dumps({
                    "error": f"Query {status['state']}: {status.get('error', '')}",
                    "sql": sql,
                }),
            }

        results = _fetch_results(execution_id)

        return {
            "statusCode": 200,
            "body": json.dumps({
                **results,
                "execution_id": execution_id,
                "sql": sql,
            }),
        }

    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        error_msg  = e.response["Error"]["Message"]
        return {
            "statusCode": 500,
            "body": json.dumps({"error": f"AWS error [{error_code}]: {error_msg}"}),
        }
    except Exception as e:
        return {
            "statusCode": 500,
            "body": json.dumps({"error": str(e)}),
        }
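The fixed MAX_POLL_ATTEMPTS × POLL_INTERVAL_SEC budget has to be kept below the function timeout by hand. Below is a sketch of an alternative that derives the budget from the Lambda context instead. `context.get_remaining_time_in_millis()` is the standard method on the Lambda Python context object. `wait_until_done`, the injected `get_state` callable, and the 5-second safety margin are hypothetical choices for illustration.

```python
import time
from typing import Callable


def wait_until_done(
    get_state: Callable[[], str],
    remaining_ms: Callable[[], int],
    poll_interval_sec: float = 2.0,
    safety_margin_ms: int = 5000,
) -> str:
    """Poll get_state() until it returns a terminal Athena state or time runs out.

    remaining_ms is typically context.get_remaining_time_in_millis; the margin
    leaves time to fetch results (or cancel the query) before Lambda stops.
    """
    terminal = {"SUCCEEDED", "FAILED", "CANCELLED"}
    while True:
        state = get_state()
        if state in terminal:
            return state
        # Stop if sleeping once more would eat into the safety margin
        if remaining_ms() - poll_interval_sec * 1000 < safety_margin_ms:
            return "TIMEOUT"
        time.sleep(poll_interval_sec)
```

Inside Lambda B, `get_state` could be `lambda: athena.get_query_execution(QueryExecutionId=execution_id)["QueryExecution"]["Status"]["State"]`, with `context.get_remaining_time_in_millis` passed as `remaining_ms`.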

Athena SQL: create CTR table (first version, Parquet SerDe)
CREATE EXTERNAL TABLE connect_reporting_glue_table_data(
  awsaccountid string,
  awscontacttracerecordformatversion string,
  agent struct,username:string>,
  agentconnectionattempts int,
  attributes string,
  channel string,
  connectedtosystemtimestamp string,
  currentagentsnapshot string,
  contactid string,
  customerendpoint struct,
  disconnecttimestamp string,
  initialcontactid string,
  initiationmethod string,
  initiationtimestamp string,
  instancearn string,
  lastupdatetimestamp string,
  mediastreams array>,
  nextcontactid string,
  previouscontactid string,
  queue struct,
  recording struct,
  recordings string,
  systemendpoint struct,
  transfercompletedtimestamp string,
  transferredtoendpoint struct,
  disconnectreason string
)
PARTITIONED BY (
  year string,
  month string,
  day string,
  hour string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://amazon-connect-3f80ca8b9124/ctr/'
TBLPROPERTIES ('has_encrypted_data'='false');

Athena SQL: preview the table
SELECT * FROM connect_reporting_glue_table_data

Athena SQL: create the database
CREATE DATABASE IF NOT EXISTS call_analytics
COMMENT 'Database for customer service call analysis'
LOCATION 's3://kishore-architectural-files/athena-results/';

Athena SQL: drop the table before re-creating it
DROP TABLE IF EXISTS connect_reporting_glue_table_data;

Athena SQL: load partitions
MSCK REPAIR TABLE connect_reporting_glue_table_data;
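MSCK REPAIR TABLE only registers partitions stored under Hive-style key=value paths (for example year=2026/month=05/...). The CTR objects in this bucket sit under plain date folders such as s3://amazon-connect-3f80ca8b9124/ctr/2026/05/22/16/, so partitions may need to be added explicitly. Athena partition projection is the other common option. Below is a sketch that generates the ALTER TABLE ADD PARTITION statement for one hour. It assumes that YYYY/MM/DD/HH layout. The function name is hypothetical.

```python
from datetime import datetime


def add_partition_sql(table: str, base_location: str, hour: datetime) -> str:
    """Build an ALTER TABLE ... ADD PARTITION statement for one hourly folder.

    Assumes objects live under <base_location>/YYYY/MM/DD/HH/ (non-Hive layout).
    """
    y, m, d, h = (hour.strftime(fmt) for fmt in ("%Y", "%m", "%d", "%H"))
    location = f"{base_location.rstrip('/')}/{y}/{m}/{d}/{h}/"
    return (
        f"ALTER TABLE {table} ADD IF NOT EXISTS "
        f"PARTITION (year='{y}', month='{m}', day='{d}', hour='{h}') "
        f"LOCATION '{location}'"
    )
```

Each generated statement could be submitted with `athena.start_query_execution`, for example from a scheduled job once per hour.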

Athena SQL: list registered partitions
SHOW PARTITIONS connect_reporting_glue_table_data;
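SHOW PARTITIONS returns one row per partition in key=value/key=value form (for example year=2026/month=05/day=22/hour=16). Below is a sketch that parses such a row into a dict, which is handy for checking from Python which hours are already registered. The helper is hypothetical.

```python
from typing import Dict


def parse_partition_spec(spec: str) -> Dict[str, str]:
    """Turn 'year=2026/month=05/day=22/hour=16' into {'year': '2026', ...}."""
    parts: Dict[str, str] = {}
    for segment in spec.strip().split("/"):
        key, sep, value = segment.partition("=")
        if not sep or not key:
            raise ValueError(f"Malformed partition segment: {segment!r}")
        parts[key] = value
    return parts
```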

Athena SQL: sample query using a nested field
SELECT contactid, agent.username, channel, disconnectreason
FROM connect_reporting_glue_table_data
LIMIT 10;

Athena SQL: inspect one raw line of a CTR file
CREATE EXTERNAL TABLE temp_inspect(
  line string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
LOCATION 's3://amazon-connect-3f80ca8b9124/ctr/2026/05/22/16/';

SELECT line FROM temp_inspect LIMIT 1;
DROP TABLE temp_inspect;
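Inspecting a raw line is a quick way to confirm the file format before choosing a SerDe. The JSON SerDe used for the final table expects one JSON object per line, so a line that holds several objects back to back would not map cleanly to one row per contact. Below is a sketch, using Python's `json.JSONDecoder.raw_decode`, that decodes every top-level JSON object in a line. It assumes you paste in the value returned by the SELECT on temp_inspect. The helper is hypothetical.

```python
import json
from typing import Any, List


def split_json_objects(line: str) -> List[Any]:
    """Decode every top-level JSON value in a line, including back-to-back ones."""
    decoder = json.JSONDecoder()
    objects: List[Any] = []
    pos = 0
    while pos < len(line):
        # raw_decode does not skip leading whitespace, so do it here
        while pos < len(line) and line[pos].isspace():
            pos += 1
        if pos >= len(line):
            break
        obj, pos = decoder.raw_decode(line, pos)
        objects.append(obj)
    return objects
```

A result longer than one element means the file packs several CTRs per line.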

Athena SQL: create CTR table (JSON SerDe, nested structs)
CREATE EXTERNAL TABLE connect_reporting_glue_table_data(
  AWSAccountId string,
  AWSContactTraceRecordFormatVersion string,
  Agent struct,
              HierarchyGroups:string,
              LongestHoldDuration:int,
              NumberOfHolds:int,
              RoutingProfile:struct,
              Username:string,
              VoiceEnhancementMode:string>,
  AgentConnectionAttempts int,
  AnsweringMachineDetectionStatus string,
  Attributes string,
  Campaign struct,
  Channel string,
  ConnectedToSystemTimestamp string,
  ContactAssociationId string,
  ContactDetails string,
  ContactId string,
  ContactLens struct,SentimentConfiguration:string,SummaryConfiguration:string>>>,
  CustomerEndpoint struct,
  CustomerVoiceActivity string,
  DisconnectReason string,
  DisconnectTimestamp string,
  InitialContactId string,
  InitiationMethod string,
  InitiationTimestamp string,
  InstanceARN string,
  LastUpdateTimestamp string,
  MediaStreams array>,
  NextContactId string,
  PreviousContactId string,
  QualityMetrics struct,QualityScore:double>>>,
  Queue struct,
  Recording struct,
  Recordings array>,
  References array,
  ScheduledTimestamp string,
  SegmentAttributes string,
  SystemEndpoint struct,
  Tags string,
  TaskTemplateInfo string,
  TransferCompletedTimestamp string,
  TransferredToEndpoint string,
  VoiceIdResult string
)
PARTITIONED BY (
  year string,
  month string,
  day string,
  hour string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://amazon-connect-3f80ca8b9124/ctr/';
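With this DDL, nested CTR objects stay STRUCT columns, so queries address their fields with dot notation (agent.username, queue.name) rather than flattened column names. Below is a sketch that lists every leaf path of a sample CTR record in that lowercase dot form. It can help keep a schema description such as CTR_SCHEMA in sync with real records. The helper and the sample record are illustrative. Array columns (for example Recordings) end the path, because their elements need UNNEST in SQL rather than dot notation.

```python
from typing import Any, Iterator


def leaf_paths(record: Any, prefix: str = "") -> Iterator[str]:
    """Yield lowercase dot-notation paths to every non-object value in a JSON record."""
    if isinstance(record, dict) and record:
        for key, value in record.items():
            path = f"{prefix}.{key.lower()}" if prefix else key.lower()
            yield from leaf_paths(value, path)
    elif prefix:
        yield prefix  # scalars, lists, nulls and empty objects end the path


sample_ctr = {  # hypothetical, heavily trimmed record
    "ContactId": "x",
    "Agent": {"Username": "kishore", "RoutingProfile": {"Name": "Basic Routing Profile"}},
    "Queue": {"Name": "BasicQueue"},
}
```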

Athena SQL: query one day of CTRs (partition filter)
SELECT
  ContactId,
  Channel,
  InitiationMethod,
  InitiationTimestamp,
  DisconnectTimestamp,
  DisconnectReason,
  Agent.Username AS agent_username,
  Queue.Name AS queue_name,
  CustomerEndpoint.Address AS customer_address,
  SystemEndpoint.Address AS system_address
FROM connect_reporting_glue_table_data
WHERE year = '2026' AND month = '05' AND day = '22'
LIMIT 10;
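Athena typically labels a selected struct field by the field's own name. Without aliases, customerendpoint.address and systemendpoint.address would therefore both come back as address. Lambda B's `dict(zip(columns, row))` would then keep only the last value. Aliasing in SQL is the simplest fix. Below is a defensive fallback sketch that suffixes repeated labels. The helper is hypothetical, and it does not guard against a real column already named, say, address_2.

```python
from typing import Dict, List


def unique_labels(columns: List[str]) -> List[str]:
    """Suffix repeated column labels (_2, _3, ...) so dict(zip(...)) keeps every value."""
    seen: Dict[str, int] = {}
    result: List[str] = []
    for label in columns:
        count = seen.get(label, 0) + 1
        seen[label] = count
        result.append(label if count == 1 else f"{label}_{count}")
    return result
```

If this were applied in `_fetch_results`, the header-row check would still need to compare against the raw labels, since the first result row repeats the original names.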
