Trigger CodePipeline & Send Slack Notifications at Each Stage
Python script to start an AWS CodePipeline execution, poll stage progress, and post real-time Slack notifications as each stage succeeds, fails, or progresses.
CI/CD visibility — developers should get Slack alerts as each pipeline stage completes, not just when the whole pipeline finishes (or fails silently).
Problem Statement
Your team deploys every day. Developers have to keep the CodePipeline console open to watch each deployment. When something fails at the “Deploy to Production” stage, they find out 30 minutes later by checking manually. This script sends a Slack message at every stage transition it observes, so the team gets near-immediate visibility without watching the console.
CodePipeline Stage Status Values
| Status | Meaning |
|---|---|
| InProgress | Stage is currently running |
| Succeeded | Stage completed successfully |
| Failed | Stage failed; the pipeline stops here |
| Stopped | Manually stopped |
| Skipped | Stage was skipped |
Complete Script
import boto3
import time
import json
import urllib.request
import urllib.error
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
class CodePipelineMonitor:
    def __init__(
        self,
        pipeline_name: str,
        slack_webhook_url: str,
        region: str = "us-east-1",
    ):
        """
        pipeline_name: Name of the CodePipeline to trigger and monitor.
        slack_webhook_url: Slack Incoming Webhook URL from your Slack app.
            Format: https://hooks.slack.com/services/T.../B.../xxx
        """
        self.cp = boto3.client("codepipeline", region_name=region)
        self.pipeline_name = pipeline_name
        self.slack_url = slack_webhook_url
        self.region = region

    # ── Step 1: Trigger the pipeline ─────────────────────────────
    def trigger_pipeline(self) -> str:
        """
        start_pipeline_execution() starts a new pipeline run.
        Returns a pipelineExecutionId (UUID string) that uniquely
        identifies this run; use it to query status later.
        If a run is already in progress, the outcome depends on the
        pipeline's execution mode. In the default SUPERSEDED mode the
        new run follows the current one stage by stage and can replace
        an older run that is still waiting. Runs only execute
        concurrently in PARALLEL mode (V2 pipelines).
        """
        response = self.cp.start_pipeline_execution(name=self.pipeline_name)
        execution_id = response["pipelineExecutionId"]
        logger.info(f"Pipeline triggered: {execution_id}")
        self.send_slack(
            message=(
                f"🚀 *Deployment Started*\n"
                f"Pipeline: `{self.pipeline_name}`\n"
                f"Execution ID: `{execution_id}`\n"
                f"Time: {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}"
            ),
            color="#439FE0",  # Blue for "in progress"
        )
        return execution_id

    # ── Step 2: Monitor progress ──────────────────────────────────
    def monitor(
        self,
        execution_id: str,
        poll_interval: int = 15,
        timeout: int = 1800,
    ) -> bool:
        """
        Polls the pipeline state every poll_interval seconds.
        Returns True if the pipeline succeeded, False otherwise.
        We track stage_statuses to detect changes: we only send
        a Slack notification when a stage's status actually changes,
        not on every poll cycle.
        timeout=1800 (30 min) is a starting point; increase it for
        pipelines with slow integration tests or large deployments.
        """
        stage_statuses: dict[str, str] = {}
        start_time = time.time()
        logger.info(f"Monitoring execution {execution_id} (timeout: {timeout}s)...")
        while time.time() - start_time < timeout:
            time.sleep(poll_interval)
            try:
                # ── Check overall execution status ─────────────────
                # get_pipeline_execution() returns the high-level status:
                # InProgress / Stopping / Succeeded / Failed / Stopped /
                # Superseded / Cancelled
                exec_response = self.cp.get_pipeline_execution(
                    pipelineName=self.pipeline_name,
                    pipelineExecutionId=execution_id,
                )
                overall_status = exec_response["pipelineExecution"]["status"]
                # ── Check individual stage statuses ────────────────
                # get_pipeline_state() returns all stages with their
                # latestExecution status, which is more granular than
                # the overall status.
                state_response = self.cp.get_pipeline_state(
                    name=self.pipeline_name
                )
                for stage in state_response["stageStates"]:
                    stage_name = stage["stageName"]
                    latest_exec = stage.get("latestExecution", {})
                    # latestExecution can belong to an earlier run of the
                    # pipeline; only report stages reached by our run.
                    if latest_exec.get("pipelineExecutionId") != execution_id:
                        continue
                    stage_status = latest_exec.get("status", "NotStarted")
                    # Notify only on status change (avoids spam)
                    if stage_statuses.get(stage_name) != stage_status:
                        stage_statuses[stage_name] = stage_status
                        self._notify_stage(stage_name, stage_status)
                # ── Check for terminal state ───────────────────────
                if overall_status in (
                    "Succeeded", "Failed", "Stopped", "Superseded", "Cancelled"
                ):
                    self._notify_final(overall_status, execution_id)
                    return overall_status == "Succeeded"
            except self.cp.exceptions.PipelineExecutionNotFoundException:
                logger.error(f"Execution {execution_id} not found")
                return False
            except Exception as e:
                # Transient API errors: log and retry on the next poll
                logger.error(f"Error polling pipeline: {e}")
        # Timeout reached
        self.send_slack(
            message=(
                f"⏰ *Pipeline Monitoring Timed Out*\n"
                f"Pipeline: `{self.pipeline_name}`\n"
                f"Execution: `{execution_id}`\n"
                f"Waited {timeout // 60} minutes without completion."
            ),
            color="#FFA500",  # Orange for timeout
        )
        return False

    # ── Slack notifications ───────────────────────────────────────
    def _notify_stage(self, stage_name: str, status: str) -> None:
        """Send a Slack message for each stage status change."""
        emoji_map = {
            "InProgress": "🔄",
            "Succeeded": "✅",
            "Failed": "❌",
            "Stopped": "⛔",
            "Skipped": "⏭️",
        }
        color_map = {
            "InProgress": "#439FE0",  # Blue
            "Succeeded": "#36a64f",   # Green
            "Failed": "#D00000",      # Red
            "Stopped": "#FFA500",     # Orange
            "Skipped": "#808080",     # Grey
        }
        emoji = emoji_map.get(status, "•")
        color = color_map.get(status, "#808080")
        self.send_slack(
            message=f"{emoji} Stage *{stage_name}*: `{status}`",
            color=color,
        )

    def _notify_final(self, status: str, execution_id: str) -> None:
        """Send final pipeline result notification."""
        if status == "Succeeded":
            msg = (
                f"✅ *Deployment Succeeded!*\n"
                f"Pipeline: `{self.pipeline_name}`\n"
                f"Execution: `{execution_id}`"
            )
            color = "#36a64f"
        else:
            # Include the region so the link opens the pipeline in the
            # region it actually lives in.
            console_url = (
                f"https://console.aws.amazon.com/codesuite/codepipeline/pipelines/"
                f"{self.pipeline_name}/executions/{execution_id}/timeline"
                f"?region={self.region}"
            )
            msg = (
                f"❌ *Deployment FAILED!*\n"
                f"Pipeline: `{self.pipeline_name}`\n"
                f"Execution: `{execution_id}`\n"
                f"Status: `{status}`\n"
                f"<{console_url}|View in Console>"
            )
            color = "#D00000"
        self.send_slack(message=msg, color=color)

    def send_slack(self, message: str, color: str = "#439FE0") -> None:
        """
        Send a message to Slack via an Incoming Webhook.
        We use urllib.request (built-in) instead of the requests library
        to keep this script dependency-free, which matters for Lambda.
        Slack Incoming Webhook payload format:
        {
            "attachments": [{
                "color": "#36a64f",  ← Left-border color (hex or "good"/"warning"/"danger")
                "text": "...",       ← Message body (supports Slack mrkdwn)
                "footer": "...",     ← Small footer text
                "ts": 1234567890     ← Unix timestamp for footer display
            }]
        }
        mrkdwn formatting: *bold*, _italic_, `code`, <url|link text>
        """
        payload = {
            "attachments": [{
                "color": color,
                "text": message,
                "footer": "AWS CodePipeline Monitor",
                "ts": int(time.time()),
                "mrkdwn_in": ["text"],  # Enable mrkdwn parsing in text field
            }]
        }
        data = json.dumps(payload).encode("utf-8")
        req = urllib.request.Request(
            self.slack_url,
            data=data,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        try:
            with urllib.request.urlopen(req, timeout=5) as response:
                if response.status != 200:
                    logger.warning(f"Slack returned non-200: {response.status}")
        except urllib.error.URLError as e:
            # Log the error but don't raise: a Slack failure shouldn't stop monitoring
            logger.error(f"Slack notification failed: {e}")

# ── Entry point ───────────────────────────────────────────────────
if __name__ == "__main__":
    import sys

    monitor = CodePipelineMonitor(
        pipeline_name="prod-app-pipeline",
        slack_webhook_url="https://hooks.slack.com/services/T00/B00/xxx",
        region="ap-south-1",
    )
    # Trigger and monitor
    execution_id = monitor.trigger_pipeline()
    success = monitor.monitor(
        execution_id,
        poll_interval=15,  # Check every 15 seconds
        timeout=1800,      # Give up after 30 minutes
    )
    # Exit code for CI/CD integration
    sys.exit(0 if success else 1)
Sample Slack Output
🚀 Deployment Started
Pipeline: `prod-app-pipeline`
Execution ID: `abc-123-def`
Time: 2025-01-20 14:30 UTC
🔄 Stage Source: InProgress
✅ Stage Source: Succeeded
🔄 Stage Build: InProgress
✅ Stage Build: Succeeded
🔄 Stage Deploy: InProgress
✅ Stage Deploy: Succeeded
✅ Deployment Succeeded!
Pipeline: `prod-app-pipeline`
Execution: `abc-123-def`
Key Commands Explained
| Command | What it does |
|---|---|
| start_pipeline_execution(name=pipeline_name) | Triggers a new pipeline run, returns pipelineExecutionId |
| get_pipeline_execution(pipelineName, pipelineExecutionId) | Returns the overall status of one run, such as InProgress, Succeeded, or Failed |
| get_pipeline_state(name=pipeline_name) | Returns per-stage status with action-level details |
| stage["latestExecution"]["status"] | Current status of the most recent execution of this stage |
| urllib.request.Request(url, data, headers, method="POST") | HTTP POST request using the standard library |
| json.dumps(payload).encode("utf-8") | Serialises dict to JSON bytes for the HTTP body |
| urlopen(req, timeout=5) | Sends the request with a 5-second timeout |
🔍 Line-by-Line Code Walkthrough
Imports
| Line | Why It’s Used |
|---|---|
| import boto3 | AWS SDK, needed for CodePipeline client calls |
| import time | time.sleep(poll_interval) between status checks. int(time.time()) for the Slack message timestamp |
| import json | Serializes the Slack payload dict into a JSON string |
| import urllib.request | Standard library HTTP client. Used to POST to Slack without the requests library dependency (important for Lambda) |
| import urllib.error | Standard library HTTP error handling for failed Slack calls |
| import logging | Timestamped, levelled log output |
| from datetime import datetime | Formats the deployment start time for the Slack message |
CodePipelineMonitor.__init__
self.cp = boto3.client("codepipeline", region_name=region)
self.pipeline_name = pipeline_name
self.slack_url = slack_webhook_url
| Line | Explanation |
|---|---|
| boto3.client("codepipeline", region_name=region) | CodePipeline client. Pipelines are regional, so the client must target the region where the pipeline lives |
| self.slack_url | The Slack Incoming Webhook URL (e.g., https://hooks.slack.com/services/T.../B.../xxx). Stored as an instance attribute for use by all notification methods |
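The constructor takes the webhook URL and region as plain arguments, so they can come from anywhere. A minimal sketch of reading them from the environment instead of hardcoding them follows; the helper name `load_monitor_config` and the variable names `PIPELINE_NAME`, `SLACK_WEBHOOK_URL`, and `AWS_REGION` are assumptions made for illustration, not part of the original script.

```python
import os
from typing import Mapping, Optional


def load_monitor_config(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build CodePipelineMonitor keyword arguments from environment variables.

    The variable names used here are hypothetical; pick whatever your
    CI system or Lambda configuration already provides.
    """
    env = os.environ if env is None else env
    webhook = env.get("SLACK_WEBHOOK_URL", "")
    # Fail early if the webhook is missing or clearly not a Slack URL.
    if not webhook.startswith("https://hooks.slack.com/"):
        raise ValueError("SLACK_WEBHOOK_URL is missing or is not a Slack webhook URL")
    pipeline_name = env.get("PIPELINE_NAME", "")
    if not pipeline_name:
        raise ValueError("PIPELINE_NAME is not set")
    return {
        "pipeline_name": pipeline_name,
        "slack_webhook_url": webhook,
        "region": env.get("AWS_REGION", "us-east-1"),  # same default as __init__
    }
```

The returned dict can be unpacked straight into the class, for example `CodePipelineMonitor(**load_monitor_config())`. Keeping the webhook out of source control matters because anyone holding the URL can post to the channel.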
trigger_pipeline()
response = self.cp.start_pipeline_execution(name=self.pipeline_name)
execution_id = response["pipelineExecutionId"]
| Line | Explanation |
|---|---|
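| start_pipeline_execution(name=self.pipeline_name) | Triggers a new execution of the pipeline. If a run is already in progress, the outcome depends on the pipeline’s execution mode: in the default SUPERSEDED mode the new run follows the current one stage by stage and can replace an older run that is still waiting; runs only execute concurrently in PARALLEL mode |
| response["pipelineExecutionId"] | A UUID string uniquely identifying this particular run. Used to query status with get_pipeline_execution() |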
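Because the script reads only one field of the response, the trigger step is easy to rehearse without starting a real deployment. The sketch below uses a hand-written stand-in for the boto3 client; `FakeCodePipelineClient` and `trigger` are hypothetical names introduced for this example, and the fake mimics only the `pipelineExecutionId` field.

```python
import uuid


class FakeCodePipelineClient:
    """Hypothetical stand-in for the boto3 CodePipeline client (dry runs only)."""

    def __init__(self):
        self.started = []  # pipeline names passed to start_pipeline_execution

    def start_pipeline_execution(self, name: str) -> dict:
        # Mimic only the single response field that the script reads.
        self.started.append(name)
        return {"pipelineExecutionId": str(uuid.uuid4())}


def trigger(client, pipeline_name: str) -> str:
    """Start a run and return its execution ID, mirroring trigger_pipeline()."""
    response = client.start_pipeline_execution(name=pipeline_name)
    return response["pipelineExecutionId"]


fake = FakeCodePipelineClient()
execution_id = trigger(fake, "prod-app-pipeline")
print(f"Dry-run execution ID: {execution_id}")
```

Passing the client in as a parameter is the design choice that makes this possible; the original class creates its client inside `__init__`, so a dry run there would mean assigning the fake to `self.cp` after construction.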
monitor(execution_id, poll_interval, timeout)
stage_statuses: dict[str, str] = {}
start_time = time.time()
while time.time() - start_time < timeout:
    time.sleep(poll_interval)
| Line | Explanation |
|---|---|
| stage_statuses: dict[str, str] = {} | Tracks the last-known status for each stage. We only send Slack notifications when a stage’s status changes (avoids sending duplicate messages on every poll) |
| time.time() | Returns the current Unix timestamp (seconds since the epoch) as a float |
| time.time() - start_time < timeout | Compares elapsed time against the timeout. No new poll starts once timeout seconds have elapsed, regardless of deployment status |
| time.sleep(poll_interval) | Pauses before the next poll. 15 seconds is a reasonable default: it keeps API calls infrequent and delays each notification by at most about 15 seconds. A stage that starts and finishes between two polls is reported only in its final state |
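The poll-with-timeout pattern can be isolated into a small helper, which also makes it testable without waiting in real time. This is a sketch under assumptions: `poll_until` is a hypothetical name, it uses `time.monotonic()` rather than `time.time()` so that wall-clock adjustments cannot stretch or shrink the timeout, and unlike the script it checks before the first sleep.

```python
import time
from typing import Callable, Optional


def poll_until(
    check: Callable[[], Optional[bool]],
    poll_interval: float = 15,
    timeout: float = 1800,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Optional[bool]:
    """Call check() repeatedly until it gives a verdict or time runs out.

    check() returns None while work is still in progress, or True/False
    once it has finished. Returns that verdict, or None on timeout.
    """
    deadline = clock() + timeout
    while clock() < deadline:
        result = check()
        if result is not None:
            return result
        sleep(poll_interval)
    return None
```

The `sleep` and `clock` parameters exist only so a test can substitute fakes; production callers would leave them at their defaults.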
exec_response = self.cp.get_pipeline_execution(
    pipelineName=self.pipeline_name,
    pipelineExecutionId=execution_id,
)
overall_status = exec_response["pipelineExecution"]["status"]
| Line | Explanation |
|---|---|
| get_pipeline_execution(pipelineName, pipelineExecutionId) | Returns the high-level status of this specific execution: InProgress, Stopping, Succeeded, Failed, Stopped, Superseded, or Cancelled |
| ["pipelineExecution"]["status"] | The status string. Used to detect terminal states and stop the polling loop |
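The terminal-state decision can be written as a small pure function. In this sketch `classify_execution` and `TERMINAL_EXECUTION_STATUSES` are hypothetical names; the set includes `Cancelled` alongside the statuses discussed above, and treats `InProgress` and `Stopping` as still running.

```python
from typing import Tuple

# Statuses after which a pipeline execution will not change again.
TERMINAL_EXECUTION_STATUSES = frozenset(
    {"Succeeded", "Failed", "Stopped", "Superseded", "Cancelled"}
)


def classify_execution(status: str) -> Tuple[bool, bool]:
    """Return (is_terminal, succeeded) for a pipelineExecution status string."""
    is_terminal = status in TERMINAL_EXECUTION_STATUSES
    return is_terminal, status == "Succeeded"


for sample_status in ("InProgress", "Stopping", "Succeeded", "Superseded"):
    print(sample_status, classify_execution(sample_status))
```

Only `Succeeded` counts as success, so a run that was superseded or stopped still produces a non-zero exit code.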
state_response = self.cp.get_pipeline_state(name=self.pipeline_name)
for stage in state_response["stageStates"]:
    stage_name = stage["stageName"]
    stage_status = stage.get("latestExecution", {}).get("status", "NotStarted")
    if stage_statuses.get(stage_name) != stage_status:
        stage_statuses[stage_name] = stage_status
        self._notify_stage(stage_name, stage_status)
| Line | Explanation |
|---|---|
| get_pipeline_state(name=self.pipeline_name) | Returns per-stage status. More granular than get_pipeline_execution(): you can see which specific stage is InProgress. Note that latestExecution describes the most recent run of each stage, which can belong to a different execution than the one being monitored; compare latestExecution["pipelineExecutionId"] with execution_id to be sure |
| state_response["stageStates"] | List of stage dicts, one per pipeline stage (Source, Build, Deploy, etc.) |
| stage["stageName"] | The name you gave the stage when creating the pipeline |
| stage.get("latestExecution", {}).get("status", "NotStarted") | Chained .get() calls. latestExecution is absent if the stage hasn’t run yet; "NotStarted" is our placeholder, not a CodePipeline status |
| if stage_statuses.get(stage_name) != stage_status | Only notify when the status changes. Without this check, every poll would send a Slack message |
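The change-detection step can be exercised against a hand-written response to see exactly which notifications a poll would produce. This sketch assumes a helper named `changed_stages` (hypothetical) and an abridged sample dict in the shape of a `get_pipeline_state()` response; it also skips stages whose `latestExecution` belongs to a different run.

```python
from typing import Dict, List, Tuple


def changed_stages(
    state_response: dict,
    execution_id: str,
    last_seen: Dict[str, str],
) -> List[Tuple[str, str]]:
    """Return (stage_name, status) pairs that changed since the previous poll.

    Stages whose latestExecution belongs to another run are ignored.
    last_seen is updated in place so the next call sees the new baseline.
    """
    changes = []
    for stage in state_response.get("stageStates", []):
        latest = stage.get("latestExecution", {})
        if latest.get("pipelineExecutionId") != execution_id:
            continue  # stage not reached yet, or data from an earlier run
        name = stage["stageName"]
        status = latest.get("status", "NotStarted")
        if last_seen.get(name) != status:
            last_seen[name] = status
            changes.append((name, status))
    return changes


# Hand-written, abridged sample; "run-1" and "run-2" are made-up IDs.
sample = {
    "stageStates": [
        {"stageName": "Source",
         "latestExecution": {"pipelineExecutionId": "run-2", "status": "Succeeded"}},
        {"stageName": "Build",
         "latestExecution": {"pipelineExecutionId": "run-2", "status": "InProgress"}},
        {"stageName": "Deploy",
         "latestExecution": {"pipelineExecutionId": "run-1", "status": "Succeeded"}},
    ]
}
seen: Dict[str, str] = {}
first_poll = changed_stages(sample, "run-2", seen)
second_poll = changed_stages(sample, "run-2", seen)
```

Here the first poll reports Source and Build, the Deploy entry left over from `run-1` is ignored, and the second poll reports nothing because no status changed.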
send_slack(message, color)
payload = {
    "attachments": [{
        "color": color,
        "text": message,
        "footer": "AWS CodePipeline Monitor",
        "ts": int(time.time()),
        "mrkdwn_in": ["text"],
    }]
}
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
    self.slack_url,
    data=data,
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req, timeout=5) as response:
    ...
| Line | Explanation |
|---|---|
| "attachments": [...] | Slack’s legacy attachment format. The colored left border is only available through attachments, not through top-level blocks |
| "color": color | The colored left border: "#36a64f" = green (success), "#D00000" = red (failure), "#439FE0" = blue (info) |
| "ts": int(time.time()) | Unix timestamp shown as a human-readable time in the Slack message footer |
| "mrkdwn_in": ["text"] | Tells Slack to render mrkdwn formatting (*bold*, `code`) in the text field |
| json.dumps(payload).encode("utf-8") | Converts dict → JSON string → bytes. encode("utf-8") is required because urllib expects the request body as bytes, not str |
| urllib.request.Request(url, data, headers, method="POST") | Creates an HTTP request object. data must be bytes and headers is a dict; method="POST" makes the verb explicit (urllib already defaults to POST when data is supplied) |
| urllib.request.urlopen(req, timeout=5) | Sends the request. timeout=5 means: give up if Slack doesn’t respond in 5 seconds |
| except urllib.error.URLError: logger.error(...) | Catches network errors such as DNS failures and connection timeouts, plus HTTP error responses (HTTPError is a subclass of URLError). We log but don’t raise, because a Slack failure shouldn’t break the monitoring loop |
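To inspect exactly what would be sent to Slack without posting anything, the request construction can be separated from the network call. The following is a sketch, with `build_slack_request` as a hypothetical helper name; it builds the same payload as `send_slack()` and then decodes the body again for inspection.

```python
import json
import time
import urllib.request


def build_slack_request(
    webhook_url: str, message: str, color: str = "#439FE0"
) -> urllib.request.Request:
    """Build (but do not send) the webhook POST used by send_slack()."""
    payload = {
        "attachments": [{
            "color": color,
            "text": message,
            "footer": "AWS CodePipeline Monitor",
            "ts": int(time.time()),
            "mrkdwn_in": ["text"],
        }]
    }
    return urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),  # body must be bytes
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder webhook URL from the script's entry point; nothing is sent here.
req = build_slack_request(
    "https://hooks.slack.com/services/T00/B00/xxx",
    "✅ Stage *Build*: `Succeeded`",
    "#36a64f",
)
body = json.loads(req.data.decode("utf-8"))
print(req.get_method(), req.full_url)
print(body["attachments"][0]["text"])
```

Passing `req` to `urllib.request.urlopen(req, timeout=5)` would then perform the actual POST, as the script does.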
Exit Code for CI/CD
success = monitor.monitor(execution_id, poll_interval=15, timeout=1800)
sys.exit(0 if success else 1)
| Line | Explanation |
|---|---|
| monitor.monitor(...) | Returns True if overall_status == "Succeeded", False otherwise |
| sys.exit(0 if success else 1) | CI/CD systems (GitHub Actions, Jenkins) check the process exit code. 0 = the deployment step passed. 1 = the step failed, so the calling pipeline stops |
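What a CI runner observes can be reproduced locally by launching a child process and reading its return code. In this sketch `run_step` is a hypothetical helper, and the child is a one-line Python program that exits the same way the monitor script does rather than the monitor itself.

```python
import subprocess
import sys


def run_step(success: bool) -> int:
    """Run a child process that exits like the monitor script and return its code."""
    child_code = f"import sys; sys.exit(0 if {success!r} else 1)"
    completed = subprocess.run([sys.executable, "-c", child_code])
    return completed.returncode


print("successful deployment ->", run_step(True))
print("failed deployment ->", run_step(False))
```

A CI system applies the same rule to the real script: any non-zero return code marks the step as failed.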
- CodePipeline execution lifecycle
- Stage status polling
- Slack Incoming Webhook payload format
- urllib.request for HTTP calls without extra dependencies