ECS Rolling Deployment with Automatic Rollback on Health Check Failure

Python script to deploy a new container image to an ECS service with rolling update strategy, monitor health, and automatically roll back if the deployment fails.

January 20, 2025 9 min read ~25 min to complete DB
The Situation

Container deployment automation — replace manual ECS deployments with a script that handles versioning, health monitoring, and automatic rollback.

8 Steps
4 Services Used
~25 min Duration
Advanced Difficulty

Problem Statement

Your ECS deployment script just updates the service and walks away. When the new container fails to start (bad image, wrong env var, OOM), ECS keeps launching replacement tasks that stop almost immediately, and nothing happens until someone notices the monitoring dashboard 20 minutes later. This script monitors the deployment and rolls back automatically on the first poll that reports a failed task, which is at most one 30-second polling interval after ECS records the failure.


ECS Deployment Flow

Describe current task definition
         ↓
Register new task definition (with updated image)
         ↓
update_service() with new task definition + rolling config
         ↓
Poll service deployments every 30 seconds
         ↓
         ├── running == desired → SUCCESS
         ├── failedTasks > 0   → ROLLBACK to previous task definition
         └── timeout           → ROLLBACK

Complete Script

import boto3
import time
import sys
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")


class ECSRollingDeployer:
    def __init__(self, cluster: str, service: str, region: str = "us-east-1"):
        """
        cluster: ECS cluster name or ARN (e.g. "prod-cluster")
        service: ECS service name (e.g. "payment-service")
        """
        self.ecs     = boto3.client("ecs", region_name=region)
        self.cluster = cluster
        self.service = service

    # ── Step 1: Get current task definition ──────────────────────
    def get_current_task_definition(self) -> str:
        """
        describe_services() returns the full service configuration.
        services[0]["taskDefinition"] is the ARN of the currently
        deployed task definition, e.g.:
          arn:aws:ecs:us-east-1:123456789012:task-definition/payment:8
        """
        response = self.ecs.describe_services(
            cluster=self.cluster,
            services=[self.service],
        )
        if not response["services"]:
            raise ValueError(f"Service not found: {self.service} in {self.cluster}")
        return response["services"][0]["taskDefinition"]

    # ── Step 2: Register new task definition ─────────────────────
    def create_new_task_definition(self, current_td_arn: str, new_image: str) -> str:
        """
        ECS task definitions are immutable — you can't edit them.
        To deploy a new image, you must register a NEW task definition revision
        with the updated container image.

        describe_task_definition() returns the full spec of an existing revision.
        We then strip the read-only fields AWS adds (taskDefinitionArn, revision,
        status, etc.) and register a new revision with the updated image.

        containerDefinitions[0]["image"] is the Docker image URI.
        We update only the first container — adjust the index if your task
        has multiple containers and you want to update a specific one.
        """
        td = self.ecs.describe_task_definition(
            taskDefinition=current_td_arn
        )["taskDefinition"]

        # Update the image in container 0 (main application container)
        containers = td["containerDefinitions"]
        old_image = containers[0]["image"]
        containers[0]["image"] = new_image
        logger.info(f"Image: {old_image} → {new_image}")

        # Strip fields that are set by AWS and cannot be provided on register
        readonly_fields = [
            "taskDefinitionArn", "revision", "status",
            "requiresAttributes", "compatibilities",
            "registeredAt", "registeredBy",
        ]
        for field in readonly_fields:
            td.pop(field, None)

        response = self.ecs.register_task_definition(**td)
        new_td_arn = response["taskDefinition"]["taskDefinitionArn"]
        logger.info(f"Registered new task definition: {new_td_arn}")
        return new_td_arn

    # ── Step 3: Update service ────────────────────────────────────
    def trigger_deployment(self, new_td_arn: str) -> None:
        """
        update_service() tells ECS to replace running tasks with new ones
        using the specified task definition.

        deploymentConfiguration controls the rolling update strategy:
        - minimumHealthyPercent=100: Never go below 100% capacity
          (ensures zero downtime — requires extra capacity)
        - maximumPercent=200: Allow up to 2× the desired count during deploy
          (starts new tasks before stopping old ones — blue/green-like)

        forceNewDeployment=True is only strictly needed when the task
        definition ARN hasn't changed (e.g. a mutable tag like "latest").
        This script registers a new revision on every deploy, so the flag
        is redundant here but harmless: ECS starts fresh tasks either way.
        """
        self.ecs.update_service(
            cluster=self.cluster,
            service=self.service,
            taskDefinition=new_td_arn,
            deploymentConfiguration={
                "minimumHealthyPercent": 100,
                "maximumPercent":        200,
            },
            forceNewDeployment=True,
        )

    # ── Step 4: Poll deployment health ───────────────────────────
    def get_deployment_status(self) -> str:
        """
        describe_services() returns a list of deployments for the service.
        There is always a PRIMARY deployment (the latest) and optionally
        ACTIVE deployments (older tasks being drained).

        Primary deployment fields:
        - desiredCount:  how many tasks ECS wants to run
        - runningCount:  how many tasks are in the RUNNING state
        - failedTasks:   how many consecutive task launches have failed

        Once runningCount == desiredCount and failedTasks == 0, all new
        tasks are running. This check does not wait for older (ACTIVE)
        deployments to finish draining.
        """
        response = self.ecs.describe_services(
            cluster=self.cluster,
            services=[self.service],
        )
        deployments = response["services"][0]["deployments"]

        # Find the PRIMARY (most recent) deployment
        primary = next(
            (d for d in deployments if d["status"] == "PRIMARY"),
            None,
        )
        if not primary:
            return "UNKNOWN"

        desired = primary["desiredCount"]
        running = primary["runningCount"]
        failed  = primary["failedTasks"]

        if failed > 0:
            logger.error(f"Deployment failure: {failed} failed task(s)")
            return "FAILED"

        if running == desired and desired > 0:
            return "HEALTHY"

        return f"IN_PROGRESS ({running}/{desired} tasks running)"

    # ── Step 5: Roll back ─────────────────────────────────────────
    def rollback(self, previous_td_arn: str) -> None:
        """
        Rollback by calling update_service() with the PREVIOUS task definition.
        ECS will drain the failing new tasks and start tasks from the old definition.
        This is the same API call as deployment — ECS handles the swap gracefully.
        """
        logger.warning(f"Rolling back to: {previous_td_arn}")
        self.ecs.update_service(
            cluster=self.cluster,
            service=self.service,
            taskDefinition=previous_td_arn,
        )
        logger.info("Rollback triggered. Previous version will restore.")

    # ── Orchestrator ──────────────────────────────────────────────
    def deploy(
        self,
        new_image: str,
        health_check_retries: int = 15,
        health_check_delay: int   = 30,
    ) -> bool:
        """
        Full deployment lifecycle:
        1. Capture current state for rollback
        2. Register new task definition
        3. Trigger rolling update
        4. Poll health status
        5. Rollback if failed or timed out

        Returns True if deployment succeeded, False if rolled back.
        """
        logger.info(f"Starting deployment of: {new_image}")

        # Save the current (stable) task definition for rollback
        current_td = self.get_current_task_definition()
        logger.info(f"Current task definition (rollback target): {current_td}")

        # Create and deploy new task definition
        new_td = self.create_new_task_definition(current_td, new_image)
        self.trigger_deployment(new_td)
        logger.info(f"Deployment triggered. Monitoring health ({health_check_retries} checks × {health_check_delay}s)...")

        # Monitor deployment
        for attempt in range(1, health_check_retries + 1):
            time.sleep(health_check_delay)
            status = self.get_deployment_status()
            logger.info(f"[{attempt}/{health_check_retries}] Status: {status}")

            if status == "HEALTHY":
                logger.info("✅ Deployment succeeded!")
                return True

            if status == "FAILED":
                logger.error("❌ Deployment failed! Rolling back...")
                self.rollback(current_td)
                return False

        # Timeout — roll back
        logger.error("⏰ Health check timed out. Rolling back...")
        self.rollback(current_td)
        return False


# ── Entry point ───────────────────────────────────────────────────
if __name__ == "__main__":
    deployer = ECSRollingDeployer(
        cluster="prod-cluster",
        service="payment-service",
        region="ap-south-1",
    )

    success = deployer.deploy(
        new_image="123456789012.dkr.ecr.ap-south-1.amazonaws.com/payment:v2.4.1",
        health_check_retries=15,   # Wait up to 7.5 min (15 × 30s)
        health_check_delay=30,
    )

    # Exit with non-zero code on failure — triggers CI/CD pipeline failure
    sys.exit(0 if success else 1)

Key Commands Explained

| Command | What it does |
|---|---|
| describe_services(cluster, services) | Returns full service state including all deployments |
| describe_task_definition(taskDefinition=arn) | Returns the full spec of a task definition revision |
| register_task_definition(**td) | Creates a new immutable revision with the updated spec |
| update_service(cluster, service, taskDefinition, deploymentConfiguration) | Triggers a rolling update to the new task definition |
| minimumHealthyPercent=100 | Never drop below 100% capacity during deployment |
| maximumPercent=200 | Allow up to 200% tasks during deployment (old + new) |
| forceNewDeployment=True | Starts a new deployment with fresh tasks (and a fresh image pull) even if the task definition ARN didn’t change |
| deployment["failedTasks"] | Number of consecutive task launch failures; non-zero triggers a rollback |
| deployment["runningCount"] == deployment["desiredCount"] | All desired tasks of the new deployment are in the RUNNING state |

Common Issues

Deployment stuck in IN_PROGRESS: The new task is probably failing to start. Use aws ecs list-tasks --desired-status STOPPED and aws ecs describe-tasks to read the stoppedReason of the stopped tasks, then aws logs get-log-events for the container log output.

ServiceNotActiveException: The ECS service itself is not active (e.g., it is being deleted or has already been deleted), so it cannot be updated. Check the ECS console for service events.

Rolling update causes downtime: With minimumHealthyPercent=50, ECS may stop up to 50% of the running tasks before the new ones are up. Use minimumHealthyPercent=100 with maximumPercent=200 for zero-downtime deployments (requires capacity headroom).
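The effect of the two percentages is easier to see as task counts. The sketch below assumes the rounding ECS documents for these settings (the lower bound is rounded up, the upper bound is rounded down); rolling_update_bounds is a hypothetical helper name, not part of boto3.

```python
from typing import Tuple


def rolling_update_bounds(desired: int, min_healthy_pct: int, max_pct: int) -> Tuple[int, int]:
    """Return (min_running, max_running) task counts during a rolling update."""
    # Lower bound: percentage of desired count, rounded up (integer ceiling division).
    min_running = -(-desired * min_healthy_pct // 100)
    # Upper bound: percentage of desired count, rounded down.
    max_running = desired * max_pct // 100
    return min_running, max_running


# Compare the zero-downtime setting with two cheaper configurations.
for desired, min_pct, max_pct in [(4, 100, 200), (4, 50, 200), (3, 50, 150)]:
    bounds = rolling_update_bounds(desired, min_pct, max_pct)
    print(f"desired={desired} min%={min_pct} max%={max_pct} -> {bounds}")
```

With 4 desired tasks at 100/200, ECS keeps at least 4 running and may run up to 8, which is why the cluster needs headroom for double the normal task count.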


🔍 Line-by-Line Code Walkthrough

Imports

| Line | Why It’s Used |
|---|---|
| import boto3 | AWS SDK, needed for ECS client calls |
| import time | time.sleep(30) pauses between health status polls |
| import sys | sys.exit(0) or sys.exit(1) returns the appropriate exit code for CI/CD pipelines |
| import logging | Timestamped, leveled log output |

ECSRollingDeployer.__init__

self.ecs     = boto3.client("ecs", region_name=region)
self.cluster = cluster
self.service = service

| Line | Explanation |
|---|---|
| boto3.client("ecs", region_name=region) | ECS client for the target region. ECS clusters are regional |
| self.cluster | The ECS cluster name or ARN. All API calls require this to identify which cluster |
| self.service | The ECS service name (e.g., "payment-service"). A cluster can have many services |

get_current_task_definition()

response = self.ecs.describe_services(
    cluster=self.cluster,
    services=[self.service],
)
return response["services"][0]["taskDefinition"]

| Line | Explanation |
|---|---|
| describe_services(cluster=..., services=[...]) | Returns the full service configuration. services is a list; you can request up to 10 services at once |
| response["services"][0] | The first (and only) service dict in the response |
| ["taskDefinition"] | The ARN of the currently deployed task definition, e.g., arn:aws:ecs:...:task-definition/payment:8. The :8 is the revision number |
| if not response["services"]: raise ValueError | Guard for the case where the service doesn’t exist; gives a clear error message |
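Since the revision number is embedded in the ARN, it can be handy to log the family and revision separately. A minimal sketch, assuming the ARN has the revisioned form shown above; parse_task_definition_arn is a hypothetical helper, not a boto3 function.

```python
from typing import Tuple


def parse_task_definition_arn(arn: str) -> Tuple[str, int]:
    """Split a task definition ARN into (family, revision)."""
    # The resource part follows the last "/", e.g. "payment:8".
    resource = arn.rsplit("/", 1)[-1]
    family, _, revision = resource.rpartition(":")
    if not family or not revision.isdigit():
        raise ValueError(f"Not a revisioned task definition ARN: {arn}")
    return family, int(revision)


family, revision = parse_task_definition_arn(
    "arn:aws:ecs:us-east-1:123456789012:task-definition/payment:8"
)
print(family, revision)  # payment 8
```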

create_new_task_definition(current_td_arn, new_image)

td = self.ecs.describe_task_definition(
    taskDefinition=current_td_arn
)["taskDefinition"]

| Line | Explanation |
|---|---|
| describe_task_definition(taskDefinition=current_td_arn) | Fetches the full spec of an existing revision. This spec is the template for our new revision |
| ["taskDefinition"] | The task definition dict containing containerDefinitions, cpu, memory, networkMode, executionRoleArn, etc. |

containers = td["containerDefinitions"]
old_image = containers[0]["image"]
containers[0]["image"] = new_image

| Line | Explanation |
|---|---|
| td["containerDefinitions"] | List of container specs, one dict per container in the task |
| containers[0] | The first container (main application container). If your task has a sidecar, you’d target a different index |
| containers[0]["image"] = new_image | Replaces the Docker image URI. E.g., changes payment:v2.4.0 to payment:v2.4.1 |

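For tasks with sidecars, selecting the container by name is less fragile than relying on its position in the list. The following is a sketch of that idea on plain dicts shaped like containerDefinitions; set_container_image and the sample container names and images are illustrative assumptions.

```python
import copy
from typing import Any, Dict, List


def set_container_image(
    containers: List[Dict[str, Any]], container_name: str, new_image: str
) -> List[Dict[str, Any]]:
    """Return a copy of containerDefinitions with one container's image replaced."""
    updated = copy.deepcopy(containers)  # leave the caller's list untouched
    for container in updated:
        if container["name"] == container_name:
            container["image"] = new_image
            return updated
    raise ValueError(f"No container named {container_name!r} in task definition")


# Hypothetical task with a sidecar listed before the application container.
containers = [
    {"name": "log-router", "image": "example/log-router:1.0"},
    {"name": "app", "image": "payment:v2.4.0"},
]
updated = set_container_image(containers, "app", "payment:v2.4.1")
print([c["image"] for c in updated])
```

Raising on an unknown name means a typo fails the deploy before anything is registered, instead of silently updating the wrong container.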
readonly_fields = [
    "taskDefinitionArn", "revision", "status",
    "requiresAttributes", "compatibilities",
    "registeredAt", "registeredBy",
]
for field in readonly_fields:
    td.pop(field, None)

| Line | Explanation |
|---|---|
| readonly_fields | Fields that AWS populates automatically. If you include them in register_task_definition(), boto3 rejects the call before sending it with ParamValidationError (Unknown parameter in input) |
| td.pop(field, None) | Removes the field from the dict if it exists. .pop(key, None) returns None instead of raising KeyError if the key is absent |

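An alternative to stripping known read-only fields is to keep only the fields that register_task_definition() accepts, so that an output-only field missing from the strip list (for example deregisteredAt on a deregistered revision) cannot break the call. This is a sketch, not the script's method: the REGISTER_PARAMS set reflects my understanding of the accepted parameters and should be checked against the boto3 version in use, and to_register_payload is a hypothetical helper.

```python
from typing import Any, Dict

# Assumed allowlist of register_task_definition parameters that also appear in
# describe_task_definition output. Verify against your boto3 version; anything
# not listed here is silently dropped from the new revision.
REGISTER_PARAMS = {
    "family", "taskRoleArn", "executionRoleArn", "networkMode",
    "containerDefinitions", "volumes", "placementConstraints",
    "requiresCompatibilities", "cpu", "memory", "pidMode", "ipcMode",
    "proxyConfiguration", "inferenceAccelerators", "ephemeralStorage",
    "runtimePlatform",
}


def to_register_payload(td: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only the keys that can be passed back to register_task_definition."""
    return {key: value for key, value in td.items() if key in REGISTER_PARAMS}


# Trimmed-down example of a describe_task_definition()["taskDefinition"] dict.
described = {
    "taskDefinitionArn": "arn:aws:ecs:us-east-1:123456789012:task-definition/payment:8",
    "family": "payment",
    "revision": 8,
    "status": "ACTIVE",
    "cpu": "256",
    "containerDefinitions": [{"name": "app", "image": "payment:v2.4.1"}],
}
print(sorted(to_register_payload(described)))
```

The trade-off is the reverse of the strip list: a newly introduced writable parameter is dropped until it is added to the allowlist.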
response = self.ecs.register_task_definition(**td)
new_td_arn = response["taskDefinition"]["taskDefinitionArn"]

| Line | Explanation |
|---|---|
| register_task_definition(**td) | **td unpacks the dict as keyword arguments. This passes all remaining fields (containerDefinitions, cpu, memory, etc.) as named parameters |
| response["taskDefinition"]["taskDefinitionArn"] | The ARN of the newly created revision (e.g., ...:task-definition/payment:9) |

trigger_deployment(new_td_arn)

self.ecs.update_service(
    cluster=self.cluster,
    service=self.service,
    taskDefinition=new_td_arn,
    deploymentConfiguration={
        "minimumHealthyPercent": 100,
        "maximumPercent":        200,
    },
    forceNewDeployment=True,
)

| Parameter | Explanation |
|---|---|
| taskDefinition=new_td_arn | Tells ECS to run tasks using this new task definition revision |
| minimumHealthyPercent=100 | ECS must keep at least 100% of the desired count running at all times. New tasks start BEFORE old ones stop (requires enough cluster capacity for 2×) |
| maximumPercent=200 | ECS may run up to 200% of the desired count (old + new tasks) during the transition |
| forceNewDeployment=True | Only strictly required when the task definition ARN is unchanged (e.g., redeploying a mutable tag such as latest), where it forces ECS to start fresh tasks and pull the newest image. This script always registers a new revision, so the flag is redundant here but harmless |
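If you prefer to set forceNewDeployment only when it actually matters, the arguments can be built conditionally. A sketch under that assumption; build_update_service_kwargs is a hypothetical helper, and its result would be passed as ecs.update_service(**kwargs).

```python
from typing import Any, Dict


def build_update_service_kwargs(
    cluster: str, service: str, current_td_arn: str, new_td_arn: str
) -> Dict[str, Any]:
    """Build update_service arguments, forcing a deployment only if the ARN is unchanged."""
    kwargs: Dict[str, Any] = {
        "cluster": cluster,
        "service": service,
        "taskDefinition": new_td_arn,
        "deploymentConfiguration": {
            "minimumHealthyPercent": 100,
            "maximumPercent": 200,
        },
    }
    if new_td_arn == current_td_arn:
        # Same revision (e.g. a mutable image tag): ask ECS to replace tasks anyway.
        kwargs["forceNewDeployment"] = True
    return kwargs


old = "arn:aws:ecs:us-east-1:123456789012:task-definition/payment:8"
new = "arn:aws:ecs:us-east-1:123456789012:task-definition/payment:9"
print("forceNewDeployment" in build_update_service_kwargs("prod-cluster", "payment-service", old, new))
print("forceNewDeployment" in build_update_service_kwargs("prod-cluster", "payment-service", old, old))
```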

get_deployment_status()

deployments = response["services"][0]["deployments"]
primary = next(
    (d for d in deployments if d["status"] == "PRIMARY"),
    None,
)

| Line | Explanation |
|---|---|
| response["services"][0]["deployments"] | List of deployment objects. During a rolling update, there are multiple: PRIMARY (new) and ACTIVE (old tasks being drained) |
| next((d for d in ... if d["status"] == "PRIMARY"), None) | Finds the PRIMARY deployment (the most recent one). next() returns the first match, or None if not found |

desired = primary["desiredCount"]
running = primary["runningCount"]
failed  = primary["failedTasks"]

if failed > 0:   return "FAILED"
if running == desired and desired > 0:   return "HEALTHY"
return f"IN_PROGRESS ({running}/{desired} tasks running)"

| Line | Explanation |
|---|---|
| primary["desiredCount"] | Target number of tasks ECS wants running (from your service configuration) |
| primary["runningCount"] | Number of the deployment’s tasks currently in the RUNNING state (not necessarily passing health checks yet) |
| primary["failedTasks"] | Count of consecutive task launches that failed in this deployment. Non-zero = something is broken |
| running == desired and desired > 0 | All new tasks are running, so the deployment is treated as complete. desired > 0 handles the edge case of a service scaling to 0 |
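Deployment objects for rolling updates also carry a rolloutState field (IN_PROGRESS, COMPLETED, or FAILED) that reflects whether the service reached steady state. The sketch below is one possible variation on the status check that prefers rolloutState when it is present and falls back to the count comparison otherwise; classify_deployment is a hypothetical function operating on the deployments list, and the FAILED rollout state is only reported when the deployment circuit breaker is enabled.

```python
from typing import Any, Dict, List


def classify_deployment(deployments: List[Dict[str, Any]]) -> str:
    """Map the PRIMARY deployment to HEALTHY, FAILED, IN_PROGRESS, or UNKNOWN."""
    primary = next((d for d in deployments if d["status"] == "PRIMARY"), None)
    if primary is None:
        return "UNKNOWN"

    rollout = primary.get("rolloutState")  # may be absent for some service types
    if primary.get("failedTasks", 0) > 0 or rollout == "FAILED":
        return "FAILED"
    if rollout == "COMPLETED":
        return "HEALTHY"
    if rollout is None:
        # No rollout state available: fall back to comparing task counts.
        desired = primary["desiredCount"]
        if desired > 0 and primary["runningCount"] == desired:
            return "HEALTHY"
    return "IN_PROGRESS"


sample = [
    {"status": "PRIMARY", "desiredCount": 2, "runningCount": 2,
     "failedTasks": 0, "rolloutState": "IN_PROGRESS"},
    {"status": "ACTIVE", "desiredCount": 2, "runningCount": 1, "failedTasks": 0},
]
print(classify_deployment(sample))
```

In the sample, both new tasks are running but the old deployment is still draining, so the result stays IN_PROGRESS until ECS reports COMPLETED.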

rollback(previous_td_arn)

self.ecs.update_service(
    cluster=self.cluster,
    service=self.service,
    taskDefinition=previous_td_arn,
)

| Line | Explanation |
|---|---|
| update_service(taskDefinition=previous_td_arn) | Rollback uses the exact same API call as deployment, just with the old task definition ARN instead of the new one |
| Why no deploymentConfiguration? | Parameters omitted from update_service() keep their current values, so the minimumHealthyPercent=100 / maximumPercent=200 settings applied by trigger_deployment() remain in effect during rollback |
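The rollback above returns as soon as update_service() is accepted, so the script exits without confirming that the old version is actually back. One way to close that gap is the boto3 services_stable waiter. This is a sketch rather than part of the original script: rollback_and_wait is a hypothetical function that takes the ECS client as a parameter, and the delay and attempt values are illustrative.

```python
def rollback_and_wait(
    ecs,
    cluster: str,
    service: str,
    previous_td_arn: str,
    delay: int = 15,
    max_attempts: int = 40,
) -> None:
    """Point the service back at the previous task definition and wait for stability.

    `ecs` is expected to be a boto3 ECS client, e.g. boto3.client("ecs").
    The waiter raises botocore.exceptions.WaiterError if the service does
    not stabilize within delay * max_attempts seconds.
    """
    ecs.update_service(
        cluster=cluster,
        service=service,
        taskDefinition=previous_td_arn,
    )
    waiter = ecs.get_waiter("services_stable")
    waiter.wait(
        cluster=cluster,
        services=[service],
        WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
    )
```

A caller in a CI/CD job could catch WaiterError and exit with a distinct code to signal that the rollback itself did not complete.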

deploy() — Orchestrator

current_td = self.get_current_task_definition()   # Save for rollback
new_td = self.create_new_task_definition(current_td, new_image)
self.trigger_deployment(new_td)

for attempt in range(1, health_check_retries + 1):
    time.sleep(health_check_delay)
    status = self.get_deployment_status()
    if status == "HEALTHY":   return True
    if status == "FAILED":
        self.rollback(current_td)
        return False

self.rollback(current_td)   # Timeout
return False

| Line | Explanation |
|---|---|
| current_td = self.get_current_task_definition() | Captures the old task definition ARN BEFORE deploying. This is the rollback target |
| time.sleep(health_check_delay) | Waits 30 seconds between polls. ECS typically takes 30–120 seconds to launch new tasks |
| self.rollback(current_td) | Called on both "FAILED" status AND timeout. The old task definition is the safe state |
| sys.exit(0 if success else 1) | Exit code 0 = CI/CD pipeline continues. Exit code 1 = pipeline marks the step as failed |
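The polling loop can be exercised without touching AWS if the status source and the sleep function are passed in. The sketch below assumes the same three outcomes as the orchestrator; poll_until_terminal and its parameters are hypothetical names introduced for illustration.

```python
import time
from typing import Callable


def poll_until_terminal(
    get_status: Callable[[], str],
    retries: int,
    delay: float,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() and return "HEALTHY", "FAILED", or "TIMEOUT"."""
    for _ in range(retries):
        sleep(delay)
        status = get_status()
        if status in ("HEALTHY", "FAILED"):
            return status
    return "TIMEOUT"


# Dry run with canned statuses and a no-op sleep, so it finishes instantly.
statuses = iter([
    "IN_PROGRESS (0/2 tasks running)",
    "IN_PROGRESS (1/2 tasks running)",
    "HEALTHY",
])
result = poll_until_terminal(lambda: next(statuses), retries=5, delay=30, sleep=lambda _: None)
print(result)  # HEALTHY
```

In the real script, get_status would be deployer.get_deployment_status, and both "FAILED" and "TIMEOUT" would lead to a rollback.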

Services Used
ECS, ECR, boto3, IAM
Prerequisites
  • Python 3.8+
  • boto3
  • ECS cluster and service running
  • IAM: ecs:DescribeServices, ecs:UpdateService, ecs:RegisterTaskDefinition, ecs:DescribeTaskDefinition, plus iam:PassRole on any task role or execution role referenced by the task definition
What You Learned
  • ECS task definition versioning
  • update_service deployment configuration
  • Polling deployment status
  • Automatic rollback pattern
  • minimumHealthyPercent and maximumPercent settings
