The Situation
Serverless CI/CD — automate Lambda deployments with safe canary releases using alias routing, so you can shift 10% of traffic to a new version before full rollout.
Problem Statement
Your team deploys Lambda functions by hand through the AWS Console — which means no audit trail, no rollback plan, and accidental overwrites of environment variables. This script provides a repeatable deployment pipeline with canary releases.
Lambda Deployment Lifecycle
Zip source code
↓
update_function_code() ← uploads new code (state: InProgress)
↓
wait for LastUpdateStatus = "Successful"
↓
update_function_configuration() ← update env vars / memory / timeout
↓
publish_version() ← snapshot the code + config as Version N
↓
create_or_update_alias() ← point "prod" alias to V(N) with weights
Complete Script
import hashlib
import io
import logging
import time
import zipfile
from typing import Optional

import boto3
# Module-level logger, named after this module and used by the deployer below.
logger = logging.getLogger(__name__)
# Configure the root logger at import time: INFO and above, with each record
# prefixed by a timestamp and the level name.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
class LambdaDeployer:
    """Repeatable Lambda deployment pipeline with canary releases.

    Flow: package one source file as a ZIP, create or update the function,
    publish an immutable version, then point an alias at that version,
    optionally leaving a share of traffic on the previous version.
    """

    def __init__(self, region: str = "us-east-1"):
        """Create the low-level boto3 Lambda client used by every method.

        Args:
            region: AWS region the client talks to.
        """
        self.lmb = boto3.client("lambda", region_name=region)

    # ── Step 1: Create deployment package ────────────────────────
    def create_deployment_package(self, source_file: str) -> bytes:
        """Build an in-memory ZIP archive containing ``source_file``.

        The file is always stored under the internal name
        ``lambda_function.py`` because the handler configured when the
        function is created is ``lambda_function.lambda_handler``.

        Args:
            source_file: Path of the Python source file to package.

        Returns:
            The raw bytes of the ZIP archive.
        """
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
            zf.write(source_file, arcname="lambda_function.py")
        # getvalue() returns the whole buffer regardless of the current
        # position, so no seek(0) is needed first.
        return zip_buffer.getvalue()

    # ── Step 2: Deploy or update ──────────────────────────────────
    def deploy_or_update(
        self,
        function_name: str,
        source_file: str,
        role_arn: str,
        env_vars: dict,
        runtime: str = "python3.12",
        memory: int = 256,
        timeout: int = 30,
    ) -> str:
        """Create the function, or update its code and config, then publish.

        The first 8 hex characters of the SHA-256 of the ZIP bytes act as a
        short deployment identifier: it is logged, stored in the published
        version's description and, when the function is created, written
        to the ``CodeHash`` tag.

        Note:
            ``runtime``, ``role_arn`` and the tags are applied only when the
            function is created. For an existing function only the code,
            environment variables, memory and timeout are updated.

        Args:
            function_name: Name of the Lambda function.
            source_file: Path of the source file to package and upload.
            role_arn: Execution role ARN (used on creation only).
            env_vars: Environment variables for the function.
            runtime: Lambda runtime identifier (used on creation only).
            memory: Memory size in MB.
            timeout: Function timeout in seconds.

        Returns:
            The published version number as a string, e.g. ``"5"``.

        Raises:
            RuntimeError: If Lambda reports the update or creation as failed.
            TimeoutError: If the function is not ready within the wait limit.
        """
        zip_bytes = self.create_deployment_package(source_file)
        code_hash = hashlib.sha256(zip_bytes).hexdigest()[:8]
        logger.info("Deploying %s | code hash: %s", function_name, code_hash)

        # Only the existence probe is guarded. A ResourceNotFoundException
        # raised later by an update call must not silently fall through to
        # create_function().
        try:
            self.lmb.get_function(FunctionName=function_name)
        except self.lmb.exceptions.ResourceNotFoundException:
            function_exists = False
        else:
            function_exists = True

        if function_exists:
            # ── UPDATE existing function ──────────────────────────
            logger.info("Function exists — updating")
            # Publish=False: upload the code now, publish once after the
            # configuration has been updated too, so the version captures
            # both changes.
            self.lmb.update_function_code(
                FunctionName=function_name,
                ZipFile=zip_bytes,
                Publish=False,
            )
            # Lambda updates are asynchronous — wait before the next call.
            self._wait_for_update(function_name)
            self.lmb.update_function_configuration(
                FunctionName=function_name,
                Environment={"Variables": env_vars},
                MemorySize=memory,
                Timeout=timeout,
            )
            self._wait_for_update(function_name)
        else:
            # ── CREATE new function ───────────────────────────────
            logger.info("Function does not exist — creating")
            self.lmb.create_function(
                FunctionName=function_name,
                Runtime=runtime,
                Role=role_arn,
                # Handler = "module_name.function_name": Lambda imports
                # lambda_function.py and calls lambda_handler().
                Handler="lambda_function.lambda_handler",
                Code={"ZipFile": zip_bytes},
                Environment={"Variables": env_vars},
                MemorySize=memory,
                Timeout=timeout,
                Tags={"DeployedBy": "DeployScript", "CodeHash": code_hash},
            )
            self._wait_for_active(function_name)

        # ── Step 3: Publish version ───────────────────────────────
        # publish_version() snapshots the current code + config under an
        # immutable version number.
        version_resp = self.lmb.publish_version(
            FunctionName=function_name,
            Description=f"Deploy {code_hash}",
        )
        version = version_resp["Version"]
        logger.info("Published version: %s", version)
        return version

    # ── Step 4: Create / update alias with traffic shifting ───────
    def create_or_update_alias(
        self,
        function_name: str,
        alias: str,
        version: str,
        canary_weight: Optional[float] = None,
        previous_version: Optional[str] = None,
    ) -> None:
        """Point ``alias`` at ``version``, optionally splitting traffic.

        When both ``canary_weight`` and ``previous_version`` are given, the
        alias is configured with
        ``AdditionalVersionWeights = {previous_version: canary_weight}``:
        ``canary_weight`` is the fraction of traffic that STAYS on
        ``previous_version`` and the remainder goes to ``version``. For
        example ``canary_weight=0.9`` sends 10% to the new version.

        When no canary is requested and the alias already exists, any
        weights left over from an earlier canary are cleared so the alias
        really routes 100% to ``version``.

        Args:
            function_name: Name of the Lambda function.
            alias: Alias name, e.g. ``"prod"``.
            version: Version the alias should point to.
            canary_weight: Fraction (0.0-1.0) of traffic kept on
                ``previous_version``; ``None`` disables the canary.
            previous_version: Version that keeps ``canary_weight`` of the
                traffic; ``None`` disables the canary.

        Raises:
            ValueError: If a canary is requested with a weight outside
                the range 0.0-1.0.
        """
        use_canary = canary_weight is not None and previous_version is not None

        if use_canary and not 0.0 <= canary_weight <= 1.0:
            raise ValueError(
                f"canary_weight must be between 0.0 and 1.0, got {canary_weight!r}"
            )
        if not use_canary and (canary_weight is not None or previous_version is not None):
            # Only one half of the canary pair was supplied. The call still
            # proceeds (e.g. a first deploy has no previous version), but
            # the full rollout must not happen silently.
            logger.warning(
                "Canary skipped: canary_weight and previous_version must both "
                "be set; routing 100%% to version %s",
                version,
            )

        config: dict = {
            "FunctionName": function_name,
            "Name": alias,
            "FunctionVersion": version,
            "Description": f"Points to version {version}",
        }

        routing = None
        if use_canary:
            routing = {"AdditionalVersionWeights": {previous_version: canary_weight}}
            logger.info(
                f"Canary: {(1-canary_weight)*100:.0f}% → v{version}, "
                f"{canary_weight*100:.0f}% → v{previous_version}"
            )

        try:
            self.lmb.get_alias(FunctionName=function_name, Name=alias)
        except self.lmb.exceptions.ResourceNotFoundException:
            # A brand-new alias has no weights to clear, so RoutingConfig
            # is only sent when a canary was requested.
            if routing is not None:
                config["RoutingConfig"] = routing
            self.lmb.create_alias(**config)
            logger.info("Created alias '%s' → version %s", alias, version)
        else:
            # update_alias leaves omitted fields untouched, so empty weights
            # are sent explicitly to drop a previous canary split.
            config["RoutingConfig"] = (
                routing if routing is not None else {"AdditionalVersionWeights": {}}
            )
            self.lmb.update_alias(**config)
            logger.info("Updated alias '%s' → version %s", alias, version)

    def promote_alias(self, function_name: str, alias: str, version: str) -> None:
        """Route 100% of the alias's traffic to ``version``.

        Intended for use once the canary looks healthy: the alias is pointed
        at ``version`` and the additional version weights are cleared.
        """
        self.lmb.update_alias(
            FunctionName=function_name,
            Name=alias,
            FunctionVersion=version,
            RoutingConfig={"AdditionalVersionWeights": {}},  # Clear canary weights
        )
        logger.info("Alias '%s' promoted to 100%% → version %s", alias, version)

    # ── Waiters ───────────────────────────────────────────────────
    def _wait_for_update(self, function_name: str, max_wait: int = 60) -> None:
        """Poll until the function's last update has finished.

        Reads ``LastUpdateStatus`` from get_function_configuration() up to
        ``max_wait`` times, sleeping one second between polls.

        Raises:
            RuntimeError: If the status is ``"Failed"``.
            TimeoutError: If the status never becomes ``"Successful"``.
        """
        for _ in range(max_wait):
            resp = self.lmb.get_function_configuration(FunctionName=function_name)
            status = resp["LastUpdateStatus"]
            if status == "Successful":
                return
            if status == "Failed":
                raise RuntimeError(
                    f"Lambda update failed: {resp.get('LastUpdateStatusReasonCode')}"
                )
            time.sleep(1)
        raise TimeoutError(f"Lambda update timed out for {function_name}")

    def _wait_for_active(self, function_name: str, max_wait: int = 60) -> None:
        """Poll until a newly created function reaches the Active state.

        Reads ``State`` from get_function_configuration() up to ``max_wait``
        times, sleeping one second between polls.

        Raises:
            RuntimeError: If the state is ``"Failed"`` (fail fast instead of
                polling until the timeout).
            TimeoutError: If the state never becomes ``"Active"``.
        """
        for _ in range(max_wait):
            resp = self.lmb.get_function_configuration(FunctionName=function_name)
            state = resp["State"]
            if state == "Active":
                return
            if state == "Failed":
                raise RuntimeError(
                    f"Lambda activation failed: {resp.get('StateReasonCode')}"
                )
            time.sleep(1)
        raise TimeoutError(f"Lambda activation timed out for {function_name}")
# ── Entry point ───────────────────────────────────────────────────
def main() -> None:
    """Deploy ``my-app-processor`` and start a 10% canary on the ``prod`` alias.

    The version currently served by the alias is looked up before deploying,
    so the canary keeps 90% of traffic on whatever is actually live instead
    of relying on a hard-coded version number.
    """
    function_name = "my-app-processor"
    alias = "prod"
    deployer = LambdaDeployer(region="us-east-1")
    env_vars = {
        "ENVIRONMENT": "production",
        "DB_HOST": "prod-db.cluster.us-east-1.rds.amazonaws.com",
        "LOG_LEVEL": "INFO",
    }

    # Remember which version the alias serves right now (None on first deploy,
    # when the function or the alias does not exist yet).
    try:
        previous_version = deployer.lmb.get_alias(
            FunctionName=function_name, Name=alias
        )["FunctionVersion"]
    except deployer.lmb.exceptions.ResourceNotFoundException:
        previous_version = None

    # Deploy and get new version number
    new_version = deployer.deploy_or_update(
        function_name=function_name,
        source_file="lambda_function.py",
        role_arn="arn:aws:iam::123456789012:role/LambdaExecRole",
        env_vars=env_vars,
        memory=512,
        timeout=60,
    )

    if previous_version is None or previous_version == new_version:
        # Nothing to split traffic with: either this is the first deploy or
        # the published version is the one the alias already serves.
        deployer.create_or_update_alias(
            function_name=function_name,
            alias=alias,
            version=new_version,
        )
    else:
        # Canary: route 10% to the new version, 90% stays on the previous one
        deployer.create_or_update_alias(
            function_name=function_name,
            alias=alias,
            version=new_version,
            canary_weight=0.9,  # 90% to old version
            previous_version=previous_version,
        )
    # After observing metrics for 15 minutes...
    # deployer.promote_alias("my-app-processor", "prod", new_version)


if __name__ == "__main__":
    main()
Key Commands Explained
| Command | What it does |
|---|---|
| zipfile.ZipFile(io.BytesIO(), "w", ZIP_DEFLATED) | Creates in-memory ZIP archive |
| update_function_code(ZipFile=..., Publish=False) | Uploads new code without creating a version |
| update_function_configuration(Environment=...) | Updates env vars / memory / timeout separately |
| publish_version(Description=...) | Snapshots current code + config as an immutable version |
| version_resp["Version"] | Returns the version number as a string (e.g., "5") |
| create_alias(FunctionVersion=version) | Creates a named pointer to a specific version |
| RoutingConfig.AdditionalVersionWeights | Splits traffic between versions for canary releases |
| get_function_configuration()["LastUpdateStatus"] | Checks if an async update has completed |
Common Issues
ResourceConflictException on update — Another update is in progress. Always call _wait_for_update() before making another configuration change.
InvalidParameterValueException on weights — Canary weights must sum to less than 1.0 (not equal to 1.0). The main alias version gets the remainder.
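RequestEntityTooLargeException / InvalidParameterValueException ("Unzipped size must be smaller than …") — The deployment package is too large. Direct ZipFile uploads are limited to 50 MB zipped, and the unzipped code plus layers to 250 MB. Upload bigger archives via S3, use a Lambda layer for large dependencies, and keep the function code small.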
🔍 Line-by-Line Code Walkthrough
Imports
| Line | Why It’s Used |
|---|---|
| import boto3 | AWS SDK — provides the Lambda client |
| import zipfile | Standard library for creating ZIP archives. Lambda requires code as a ZIP file |
| import io | Standard library. io.BytesIO() creates an in-memory bytes buffer — we build the ZIP in RAM, not on disk |
| import hashlib | Computes a SHA-256 hash of the ZIP bytes — used as a deployment identifier in logs and tags |
| import time | time.sleep(1) in the waiter — pauses between status checks |
| import logging | Structured log output |
create_deployment_package(source_file)
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
zf.write(source_file, arcname="lambda_function.py")
zip_buffer.seek(0)
return zip_buffer.read()
| Line | Explanation |
|---|---|
| io.BytesIO() | Creates an in-memory bytes buffer that behaves like a file. We write the ZIP into this instead of a temp file on disk — faster, no cleanup needed |
| zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) | Opens the in-memory buffer as a writeable ZIP archive. "w" = write mode. ZIP_DEFLATED = use DEFLATE compression (the default, ZIP_STORED, stores files uncompressed; compressing keeps the upload small) |
| zf.write(source_file, arcname="lambda_function.py") | Adds the source file to the archive with a fixed internal name lambda_function.py. Lambda imports this module using the Handler field: "lambda_function.lambda_handler" means “import lambda_function, call lambda_handler()” |
| zip_buffer.seek(0) | Resets the buffer’s read position to the beginning. Without this, read() would return empty bytes |
| return zip_buffer.read() | Reads all bytes from the buffer. These bytes are what we pass to the ZipFile parameter of update_function_code() |
deploy_or_update(function_name, source_file, ...)
zip_bytes = self.create_deployment_package(source_file)
code_hash = hashlib.sha256(zip_bytes).hexdigest()[:8]
| Line | Explanation |
|---|---|
| create_deployment_package(source_file) | Returns the ZIP as raw bytes |
| hashlib.sha256(zip_bytes) | Computes SHA-256 of the ZIP bytes — a fingerprint of this exact code version |
| .hexdigest()[:8] | Takes the first 8 hex characters as a short identifier (e.g., "a1b2c3d4"). Used in the version description and Lambda tags for traceability |
self.lmb.get_function(FunctionName=function_name)
| Line | Explanation |
|---|---|
| get_function(FunctionName=...) | Checks if the function exists. If it does, we update it. If it raises ResourceNotFoundException, we create it |
| Why use try/except instead of listing functions? | More efficient — one direct call vs paginating through all functions |
self.lmb.update_function_code(
FunctionName=function_name,
ZipFile=zip_bytes,
Publish=False,
)
self._wait_for_update(function_name)
| Line | Explanation |
|---|---|
| update_function_code(ZipFile=zip_bytes) | Uploads the new ZIP to Lambda. $LATEST now runs the new code; published versions (and aliases pointing at them) are unaffected until a new version is published and the alias is moved |
| ZipFile=zip_bytes | Passes the raw bytes directly. For files > 50 MB, use S3Bucket/S3Key instead |
| Publish=False | Does NOT create a new version yet. We update config next, then publish once after both changes. If we published here, we’d create a version with the new code but old config |
| self._wait_for_update(function_name) | Lambda updates are asynchronous — you must wait for LastUpdateStatus == "Successful" before making another change. Without waiting, the next call raises ResourceConflictException |
self.lmb.update_function_configuration(
FunctionName=function_name,
Environment={"Variables": env_vars},
MemorySize=memory,
Timeout=timeout,
)
| Line | Explanation |
|---|---|
| update_function_configuration(...) | Updates runtime settings separately from code. AWS requires two separate API calls — code and config cannot be updated atomically |
| Environment={"Variables": env_vars} | Sets environment variables. env_vars is a plain dict like {"DB_HOST": "...", "LOG_LEVEL": "INFO"} |
| MemorySize=memory | RAM allocated to the Lambda in MB. Also proportionally increases CPU. Valid: 128–10,240 MB in 1 MB increments |
| Timeout=timeout | Max execution time in seconds. Lambda kills the function if it exceeds this. Max: 900 (15 min) |
publish_version()
version_resp = self.lmb.publish_version(
FunctionName=function_name,
Description=f"Deploy {code_hash}",
)
version = version_resp["Version"]
| Line | Explanation |
|---|---|
| publish_version(FunctionName=...) | Takes a snapshot of the current code + config and assigns it an immutable version number (e.g., "5") |
| Description=f"Deploy {code_hash}" | Human-readable label for this version. Shown in the Lambda console version list |
| version_resp["Version"] | The version number as a string (e.g., "5"). Lambda versions always use strings, not integers |
| Why publish a version? | Without versioning, aliases always point to $LATEST (the current unpublished code). With versioning, you can roll back by pointing the alias to a previous version number |
create_or_update_alias(function_name, alias, version, canary_weight, previous_version)
config["RoutingConfig"] = {
"AdditionalVersionWeights": {previous_version: canary_weight}
}
| Line | Explanation |
|---|---|
| RoutingConfig | Controls traffic splitting between two Lambda versions |
| AdditionalVersionWeights: {"4": 0.9} | Sends 90% of traffic to version 4. The remaining 10% goes to the alias’s FunctionVersion (the new version 5). Weights must sum to less than 1.0 |
| Why canary? | If version 5 has a bug, only 10% of users are affected. You can monitor error rates and roll back by updating the alias to 100% on version 4 |
try:
self.lmb.get_alias(FunctionName=function_name, Name=alias)
self.lmb.update_alias(**config)
except self.lmb.exceptions.ResourceNotFoundException:
self.lmb.create_alias(**config)
| Line | Explanation |
|---|---|
| get_alias(FunctionName=..., Name=alias) | Checks if this alias already exists |
| update_alias(**config) | **config unpacks the dict as keyword arguments. Equivalent to writing each key explicitly |
| create_alias(**config) | Creates the alias on first deploy |
| Idempotent pattern | The same code path handles both first-time and subsequent deploys — safe to run multiple times |
_wait_for_update(function_name) — Waiter
for _ in range(max_wait):
resp = self.lmb.get_function_configuration(FunctionName=function_name)
status = resp["LastUpdateStatus"]
if status == "Successful":
return
if status == "Failed":
raise RuntimeError(...)
time.sleep(1)
raise TimeoutError(...)
| Line | Explanation |
|---|---|
| for _ in range(max_wait) | Polls up to max_wait times (default 60 times = 60 seconds). _ is convention for “don’t care about this variable” |
| get_function_configuration() | Returns the current function config including LastUpdateStatus |
| resp["LastUpdateStatus"] | One of: "Successful" (ready), "InProgress" (still updating), "Failed" (update crashed) |
| return on "Successful" | Exits the loop — the function is ready for the next operation |
| raise RuntimeError on "Failed" | Lambda failed to apply the update. The previous version is still running |
| time.sleep(1) | Waits 1 second between polls. Lambda updates typically complete in 2–5 seconds |
| raise TimeoutError after loop | If still updating after 60 seconds, something is wrong |