Scenario Intermediate Python Python AWS Scripting

Trigger CodePipeline & Send Slack Notifications at Each Stage

Python script to start an AWS CodePipeline execution, poll stage progress, and post real-time Slack notifications as each stage succeeds, fails, or progresses.

January 20, 2025 8 min read ~20 min to complete DB
The Situation

CI/CD visibility — developers should get Slack alerts as each pipeline stage completes, not just when the whole pipeline finishes (or fails silently).

6 Steps
3 Services Used
~20 min Duration
Intermediate Difficulty

Problem Statement

Your team deploys every day. Developers have to keep the CodePipeline console tab open to watch the deployment. When something fails at the “Deploy to Production” stage, they find out 30 minutes later by checking manually. This script sends a Slack message at every stage transition — instant visibility without console tab watching.


CodePipeline Stage Status Values

StatusMeaning
InProgressStage is currently running
SucceededStage completed successfully
FailedStage failed — pipeline stops here
StoppedManually stopped
SkippedStage was skipped

Complete Script

import boto3
import time
import json
import urllib.request
import urllib.error
import logging
from datetime import datetime

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")


class CodePipelineMonitor:
    def __init__(
        self,
        pipeline_name: str,
        slack_webhook_url: str,
        region: str = "us-east-1",
    ):
        """
        pipeline_name:     Name of the CodePipeline to trigger and monitor.
        slack_webhook_url: Slack Incoming Webhook URL from your Slack app.
                           Format: https://hooks.slack.com/services/T.../B.../xxx
        """
        self.cp            = boto3.client("codepipeline", region_name=region)
        self.pipeline_name = pipeline_name
        self.slack_url     = slack_webhook_url
        self.region        = region

    # ── Step 1: Trigger the pipeline ─────────────────────────────
    def trigger_pipeline(self) -> str:
        """
        start_pipeline_execution() queues a new pipeline run.
        Returns a pipelineExecutionId (UUID string) that uniquely
        identifies this run — use it to query status later.

        If a run is already in progress, this starts a new parallel
        execution (CodePipeline supports concurrent executions by default).
        """
        response = self.cp.start_pipeline_execution(name=self.pipeline_name)
        execution_id = response["pipelineExecutionId"]

        logger.info(f"Pipeline triggered: {execution_id}")
        self.send_slack(
            message=(
                f"🚀 *Deployment Started*\n"
                f"Pipeline: `{self.pipeline_name}`\n"
                f"Execution ID: `{execution_id}`\n"
                f"Time: {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}"
            ),
            color="#439FE0",   # Blue for "in progress"
        )
        return execution_id

    # ── Step 2: Monitor progress ──────────────────────────────────
    def monitor(
        self,
        execution_id: str,
        poll_interval: int = 15,
        timeout: int       = 1800,
    ) -> bool:
        """
        Polls the pipeline state every poll_interval seconds.
        Returns True if the pipeline succeeded, False otherwise.

        We track stage_statuses to detect changes — we only send
        a Slack notification when a stage's status actually changes,
        not on every poll cycle.

        timeout=1800 (30 min) covers most pipelines; increase for
        pipelines with slow integration tests or large deployments.
        """
        stage_statuses: dict[str, str] = {}
        start_time = time.time()

        logger.info(f"Monitoring execution {execution_id} (timeout: {timeout}s)...")

        while time.time() - start_time < timeout:
            time.sleep(poll_interval)

            try:
                # ── Check overall execution status ─────────────────
                # get_pipeline_execution() returns the high-level status:
                # InProgress / Succeeded / Failed / Stopped / Superseded
                exec_response = self.cp.get_pipeline_execution(
                    pipelineName=self.pipeline_name,
                    pipelineExecutionId=execution_id,
                )
                overall_status = exec_response["pipelineExecution"]["status"]

                # ── Check individual stage statuses ────────────────
                # get_pipeline_state() returns all stages with their
                # latestExecution status — more granular than the overall status.
                state_response = self.cp.get_pipeline_state(
                    name=self.pipeline_name
                )

                for stage in state_response["stageStates"]:
                    stage_name   = stage["stageName"]
                    latest_exec  = stage.get("latestExecution", {})
                    stage_status = latest_exec.get("status", "NotStarted")

                    # Notify only on status change (avoids spam)
                    if stage_statuses.get(stage_name) != stage_status:
                        stage_statuses[stage_name] = stage_status
                        self._notify_stage(stage_name, stage_status)

                # ── Check for terminal state ───────────────────────
                if overall_status in ("Succeeded", "Failed", "Stopped", "Superseded"):
                    self._notify_final(overall_status, execution_id)
                    return overall_status == "Succeeded"

            except self.cp.exceptions.PipelineExecutionNotFoundException:
                logger.error(f"Execution {execution_id} not found")
                return False
            except Exception as e:
                logger.error(f"Error polling pipeline: {e}")

        # Timeout reached
        self.send_slack(
            message=(
                f"⏰ *Pipeline Monitoring Timed Out*\n"
                f"Pipeline: `{self.pipeline_name}`\n"
                f"Execution: `{execution_id}`\n"
                f"Waited {timeout // 60} minutes without completion."
            ),
            color="#FFA500",  # Orange for timeout
        )
        return False

    # ── Slack notifications ───────────────────────────────────────
    def _notify_stage(self, stage_name: str, status: str) -> None:
        """Send a Slack message for each stage status change."""
        emoji_map = {
            "InProgress": "🔄",
            "Succeeded":  "✅",
            "Failed":     "❌",
            "Stopped":    "⛔",
            "Skipped":    "⏭️",
        }
        color_map = {
            "InProgress": "#439FE0",   # Blue
            "Succeeded":  "#36a64f",   # Green
            "Failed":     "#D00000",   # Red
            "Stopped":    "#FFA500",   # Orange
            "Skipped":    "#808080",   # Grey
        }
        emoji = emoji_map.get(status, "•")
        color = color_map.get(status, "#808080")

        self.send_slack(
            message=f"{emoji} Stage *{stage_name}*: `{status}`",
            color=color,
        )

    def _notify_final(self, status: str, execution_id: str) -> None:
        """Send final pipeline result notification."""
        if status == "Succeeded":
            msg   = (
                f"✅ *Deployment Succeeded!*\n"
                f"Pipeline: `{self.pipeline_name}`\n"
                f"Execution: `{execution_id}`"
            )
            color = "#36a64f"
        else:
            console_url = (
                f"https://console.aws.amazon.com/codesuite/codepipeline/pipelines/"
                f"{self.pipeline_name}/executions/{execution_id}/timeline"
            )
            msg = (
                f"❌ *Deployment FAILED!*\n"
                f"Pipeline: `{self.pipeline_name}`\n"
                f"Execution: `{execution_id}`\n"
                f"Status: `{status}`\n"
                f"<{console_url}|View in Console>"
            )
            color = "#D00000"

        self.send_slack(message=msg, color=color)

    def send_slack(self, message: str, color: str = "#439FE0") -> None:
        """
        Send a message to Slack via an Incoming Webhook.

        We use urllib.request (built-in) instead of the requests library
        to keep this script dependency-free — important for Lambda.

        Slack Incoming Webhook payload format:
        {
          "attachments": [{
            "color": "#36a64f",    ← Left-border color (hex or "good"/"warning"/"danger")
            "text": "...",         ← Message body (supports Slack mrkdwn)
            "footer": "...",       ← Small footer text
            "ts": 1234567890       ← Unix timestamp for footer display
          }]
        }

        mrkdwn formatting: *bold*, _italic_, `code`, <url|link text>
        """
        payload = {
            "attachments": [{
                "color":   color,
                "text":    message,
                "footer":  "AWS CodePipeline Monitor",
                "ts":      int(time.time()),
                "mrkdwn_in": ["text"],   # Enable mrkdwn parsing in text field
            }]
        }

        data = json.dumps(payload).encode("utf-8")
        req = urllib.request.Request(
            self.slack_url,
            data=data,
            headers={"Content-Type": "application/json"},
            method="POST",
        )

        try:
            with urllib.request.urlopen(req, timeout=5) as response:
                if response.status != 200:
                    logger.warning(f"Slack returned non-200: {response.status}")
        except urllib.error.URLError as e:
            # Log the error but don't raise — Slack failure shouldn't stop monitoring
            logger.error(f"Slack notification failed: {e}")


# ── Entry point ───────────────────────────────────────────────────
if __name__ == "__main__":
    import sys

    monitor = CodePipelineMonitor(
        pipeline_name="prod-app-pipeline",
        slack_webhook_url="https://hooks.slack.com/services/T00/B00/xxx",
        region="ap-south-1",
    )

    # Trigger and monitor
    execution_id = monitor.trigger_pipeline()
    success = monitor.monitor(
        execution_id,
        poll_interval=15,   # Check every 15 seconds
        timeout=1800,       # Give up after 30 minutes
    )

    # Exit code for CI/CD integration
    sys.exit(0 if success else 1)

Sample Slack Output

🚀 Deployment Started
Pipeline: `prod-app-pipeline`
Execution ID: `abc-123-def`
Time: 2025-01-20 14:30 UTC

🔄 Stage Source: InProgress
✅ Stage Source: Succeeded

🔄 Stage Build: InProgress
✅ Stage Build: Succeeded

🔄 Stage Deploy: InProgress
✅ Stage Deploy: Succeeded

✅ Deployment Succeeded!
Pipeline: `prod-app-pipeline`
Execution ID: `abc-123-def`

Key Commands Explained

CommandWhat it does
start_pipeline_execution(name=pipeline_name)Triggers a new pipeline run, returns pipelineExecutionId
get_pipeline_execution(pipelineName, pipelineExecutionId)Returns overall status: InProgress/Succeeded/Failed
get_pipeline_state(name=pipeline_name)Returns per-stage status with action-level details
stage["latestExecution"]["status"]Current status of the most recent execution of this stage
urllib.request.Request(url, data, headers, method="POST")HTTP POST request using the standard library
json.dumps(payload).encode("utf-8")Serialises dict to JSON bytes for the HTTP body
urlopen(req, timeout=5)Sends the request with a 5-second timeout

🔍 Line-by-Line Code Walkthrough

Imports

LineWhy It’s Used
import boto3AWS SDK — needed for CodePipeline client calls
import timetime.sleep(poll_interval) between status checks. int(time.time()) for Slack message timestamp
import jsonSerialize the Slack payload dict into a JSON string
import urllib.requestStandard library HTTP client. Used to POST to Slack without the requests library dependency (important for Lambda)
import urllib.errorStandard library HTTP error handling for failed Slack calls
import loggingStructured log output
from datetime import datetimeFormats deployment start time for the Slack message

CodePipelineMonitor.__init__

self.cp            = boto3.client("codepipeline", region_name=region)
self.pipeline_name = pipeline_name
self.slack_url     = slack_webhook_url
LineExplanation
boto3.client("codepipeline", region_name=region)CodePipeline client. Pipelines are regional — you must use the correct region
self.slack_urlThe Slack Incoming Webhook URL (e.g., https://hooks.slack.com/services/T.../B.../xxx). Stored as an instance attribute for use by all notification methods

trigger_pipeline()

response = self.cp.start_pipeline_execution(name=self.pipeline_name)
execution_id = response["pipelineExecutionId"]
LineExplanation
start_pipeline_execution(name=self.pipeline_name)Triggers a new execution of the pipeline. If a run is already in progress, this starts a parallel execution
response["pipelineExecutionId"]A UUID string uniquely identifying this particular run. Used to query status with get_pipeline_execution()

monitor(execution_id, poll_interval, timeout)

stage_statuses: dict[str, str] = {}
start_time = time.time()

while time.time() - start_time < timeout:
    time.sleep(poll_interval)
LineExplanation
stage_statuses: dict[str, str] = {}Tracks the last-known status for each stage. We only send Slack notifications when a stage’s status changes (avoids sending duplicate messages on every poll)
time.time()Returns current Unix timestamp (seconds since epoch) as a float
time.time() - start_time < timeoutCompares elapsed time against the timeout. Loop exits after timeout seconds regardless of deployment status
time.sleep(poll_interval)Pauses before the next poll. 15 seconds is fine — CodePipeline stage transitions don’t happen faster than that
exec_response = self.cp.get_pipeline_execution(
    pipelineName=self.pipeline_name,
    pipelineExecutionId=execution_id,
)
overall_status = exec_response["pipelineExecution"]["status"]
LineExplanation
get_pipeline_execution(pipelineName, pipelineExecutionId)Returns the high-level status of this specific execution: InProgress, Succeeded, Failed, Stopped, Superseded
["pipelineExecution"]["status"]The status string. Used to detect terminal states and stop the polling loop
state_response = self.cp.get_pipeline_state(name=self.pipeline_name)
for stage in state_response["stageStates"]:
    stage_name   = stage["stageName"]
    stage_status = stage.get("latestExecution", {}).get("status", "NotStarted")

    if stage_statuses.get(stage_name) != stage_status:
        stage_statuses[stage_name] = stage_status
        self._notify_stage(stage_name, stage_status)
LineExplanation
get_pipeline_state(name=self.pipeline_name)Returns per-stage status. More granular than get_pipeline_execution() — you can see which specific stage is InProgress
state_response["stageStates"]List of stage dicts, one per pipeline stage (Source, Build, Deploy, etc.)
stage["stageName"]The name you gave the stage when creating the pipeline
stage.get("latestExecution", {}).get("status", "NotStarted")Chained .get()latestExecution is absent if the stage hasn’t run yet. "NotStarted" is our placeholder
if stage_statuses.get(stage_name) != stage_statusOnly notify when status changes. Without this check, every poll would send a Slack message

send_slack(message, color)

payload = {
    "attachments": [{
        "color":     color,
        "text":      message,
        "footer":    "AWS CodePipeline Monitor",
        "ts":        int(time.time()),
        "mrkdwn_in": ["text"],
    }]
}
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
    self.slack_url,
    data=data,
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req, timeout=5) as response:
    ...
LineExplanation
"attachments": [...]Slack’s legacy attachment format. The left-colored sidebar is only available via attachments (not blocks API)
"color": colorThe colored left border: "#36a64f" = green (success), "#D00000" = red (failure), "#439FE0" = blue (info)
"ts": int(time.time())Unix timestamp shown as a human-readable time in the Slack message footer
"mrkdwn_in": ["text"]Tells Slack to render Markdown (*bold*, `code`) in the text field
json.dumps(payload).encode("utf-8")Converts dict → JSON string → bytes. encode("utf-8") is required because HTTP requires bytes, not strings
urllib.request.Request(url, data, headers, method="POST")Creates an HTTP request object. data must be bytes, headers is a dict, method="POST" overrides the default GET
urllib.request.urlopen(req, timeout=5)Sends the request. timeout=5 means: give up if Slack doesn’t respond in 5 seconds
except urllib.error.URLError: logger.error(...)Catches network errors (DNS failure, timeout). We log but don’t raise — Slack failure shouldn’t break the monitoring loop

Exit Code for CI/CD

success = monitor.monitor(execution_id, poll_interval=15, timeout=1800)
sys.exit(0 if success else 1)
LineExplanation
monitor.monitor(...)Returns True if overall_status == "Succeeded", False otherwise
sys.exit(0 if success else 1)CI/CD systems (GitHub Actions, Jenkins) check the process exit code. 0 = the deployment step passed. 1 = the step failed, stop the pipeline
Services Used
CodePipelineboto3Slack Webhooks
Prerequisites
  • Python 3.8+
  • boto3
  • Slack Incoming Webhook URL
  • IAM: codepipeline:StartPipelineExecution, codepipeline:GetPipelineExecution, codepipeline:GetPipelineState
What You Learned
  • CodePipeline execution lifecycle
  • Stage status polling
  • Slack Incoming Webhook payload format
  • urllib.request for HTTP calls without extra dependencies

Have a similar scenario to share?

Production incidents are the best teachers. Submit your real-world scenario and help others learn.

Open Google Form

Related Scenarios