The Situation
Container deployment automation — replace manual ECS deployments with a script that handles versioning, health monitoring, and automatic rollback.
Problem Statement
Your ECS deployment script updates the service and walks away. When the new container fails to start (bad image, wrong env var, OOM), ECS keeps launching tasks that stop almost immediately, and the service stays degraded until someone notices the monitoring dashboard 20 minutes later. This script monitors the deployment and rolls back automatically within one polling interval (30 seconds by default) of detecting a failure.
ECS Deployment Flow
Describe current task definition
↓
Register new task definition (with updated image)
↓
update_service() with new task definition + rolling config
↓
Poll service deployments every 30 seconds
↓
├── running == desired → SUCCESS
├── failedTasks > 0 → ROLLBACK to previous task definition
└── timeout → ROLLBACK
Complete Script
import boto3
import time
import sys
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")


class ECSRollingDeployer:
    def __init__(self, cluster: str, service: str, region: str = "us-east-1"):
        """
        cluster: ECS cluster name or ARN (e.g. "prod-cluster")
        service: ECS service name (e.g. "payment-service")
        """
        self.ecs = boto3.client("ecs", region_name=region)
        self.cluster = cluster
        self.service = service

    # ── Step 1: Get current task definition ──────────────────────
    def get_current_task_definition(self) -> str:
        """
        describe_services() returns the full service configuration.
        services[0]["taskDefinition"] is the ARN of the currently
        deployed task definition, e.g.:
        arn:aws:ecs:us-east-1:123456789012:task-definition/payment:8
        """
        response = self.ecs.describe_services(
            cluster=self.cluster,
            services=[self.service],
        )
        if not response["services"]:
            raise ValueError(f"Service not found: {self.service} in {self.cluster}")
        return response["services"][0]["taskDefinition"]

    # ── Step 2: Register new task definition ─────────────────────
    def create_new_task_definition(self, current_td_arn: str, new_image: str) -> str:
        """
        ECS task definitions are immutable: you can't edit them.
        To deploy a new image, you must register a NEW task definition revision
        with the updated container image.
        describe_task_definition() returns the full spec of an existing revision.
        We then strip the read-only fields AWS adds (taskDefinitionArn, revision,
        status, etc.) and register a new revision with the updated image.
        containerDefinitions[0]["image"] is the Docker image URI.
        We update only the first container. Adjust the index if your task
        has multiple containers and you want to update a specific one.
        """
        td = self.ecs.describe_task_definition(
            taskDefinition=current_td_arn
        )["taskDefinition"]
        # Update the image in container 0 (main application container)
        containers = td["containerDefinitions"]
        old_image = containers[0]["image"]
        containers[0]["image"] = new_image
        logger.info(f"Image: {old_image} → {new_image}")
        # Strip fields that are set by AWS and cannot be provided on register
        readonly_fields = [
            "taskDefinitionArn", "revision", "status",
            "requiresAttributes", "compatibilities",
            "registeredAt", "registeredBy",
        ]
        for field in readonly_fields:
            td.pop(field, None)
        response = self.ecs.register_task_definition(**td)
        new_td_arn = response["taskDefinition"]["taskDefinitionArn"]
        logger.info(f"Registered new task definition: {new_td_arn}")
        return new_td_arn

    # ── Step 3: Update service ────────────────────────────────────
    def trigger_deployment(self, new_td_arn: str) -> None:
        """
        update_service() tells ECS to replace running tasks with new ones
        using the specified task definition.
        deploymentConfiguration controls the rolling update strategy:
        - minimumHealthyPercent=100: Never go below 100% capacity
          (ensures zero downtime; requires extra capacity)
        - maximumPercent=200: Allow up to 2× the desired count during deploy
          (starts new tasks before stopping old ones; blue/green-like)
        forceNewDeployment=True is only required when the task definition ARN
        hasn't changed (e.g. a mutable tag like "latest" was re-pushed). It is
        redundant here because a new revision is always registered, but harmless.
        """
        self.ecs.update_service(
            cluster=self.cluster,
            service=self.service,
            taskDefinition=new_td_arn,
            deploymentConfiguration={
                "minimumHealthyPercent": 100,
                "maximumPercent": 200,
            },
            forceNewDeployment=True,
        )

    # ── Step 4: Poll deployment health ───────────────────────────
    def get_deployment_status(self) -> str:
        """
        describe_services() returns a list of deployments for the service.
        There is always a PRIMARY deployment (the latest) and optionally
        ACTIVE deployments (older tasks being drained).
        Primary deployment fields:
        - desiredCount: how many tasks ECS wants to run
        - runningCount: how many are in the RUNNING state
        - failedTasks: how many task launches have failed
        Once runningCount == desiredCount and failedTasks == 0,
        the deployment is treated as complete.
        """
        response = self.ecs.describe_services(
            cluster=self.cluster,
            services=[self.service],
        )
        deployments = response["services"][0]["deployments"]
        # Find the PRIMARY (most recent) deployment
        primary = next(
            (d for d in deployments if d["status"] == "PRIMARY"),
            None,
        )
        if not primary:
            return "UNKNOWN"
        desired = primary["desiredCount"]
        running = primary["runningCount"]
        failed = primary["failedTasks"]
        if failed > 0:
            logger.error(f"Deployment failure: {failed} failed task(s)")
            return "FAILED"
        if running == desired and desired > 0:
            return "HEALTHY"
        return f"IN_PROGRESS ({running}/{desired} tasks running)"

    # ── Step 5: Roll back ─────────────────────────────────────────
    def rollback(self, previous_td_arn: str) -> None:
        """
        Rollback by calling update_service() with the PREVIOUS task definition.
        ECS will drain the failing new tasks and start tasks from the old definition.
        This is the same API call as deployment. ECS handles the swap gracefully.
        """
        logger.warning(f"Rolling back to: {previous_td_arn}")
        self.ecs.update_service(
            cluster=self.cluster,
            service=self.service,
            taskDefinition=previous_td_arn,
        )
        logger.info("Rollback triggered. Previous version will restore.")

    # ── Orchestrator ──────────────────────────────────────────────
    def deploy(
        self,
        new_image: str,
        health_check_retries: int = 15,
        health_check_delay: int = 30,
    ) -> bool:
        """
        Full deployment lifecycle:
        1. Capture current state for rollback
        2. Register new task definition
        3. Trigger rolling update
        4. Poll health status
        5. Rollback if failed or timed out
        Returns True if deployment succeeded, False if rolled back.
        """
        logger.info(f"Starting deployment of: {new_image}")
        # Save the current (stable) task definition for rollback
        current_td = self.get_current_task_definition()
        logger.info(f"Current task definition (rollback target): {current_td}")
        # Create and deploy new task definition
        new_td = self.create_new_task_definition(current_td, new_image)
        self.trigger_deployment(new_td)
        logger.info(f"Deployment triggered. Monitoring health ({health_check_retries} checks × {health_check_delay}s)...")
        # Monitor deployment
        for attempt in range(1, health_check_retries + 1):
            time.sleep(health_check_delay)
            status = self.get_deployment_status()
            logger.info(f"[{attempt}/{health_check_retries}] Status: {status}")
            if status == "HEALTHY":
                logger.info("✅ Deployment succeeded!")
                return True
            if status == "FAILED":
                logger.error("❌ Deployment failed! Rolling back...")
                self.rollback(current_td)
                return False
        # Timeout: roll back
        logger.error("⏰ Health check timed out. Rolling back...")
        self.rollback(current_td)
        return False


# ── Entry point ───────────────────────────────────────────────────
if __name__ == "__main__":
    deployer = ECSRollingDeployer(
        cluster="prod-cluster",
        service="payment-service",
        region="ap-south-1",
    )
    success = deployer.deploy(
        new_image="123456789012.dkr.ecr.ap-south-1.amazonaws.com/payment:v2.4.1",
        health_check_retries=15,  # Wait up to 7.5 min (15 × 30s)
        health_check_delay=30,
    )
    # Exit with a non-zero code on failure so the CI/CD pipeline step fails
    sys.exit(0 if success else 1)
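The entry point above hard-codes the cluster, service, and image. In a pipeline these usually arrive as arguments. The following is a minimal sketch of a command-line wrapper; the flag names (`--cluster`, `--service`, `--image`, `--region`, `--retries`, `--delay`) and both helper functions are illustrative assumptions, not part of the original script.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical CLI for the deployer; flag names are illustrative."""
    parser = argparse.ArgumentParser(description="Deploy an image to an ECS service")
    parser.add_argument("--cluster", required=True, help="ECS cluster name or ARN")
    parser.add_argument("--service", required=True, help="ECS service name")
    parser.add_argument("--image", required=True, help="Full image URI to deploy")
    parser.add_argument("--region", default="us-east-1")
    parser.add_argument("--retries", type=int, default=15, help="Number of health polls")
    parser.add_argument("--delay", type=int, default=30, help="Seconds between polls")
    return parser


def max_wait_seconds(retries: int, delay: int) -> int:
    """Upper bound on monitoring time: one sleep per poll."""
    return retries * delay


# Possible wiring into the script above (not executed here):
#   args = build_parser().parse_args()
#   deployer = ECSRollingDeployer(args.cluster, args.service, args.region)
#   sys.exit(0 if deployer.deploy(args.image, args.retries, args.delay) else 1)
```

With the defaults, `max_wait_seconds(15, 30)` gives the same 7.5-minute ceiling noted in the entry point comment.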
Key Commands Explained
| Command | What it does |
|---|---|
| `describe_services(cluster, services)` | Returns full service state including all deployments |
| `describe_task_definition(taskDefinition=arn)` | Returns the full spec of a task definition revision |
| `register_task_definition(**td)` | Creates a new immutable revision with the updated spec |
| `update_service(cluster, service, taskDefinition, deploymentConfiguration)` | Triggers a rolling update to the new task definition |
| `minimumHealthyPercent=100` | Never drop below 100% capacity during deployment |
| `maximumPercent=200` | Allow up to 200% tasks during deployment (old + new) |
| `forceNewDeployment=True` | Forces a new deployment (and a fresh image pull) even if the task definition ARN didn't change |
| `deployment["failedTasks"]` | Number of task launch failures; non-zero means rollback |
| `deployment["runningCount"] == deployment["desiredCount"]` | All desired tasks have reached the RUNNING state |
Common Issues
Deployment stuck in IN_PROGRESS: The new task is failing to start. Run aws ecs list-tasks with --desired-status STOPPED to find the stopped tasks, aws ecs describe-tasks to read each task's stoppedReason, then aws logs get-log-events for the container log output.
ServiceNotActiveException: The ECS service itself is not active (e.g., it is being deleted or has already been deleted), so it cannot be updated. Check the ECS console for service events.
Rolling update causes downtime: With minimumHealthyPercent=50, ECS may stop up to half of the running tasks before their replacements are up. Use minimumHealthyPercent=100 + maximumPercent=200 for zero-downtime deployments (requires capacity headroom).
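The first troubleshooting step can also be done from Python. This is a minimal sketch that assumes `ecs` is a boto3 ECS client; the helper name `stopped_task_reasons` is hypothetical. It relies on `list_tasks` and `describe_tasks`, and ECS only keeps stopped tasks visible for a limited time, so it should run soon after a failed deployment.

```python
from typing import Any, Dict, List


def stopped_task_reasons(ecs: Any, cluster: str, service: str) -> List[Dict[str, Any]]:
    """Return stoppedReason and per-container exit details for stopped tasks.

    `ecs` is expected to be a boto3 ECS client, e.g. boto3.client("ecs").
    """
    arns = ecs.list_tasks(
        cluster=cluster, serviceName=service, desiredStatus="STOPPED"
    )["taskArns"]
    if not arns:
        return []
    # describe_tasks accepts at most 100 task ARNs per call
    tasks = ecs.describe_tasks(cluster=cluster, tasks=arns[:100])["tasks"]
    return [
        {
            "taskArn": task["taskArn"],
            "stoppedReason": task.get("stoppedReason"),
            "containers": [
                {
                    "name": c["name"],
                    "exitCode": c.get("exitCode"),
                    "reason": c.get("reason"),
                }
                for c in task.get("containers", [])
            ],
        }
        for task in tasks
    ]
```

An exit code of 137 in the output usually points at the container being killed (often OOM), while a `reason` mentioning the image points at a pull failure.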
🔍 Line-by-Line Code Walkthrough
Imports
| Line | Why It's Used |
|---|---|
| `import boto3` | AWS SDK, needed for ECS client calls |
| `import time` | `time.sleep(30)` pauses between health status polls |
| `import sys` | `sys.exit(0)` or `sys.exit(1)` returns the appropriate exit code for CI/CD pipelines |
| `import logging` | Timestamped, leveled log output |
ECSRollingDeployer.__init__
self.ecs = boto3.client("ecs", region_name=region)
self.cluster = cluster
self.service = service
| Line | Explanation |
|---|---|
| `boto3.client("ecs", region_name=region)` | ECS client for the target region. ECS clusters are regional |
| `self.cluster` | The ECS cluster name or ARN. All API calls require this to identify which cluster |
| `self.service` | The ECS service name (e.g., "payment-service"). A cluster can have many services |
get_current_task_definition()
response = self.ecs.describe_services(
    cluster=self.cluster,
    services=[self.service],
)
return response["services"][0]["taskDefinition"]

| Line | Explanation |
|---|---|
| `describe_services(cluster=..., services=[...])` | Returns the full service configuration. `services` is a list; you can request up to 10 services at once |
| `response["services"][0]` | The first (and only) service dict in the response |
| `["taskDefinition"]` | The ARN of the currently deployed task definition, e.g., arn:aws:ecs:...:task-definition/payment:8. The :8 is the revision number |
| `if not response["services"]: raise ValueError` | Guard for the case where the service doesn't exist; gives a clear error message |
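When a service is missing, `describe_services()` reports why in a separate `failures` list in the response, and a service that exists may still be in a non-ACTIVE status. The sketch below shows one way to surface both; the function name `extract_task_definition` is hypothetical, and it operates on the response dict so it can be tried without AWS access.

```python
from typing import Any, Dict


def extract_task_definition(response: Dict[str, Any], service: str) -> str:
    """Return the task definition ARN from a describe_services() response.

    Raises ValueError with the API-reported reason when the service is
    missing, or when the service is not in the ACTIVE status.
    """
    services = response.get("services", [])
    if not services:
        reasons = [f.get("reason", "UNKNOWN") for f in response.get("failures", [])]
        raise ValueError(
            f"Service not found: {service} (reasons: {reasons or ['none reported']})"
        )
    svc = services[0]
    if svc.get("status") != "ACTIVE":
        raise ValueError(f"Service {service} is {svc.get('status')}, not ACTIVE")
    return svc["taskDefinition"]
```

Failing early on a DRAINING or INACTIVE service avoids registering a new task definition revision that can never be deployed.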
create_new_task_definition(current_td_arn, new_image)
td = self.ecs.describe_task_definition(
    taskDefinition=current_td_arn
)["taskDefinition"]

| Line | Explanation |
|---|---|
| `describe_task_definition(taskDefinition=current_td_arn)` | Fetches the full spec of an existing revision. This spec is the template for our new revision |
| `["taskDefinition"]` | The task definition dict containing containerDefinitions, cpu, memory, networkMode, executionRoleArn, etc. |
containers = td["containerDefinitions"]
old_image = containers[0]["image"]
containers[0]["image"] = new_image

| Line | Explanation |
|---|---|
| `td["containerDefinitions"]` | List of container specs, one dict per container in the task |
| `containers[0]` | The first container (main application container). If your task has a sidecar, you'd target a different index |
| `containers[0]["image"] = new_image` | Replaces the Docker image URI. E.g., changes payment:v2.4.0 to payment:v2.4.1 |
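Relying on index 0 breaks as soon as a sidecar is listed first. One alternative is to select the container by its `name` field, which every container definition carries. This is a minimal sketch; the helper name `set_container_image` is hypothetical and not part of the original script.

```python
from typing import Any, Dict, List


def set_container_image(
    containers: List[Dict[str, Any]], container_name: str, new_image: str
) -> str:
    """Update the image of the container named `container_name` in place.

    Returns the previous image so the caller can log the change.
    """
    for container in containers:
        if container["name"] == container_name:
            old_image = container["image"]
            container["image"] = new_image
            return old_image
    names = [c["name"] for c in containers]
    raise ValueError(f"No container named {container_name!r}; found {names}")
```

Inside `create_new_task_definition()` this could replace the two index-based lines, for example `old_image = set_container_image(td["containerDefinitions"], "payment", new_image)`, where `"payment"` is an assumed container name.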
readonly_fields = [
    "taskDefinitionArn", "revision", "status",
    "requiresAttributes", "compatibilities",
    "registeredAt", "registeredBy",
]
for field in readonly_fields:
    td.pop(field, None)

| Line | Explanation |
|---|---|
| `readonly_fields` | Fields that AWS populates automatically. If you include them in `register_task_definition()`, boto3 rejects the call before sending it, with `ParamValidationError: Unknown parameter in input` |
| `td.pop(field, None)` | Removes the field from the dict if it exists. `.pop(key, None)` returns None instead of raising KeyError if the key is absent |
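The stripping step can be isolated into a small pure function, which makes it easy to test and leaves the described task definition untouched. This is a sketch under two assumptions: the names `READONLY_FIELDS` and `to_register_kwargs` are hypothetical, and `deregisteredAt` is added to the list because it can appear on revisions that have been deregistered.

```python
import copy
from typing import Any, Dict

# Fields returned by describe_task_definition() that register_task_definition()
# does not accept. "deregisteredAt" only appears on deregistered revisions.
READONLY_FIELDS = (
    "taskDefinitionArn", "revision", "status",
    "requiresAttributes", "compatibilities",
    "registeredAt", "registeredBy", "deregisteredAt",
)


def to_register_kwargs(task_definition: Dict[str, Any]) -> Dict[str, Any]:
    """Return a deep copy of the task definition without the read-only fields."""
    kwargs = copy.deepcopy(task_definition)
    for field in READONLY_FIELDS:
        kwargs.pop(field, None)
    return kwargs
```

The deep copy matters because `containerDefinitions` is a nested list: changing the image on the copy then cannot alter the dict that was fetched for the rollback target.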
response = self.ecs.register_task_definition(**td)
new_td_arn = response["taskDefinition"]["taskDefinitionArn"]

| Line | Explanation |
|---|---|
| `register_task_definition(**td)` | `**td` unpacks the dict as keyword arguments. This passes all remaining fields (containerDefinitions, cpu, memory, etc.) as named parameters |
| `response["taskDefinition"]["taskDefinitionArn"]` | The ARN of the newly created revision (e.g., ...:task-definition/payment:9) |
trigger_deployment(new_td_arn)
self.ecs.update_service(
    cluster=self.cluster,
    service=self.service,
    taskDefinition=new_td_arn,
    deploymentConfiguration={
        "minimumHealthyPercent": 100,
        "maximumPercent": 200,
    },
    forceNewDeployment=True,
)

| Parameter | Explanation |
|---|---|
| `taskDefinition=new_td_arn` | Tells ECS to run tasks using this new task definition revision |
| `minimumHealthyPercent=100` | ECS must keep at least 100% of the desired count running at all times. New tasks start BEFORE old ones stop (requires spare cluster capacity, up to 2× if all replacements launch at once) |
| `maximumPercent=200` | ECS may run up to 200% of the desired count (old + new tasks) during the transition |
| `forceNewDeployment=True` | Only required when the task definition ARN is unchanged (e.g., a mutable tag such as latest was re-pushed); it makes ECS start new tasks, which pull the image again. Redundant but harmless here, because a new revision is always registered |
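The two percentages translate into concrete task counts for a given desired count. The ECS documentation describes the minimum as rounded up and the maximum as rounded down; the sketch below applies that arithmetic. The function name `rolling_bounds` is hypothetical.

```python
from typing import Tuple


def rolling_bounds(desired: int, min_healthy_pct: int, max_pct: int) -> Tuple[int, int]:
    """Return (minimum tasks kept running, maximum tasks allowed) during a deploy.

    Integer arithmetic avoids floating-point rounding surprises:
    the minimum is rounded up, the maximum is rounded down.
    """
    lower = (desired * min_healthy_pct + 99) // 100  # ceiling division
    upper = (desired * max_pct) // 100               # floor division
    return lower, upper
```

For a service with 4 desired tasks, `rolling_bounds(4, 100, 200)` yields a floor of 4 and a ceiling of 8, so the cluster needs room for up to 4 extra tasks. With `rolling_bounds(4, 50, 100)` the floor drops to 2 and no extra capacity is used, at the cost of running at half capacity mid-deploy.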
get_deployment_status()
deployments = response["services"][0]["deployments"]
primary = next(
    (d for d in deployments if d["status"] == "PRIMARY"),
    None,
)

| Line | Explanation |
|---|---|
| `response["services"][0]["deployments"]` | List of deployment objects. During a rolling update, there are multiple: PRIMARY (new) and ACTIVE (old tasks being drained) |
| `next((d for d in ... if d["status"] == "PRIMARY"), None)` | Finds the PRIMARY deployment (the most recent one). `next()` returns the first match, or None if not found |
desired = primary["desiredCount"]
running = primary["runningCount"]
failed = primary["failedTasks"]
if failed > 0: return "FAILED"
if running == desired and desired > 0: return "HEALTHY"
return f"IN_PROGRESS ({running}/{desired} tasks running)"

| Line | Explanation |
|---|---|
| `primary["desiredCount"]` | Target number of tasks ECS wants running (from your service configuration) |
| `primary["runningCount"]` | Number of this deployment's tasks in the RUNNING state. RUNNING alone does not mean container or load balancer health checks have passed |
| `primary["failedTasks"]` | Count of task launches that failed since the deployment started. Non-zero = something is broken |
| `running == desired and desired > 0` | Deployment is treated as complete. `desired > 0` handles the edge case of a service scaling to 0 |
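Because `runningCount` only counts tasks in the RUNNING state, a stricter completion signal is the deployment's `rolloutState` field, which ECS returns for rolling-update services and sets to COMPLETED once the service reaches a steady state. The sketch below prefers that field when it is present and otherwise falls back to the count-based check; the function name `classify_deployment` is hypothetical, and treating a missing `rolloutState` as "use the counts" is an assumption of this sketch.

```python
from typing import Any, Dict


def classify_deployment(primary: Dict[str, Any]) -> str:
    """Classify a PRIMARY deployment dict from describe_services().

    Prefers rolloutState when ECS reports it; otherwise falls back to the
    count-based check used in get_deployment_status().
    """
    rollout = primary.get("rolloutState")
    if rollout == "FAILED" or primary.get("failedTasks", 0) > 0:
        return "FAILED"
    desired = primary["desiredCount"]
    running = primary["runningCount"]
    if rollout == "COMPLETED":
        return "HEALTHY"
    # No rolloutState reported: use the original count comparison
    if rollout is None and running == desired and desired > 0:
        return "HEALTHY"
    return f"IN_PROGRESS ({running}/{desired} tasks running)"
```

The return values match the strings the orchestrator already compares against, so this could stand in for the last few lines of `get_deployment_status()`.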
rollback(previous_td_arn)
self.ecs.update_service(
    cluster=self.cluster,
    service=self.service,
    taskDefinition=previous_td_arn,
)

| Line | Explanation |
|---|---|
| `update_service(taskDefinition=previous_td_arn)` | Rollback uses the exact same API call as deployment, just with the old task definition ARN instead of the new one |
| Why no `deploymentConfiguration`? | `update_service()` leaves omitted settings unchanged, so the service keeps the minimumHealthyPercent=100 / maximumPercent=200 configuration set by `trigger_deployment()` |
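`rollback()` only triggers the rollback; it does not confirm that the old version actually came back. One way to check is boto3's built-in `services_stable` waiter for ECS. This is a minimal sketch assuming `ecs` is a boto3 ECS client; the helper name `wait_for_rollback` and its default timings are illustrative.

```python
from typing import Any


def wait_for_rollback(
    ecs: Any, cluster: str, service: str, delay: int = 15, max_attempts: int = 40
) -> bool:
    """Block until the service is stable again after a rollback.

    `ecs` is expected to be a boto3 ECS client. Returns True if the service
    stabilised, False if the waiter gave up or raised.
    """
    waiter = ecs.get_waiter("services_stable")
    try:
        waiter.wait(
            cluster=cluster,
            services=[service],
            WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
        )
    except Exception as exc:  # botocore raises WaiterError on timeout or failure
        print(f"Rollback did not stabilise: {exc}")
        return False
    return True
```

Calling this after `self.rollback(current_td)` lets the pipeline distinguish "rolled back cleanly" from "rollback also failed", which usually warrants paging someone.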
deploy() — Orchestrator
current_td = self.get_current_task_definition()  # Save for rollback
new_td = self.create_new_task_definition(current_td, new_image)
self.trigger_deployment(new_td)
for attempt in range(1, health_check_retries + 1):
    time.sleep(health_check_delay)
    status = self.get_deployment_status()
    if status == "HEALTHY": return True
    if status == "FAILED":
        self.rollback(current_td)
        return False
self.rollback(current_td)  # Timeout
return False

| Line | Explanation |
|---|---|
| `current_td = self.get_current_task_definition()` | Captures the old task definition ARN BEFORE deploying. This is the rollback target |
| `time.sleep(health_check_delay)` | Waits 30 seconds between polls. ECS typically takes 30–120 seconds to launch new tasks |
| `self.rollback(current_td)` | Called on both "FAILED" status AND timeout. The old task definition is the safe state |
| `sys.exit(0 if success else 1)` | Exit code 0 = CI/CD pipeline continues. Exit code 1 = pipeline marks the step as failed |
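The polling loop in `deploy()` can be pulled out into a function that takes the status check and the sleep function as arguments, so the retry and timeout behaviour can be exercised without AWS or real waiting. This is a sketch, not the original method; the name `poll_deployment` and the "TIMEOUT" return value are assumptions introduced here.

```python
import time
from typing import Callable


def poll_deployment(
    get_status: Callable[[], str],
    retries: int = 15,
    delay: float = 30,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it reports HEALTHY or FAILED, or retries run out.

    Returns "HEALTHY", "FAILED", or "TIMEOUT". The caller decides whether
    to roll back, which keeps the loop free of side effects.
    """
    for _ in range(retries):
        sleep(delay)  # wait first, as deploy() does, to give ECS time to start tasks
        status = get_status()
        if status in ("HEALTHY", "FAILED"):
            return status
    return "TIMEOUT"
```

In `deploy()` this would be called as `poll_deployment(self.get_deployment_status, health_check_retries, health_check_delay)`, followed by a rollback for any result other than "HEALTHY".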