Scenario Advanced Python Python AWS Scripting

Deploy Lambda Function with Version Publishing & Alias Traffic Shifting

Python script to deploy or update a Lambda function, update environment variables, publish a new version, and create an alias with canary traffic shifting.

January 20, 2025 9 min read ~25 min to complete DB
The Situation

Serverless CI/CD — automate Lambda deployments with safe canary releases using alias routing, so you can shift 10% of traffic to a new version before full rollout.

7 Steps
3 Services Used
~25 min Duration
Advanced Difficulty

Problem Statement

Your team deploys Lambda functions by hand through the AWS Console — which means no audit trail, no rollback plan, and accidental overwrites of environment variables. This script provides a repeatable deployment pipeline with canary releases.


Lambda Deployment Lifecycle

Zip source code
     ↓
update_function_code()      ← uploads new code (state: InProgress)
     ↓
wait for LastUpdateStatus = "Successful"
     ↓
update_function_configuration()  ← update env vars / memory / timeout
     ↓
publish_version()           ← snapshot the code + config as Version N
     ↓
create_or_update_alias()    ← point "prod" alias to V(N) with weights

Complete Script

import boto3
import zipfile
import io
import hashlib
import time
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")


class LambdaDeployer:
    def __init__(self, region: str = "us-east-1"):
        """
        boto3.client("lambda") is the low-level Lambda client.
        We use it directly (not the resource API) because we need
        fine-grained control over publish_version and create_alias.
        """
        self.lmb = boto3.client("lambda", region_name=region)

    # ── Step 1: Create deployment package ────────────────────────
    def create_deployment_package(self, source_file: str) -> bytes:
        """
        Lambda requires the function code as a ZIP archive.
        zipfile.ZipFile + io.BytesIO creates an in-memory ZIP
        so we don't need to write a temp file to disk.

        ZIP_DEFLATED compresses the archive (required for large files).
        zf.write(source_file, arcname="lambda_function.py") writes the
        file into the archive with a specific internal name — Lambda
        uses this name to find the handler module.
        """
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
            zf.write(source_file, arcname="lambda_function.py")
        zip_buffer.seek(0)
        return zip_buffer.read()

    # ── Step 2: Deploy or update ──────────────────────────────────
    def deploy_or_update(
        self,
        function_name: str,
        source_file: str,
        role_arn: str,
        env_vars: dict,
        runtime: str  = "python3.12",
        memory: int   = 256,
        timeout: int  = 30,
    ) -> str:
        """
        Returns the published version number (as a string, e.g. "5").

        We use a SHA-256 hash of the zip bytes as a human-readable
        deployment identifier — shows up in logs and Lambda tags.
        """
        zip_bytes = self.create_deployment_package(source_file)
        code_hash = hashlib.sha256(zip_bytes).hexdigest()[:8]
        logger.info(f"Deploying {function_name} | code hash: {code_hash}")

        try:
            # ── UPDATE existing function ──────────────────────────
            self.lmb.get_function(FunctionName=function_name)
            logger.info(f"Function exists — updating")

            # update_function_code() replaces the function code.
            # Publish=False means we upload the code first and publish
            # a new version separately after updating config too.
            self.lmb.update_function_code(
                FunctionName=function_name,
                ZipFile=zip_bytes,
                Publish=False,
            )
            # Lambda updates are asynchronous — must wait before next call
            self._wait_for_update(function_name)

            # update_function_configuration() changes runtime settings.
            # Must be a separate call from update_function_code.
            self.lmb.update_function_configuration(
                FunctionName=function_name,
                Environment={"Variables": env_vars},
                MemorySize=memory,
                Timeout=timeout,
            )
            self._wait_for_update(function_name)

        except self.lmb.exceptions.ResourceNotFoundException:
            # ── CREATE new function ───────────────────────────────
            logger.info(f"Function does not exist — creating")
            self.lmb.create_function(
                FunctionName=function_name,
                Runtime=runtime,
                Role=role_arn,
                # Handler = "module_name.function_name"
                # Lambda imports lambda_function.py and calls lambda_handler()
                Handler="lambda_function.lambda_handler",
                Code={"ZipFile": zip_bytes},
                Environment={"Variables": env_vars},
                MemorySize=memory,
                Timeout=timeout,
                Tags={"DeployedBy": "DeployScript", "CodeHash": code_hash},
            )
            self._wait_for_active(function_name)

        # ── Step 3: Publish version ───────────────────────────────
        # publish_version() takes a snapshot of the current code + config
        # and assigns it an immutable version number.
        # Versions are permanent — they can only be deleted explicitly.
        version_resp = self.lmb.publish_version(
            FunctionName=function_name,
            Description=f"Deploy {code_hash}",
        )
        version = version_resp["Version"]
        logger.info(f"Published version: {version}")
        return version

    # ── Step 4: Create / update alias with traffic shifting ───────
    def create_or_update_alias(
        self,
        function_name: str,
        alias: str,
        version: str,
        canary_weight: float = None,
        previous_version: str = None,
    ) -> None:
        """
        An alias is a named pointer to one (or two) Lambda versions.
        Used so callers reference "prod" instead of a version number.

        RoutingConfig.AdditionalVersionWeights enables canary deployment:
        - {"5": 0.9} means 90% traffic to version 5, 10% to this alias's
          FunctionVersion (the new one). Weights must sum to < 1.0.

        Set canary_weight=None to route 100% to the new version.
        """
        config: dict = {
            "FunctionName": function_name,
            "Name":         alias,
            "FunctionVersion": version,
            "Description":  f"Points to version {version}",
        }

        if canary_weight is not None and previous_version is not None:
            # Send `canary_weight` fraction of traffic to old version
            # Remaining fraction (1 - canary_weight) goes to `version`
            config["RoutingConfig"] = {
                "AdditionalVersionWeights": {
                    previous_version: canary_weight
                }
            }
            logger.info(
                f"Canary: {(1-canary_weight)*100:.0f}% → v{version}, "
                f"{canary_weight*100:.0f}% → v{previous_version}"
            )

        try:
            self.lmb.get_alias(FunctionName=function_name, Name=alias)
            self.lmb.update_alias(**config)
            logger.info(f"Updated alias '{alias}' → version {version}")
        except self.lmb.exceptions.ResourceNotFoundException:
            self.lmb.create_alias(**config)
            logger.info(f"Created alias '{alias}' → version {version}")

    def promote_alias(self, function_name: str, alias: str, version: str) -> None:
        """After canary looks healthy, promote to 100% traffic."""
        self.lmb.update_alias(
            FunctionName=function_name,
            Name=alias,
            FunctionVersion=version,
            RoutingConfig={"AdditionalVersionWeights": {}},  # Clear canary weights
        )
        logger.info(f"Alias '{alias}' promoted to 100% → version {version}")

    # ── Waiters ───────────────────────────────────────────────────
    def _wait_for_update(self, function_name: str, max_wait: int = 60) -> None:
        """
        Lambda updates are asynchronous. get_function_configuration()
        returns LastUpdateStatus which cycles:
          InProgress → Successful (or Failed)
        Polling with 1-second intervals is fine; updates usually complete in 2-5s.
        """
        for _ in range(max_wait):
            resp = self.lmb.get_function_configuration(FunctionName=function_name)
            status = resp["LastUpdateStatus"]
            if status == "Successful":
                return
            if status == "Failed":
                raise RuntimeError(
                    f"Lambda update failed: {resp.get('LastUpdateStatusReasonCode')}"
                )
            time.sleep(1)
        raise TimeoutError(f"Lambda update timed out for {function_name}")

    def _wait_for_active(self, function_name: str, max_wait: int = 60) -> None:
        """
        After create_function(), the State cycles: Pending → Active.
        Must wait before calling publish_version() or the API will error.
        """
        for _ in range(max_wait):
            resp = self.lmb.get_function_configuration(FunctionName=function_name)
            if resp["State"] == "Active":
                return
            time.sleep(1)
        raise TimeoutError(f"Lambda activation timed out for {function_name}")


# ── Entry point ───────────────────────────────────────────────────
if __name__ == "__main__":
    deployer = LambdaDeployer(region="us-east-1")

    env_vars = {
        "ENVIRONMENT": "production",
        "DB_HOST":     "prod-db.cluster.us-east-1.rds.amazonaws.com",
        "LOG_LEVEL":   "INFO",
    }

    # Deploy and get new version number
    new_version = deployer.deploy_or_update(
        function_name="my-app-processor",
        source_file="lambda_function.py",
        role_arn="arn:aws:iam::123456789012:role/LambdaExecRole",
        env_vars=env_vars,
        memory=512,
        timeout=60,
    )

    # Canary: route 10% to new version, 90% stays on previous version "4"
    deployer.create_or_update_alias(
        function_name="my-app-processor",
        alias="prod",
        version=new_version,        # e.g., "5"
        canary_weight=0.9,          # 90% to old version
        previous_version="4",       # Previous stable version
    )

    # After observing metrics for 15 minutes...
    # deployer.promote_alias("my-app-processor", "prod", new_version)

Key Commands Explained

CommandWhat it does
zipfile.ZipFile(io.BytesIO(), "w", ZIP_DEFLATED)Creates in-memory ZIP archive
update_function_code(ZipFile=..., Publish=False)Uploads new code without creating a version
update_function_configuration(Environment=...)Updates env vars / memory / timeout separately
publish_version(Description=...)Snapshots current code + config as an immutable version
version_resp["Version"]Returns the version number as a string (e.g., "5")
create_alias(FunctionVersion=version)Creates a named pointer to a specific version
RoutingConfig.AdditionalVersionWeightsSplits traffic between versions for canary releases
get_function_configuration()["LastUpdateStatus"]Checks if an async update has completed

Common Issues

ResourceConflictException on update — Another update is in progress. Always call _wait_for_update() before making another configuration change.

InvalidParameterValueException on weights — Canary weights must sum to less than 1.0 (not equal to 1.0). The main alias version gets the remainder.

PackageStorageException — The ZIP is too large (250 MB uncompressed limit). Use a Lambda layer for large dependencies and keep the function code small.


🔍 Line-by-Line Code Walkthrough

Imports

LineWhy It’s Used
import boto3AWS SDK — provides the Lambda client
import zipfileStandard library for creating ZIP archives. Lambda requires code as a ZIP file
import ioStandard library. io.BytesIO() creates an in-memory bytes buffer — we build the ZIP in RAM, not on disk
import hashlibComputes a SHA-256 hash of the ZIP bytes — used as a deployment identifier in logs and tags
import timetime.sleep(1) in the waiter — pauses between status checks
import loggingStructured log output

create_deployment_package(source_file)

zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
    zf.write(source_file, arcname="lambda_function.py")
zip_buffer.seek(0)
return zip_buffer.read()
LineExplanation
io.BytesIO()Creates an in-memory bytes buffer that behaves like a file. We write the ZIP into this instead of a temp file on disk — faster, no cleanup needed
zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED)Opens the in-memory buffer as a writeable ZIP archive. "w" = write mode. ZIP_DEFLATED = use DEFLATE compression (required for larger files)
zf.write(source_file, arcname="lambda_function.py")Adds the source file to the archive with a fixed internal name lambda_function.py. Lambda imports this module using the Handler field: "lambda_function.lambda_handler" means “import lambda_function, call lambda_handler()
zip_buffer.seek(0)Resets the buffer’s read position to the beginning. Without this, read() would return empty bytes
return zip_buffer.read()Reads all bytes from the buffer. These bytes are what we pass to the ZipFile parameter of update_function_code()

deploy_or_update(function_name, source_file, ...)

zip_bytes = self.create_deployment_package(source_file)
code_hash = hashlib.sha256(zip_bytes).hexdigest()[:8]
LineExplanation
create_deployment_package(source_file)Returns the ZIP as raw bytes
hashlib.sha256(zip_bytes)Computes SHA-256 of the ZIP bytes — a fingerprint of this exact code version
.hexdigest()[:8]Takes the first 8 hex characters as a short identifier (e.g., "a1b2c3d4"). Used in the version description and Lambda tags for traceability
self.lmb.get_function(FunctionName=function_name)
LineExplanation
get_function(FunctionName=...)Checks if the function exists. If it does, we update it. If it raises ResourceNotFoundException, we create it
Why use try/except instead of listing functions?More efficient — one direct call vs paginating through all functions
self.lmb.update_function_code(
    FunctionName=function_name,
    ZipFile=zip_bytes,
    Publish=False,
)
self._wait_for_update(function_name)
LineExplanation
update_function_code(ZipFile=zip_bytes)Uploads the new ZIP to Lambda. The function now runs the new code
ZipFile=zip_bytesPasses the raw bytes directly. For files > 50 MB, use S3Bucket/S3Key instead
Publish=FalseDoes NOT create a new version yet. We update config next, then publish once after both changes. If we published here, we’d create a version with the new code but old config
self._wait_for_update(function_name)Lambda updates are asynchronous — you must wait for LastUpdateStatus == "Successful" before making another change. Without waiting, the next call raises ResourceConflictException
self.lmb.update_function_configuration(
    FunctionName=function_name,
    Environment={"Variables": env_vars},
    MemorySize=memory,
    Timeout=timeout,
)
LineExplanation
update_function_configuration(...)Updates runtime settings separately from code. AWS requires two separate API calls — code and config cannot be updated atomically
Environment={"Variables": env_vars}Sets environment variables. env_vars is a plain dict like {"DB_HOST": "...", "LOG_LEVEL": "INFO"}
MemorySize=memoryRAM allocated to the Lambda in MB. Also proportionally increases CPU. Valid: 128–10,240 MB in 1 MB increments
Timeout=timeoutMax execution time in seconds. Lambda kills the function if it exceeds this. Max: 900 (15 min)

publish_version()

version_resp = self.lmb.publish_version(
    FunctionName=function_name,
    Description=f"Deploy {code_hash}",
)
version = version_resp["Version"]
LineExplanation
publish_version(FunctionName=...)Takes a snapshot of the current code + config and assigns it an immutable version number (e.g., "5")
Description=f"Deploy {code_hash}"Human-readable label for this version. Shown in the Lambda console version list
version_resp["Version"]The version number as a string (e.g., "5"). Lambda versions always use strings, not integers
Why publish a version?Without versioning, aliases always point to $LATEST (the current unpublished code). With versioning, you can roll back by pointing the alias to a previous version number

create_or_update_alias(function_name, alias, version, canary_weight, previous_version)

config["RoutingConfig"] = {
    "AdditionalVersionWeights": {previous_version: canary_weight}
}
LineExplanation
RoutingConfigControls traffic splitting between two Lambda versions
AdditionalVersionWeights: {"4": 0.9}Sends 90% of traffic to version 4. The remaining 10% goes to the alias’s FunctionVersion (the new version 5). Weights must sum to less than 1.0
Why canary?If version 5 has a bug, only 10% of users are affected. You can monitor error rates and roll back by updating the alias to 100% on version 4
try:
    self.lmb.get_alias(FunctionName=function_name, Name=alias)
    self.lmb.update_alias(**config)
except self.lmb.exceptions.ResourceNotFoundException:
    self.lmb.create_alias(**config)
LineExplanation
get_alias(FunctionName=..., Name=alias)Checks if this alias already exists
update_alias(**config)**config unpacks the dict as keyword arguments. Equivalent to writing each key explicitly
create_alias(**config)Creates the alias on first deploy
Idempotent patternThe same code path handles both first-time and subsequent deploys — safe to run multiple times

_wait_for_update(function_name) — Waiter

for _ in range(max_wait):
    resp = self.lmb.get_function_configuration(FunctionName=function_name)
    status = resp["LastUpdateStatus"]
    if status == "Successful":
        return
    if status == "Failed":
        raise RuntimeError(...)
    time.sleep(1)
raise TimeoutError(...)
LineExplanation
for _ in range(max_wait)Polls up to max_wait times (default 60 times = 60 seconds). _ is convention for “don’t care about this variable”
get_function_configuration()Returns the current function config including LastUpdateStatus
resp["LastUpdateStatus"]One of: "Successful" (ready), "InProgress" (still updating), "Failed" (update crashed)
return on "Successful"Exits the loop — the function is ready for the next operation
raise RuntimeError on "Failed"Lambda failed to apply the update. The previous version is still running
time.sleep(1)Waits 1 second between polls. Lambda updates typically complete in 2–5 seconds
raise TimeoutError after loopIf still updating after 60 seconds, something is wrong
Services Used
LambdaIAMboto3
Prerequisites
  • Python 3.8+
  • boto3
  • IAM: lambda:CreateFunction, lambda:UpdateFunctionCode, lambda:UpdateFunctionConfiguration, lambda:PublishVersion, lambda:CreateAlias, lambda:UpdateAlias
What You Learned
  • Lambda deployment package creation
  • publish_version vs update_function_code
  • Alias routing weights for canary deployments
  • Waiting for Lambda update completion

Have a similar scenario to share?

Production incidents are the best teachers. Submit your real-world scenario and help others learn.

Open Google Form

Related Scenarios