Sync Local Directory to S3 with KMS Encryption & Manifest
Python script to sync a local directory to S3, encrypt every file with a KMS key, skip unchanged files via MD5 check, and produce a JSON upload manifest.
Secure data pipeline — archive application data to S3 with mandatory at-rest encryption and an audit trail of every uploaded file.
Problem Statement
Your application generates data files that must be backed up to S3. AWS policy mandates all data be encrypted with a customer-managed KMS key. Running the script multiple times should skip files that haven’t changed (idempotent). Every sync run should produce an audit manifest listing what was uploaded, skipped, or failed.
Required IAM Permissions
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:PutObject", "s3:GetObject"],
"Resource": "arn:aws:s3:::my-company-data/*"
},
{
"Effect": "Allow",
"Action": ["s3:ListBucket"],
"Resource": "arn:aws:s3:::my-company-data"
},
{
"Effect": "Allow",
"Action": ["kms:GenerateDataKey", "kms:Decrypt"],
"Resource": "arn:aws:kms:us-east-1:123456789012:key/abc-123"
}
]
}
Complete Script
import hashlib
import json
import os
from datetime import datetime, timezone
from pathlib import Path

import boto3
from botocore.exceptions import ClientError
class S3Syncer:
    """Sync a local directory tree to S3 under a key prefix using SSE-KMS.

    Files whose content is already stored in S3 are skipped, and every
    processed file (uploaded, skipped or failed) is appended to
    ``self.manifest`` so the run can be audited afterwards.
    """

    # Bytes read per iteration when hashing; keeps memory flat for big files.
    _CHUNK_SIZE = 8192
    # User-metadata key under which the content MD5 is stored on each object.
    _MD5_METADATA_KEY = "md5checksum"
    # head_object error codes that mean "the object does not exist".
    _NOT_FOUND_CODES = ("404", "NoSuchKey", "NotFound")

    def __init__(self, bucket: str, prefix: str, kms_key_id: str, region: str = "us-east-1"):
        """
        Create the S3 client and remember where and how to upload.

        bucket:     destination bucket name.
        prefix:     key prefix for all uploads; trailing slashes are removed
                    so keys never contain a double slash.
        kms_key_id: KMS key used for server-side encryption. Can be:
                    - Key ID:  "abc-123-..."
                    - Key ARN: "arn:aws:kms:us-east-1:123456789012:key/abc-123"
                    - Alias:   "alias/my-data-key"
        region:     AWS region the S3 client is created for.
        """
        self.s3 = boto3.client("s3", region_name=region)
        self.bucket = bucket
        self.prefix = prefix.rstrip("/")
        self.kms_key_id = kms_key_id
        # One record per processed file; written out by save_manifest().
        self.manifest: list[dict] = []

    # ── Internal helpers ──────────────────────────────────────────
    def _s3_key(self, relative_key: str) -> str:
        """Join the configured prefix and a relative key without a leading '/'."""
        # With an empty prefix a plain f"{prefix}/{key}" would yield "/key",
        # which S3 stores under an empty-named top-level "folder".
        return f"{self.prefix}/{relative_key}" if self.prefix else relative_key

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        # datetime.utcnow() is deprecated and returns a naive value.
        return datetime.now(timezone.utc)

    # ── MD5 hash ──────────────────────────────────────────────────
    def get_file_md5(self, filepath: Path) -> str:
        """
        Return the hex MD5 digest of a local file.

        The file is read in fixed-size chunks so multi-GB files are never
        loaded into RAM. MD5 is used purely as a change-detection
        fingerprint, so it is requested with usedforsecurity=False, which
        also keeps it available on FIPS-restricted Python builds.
        """
        digest = hashlib.md5(usedforsecurity=False)
        with open(filepath, "rb") as f:
            # read() returns b"" at EOF, which ends the loop.
            while chunk := f.read(self._CHUNK_SIZE):
                digest.update(chunk)
        return digest.hexdigest()

    # ── Deduplication check ───────────────────────────────────────
    def file_exists_in_s3(self, s3_key: str, local_md5: str) -> bool:
        """
        Return True when the object at s3_key already holds this content.

        head_object() fetches only the object's metadata (no body).

        The comparison prefers the "md5checksum" user metadata written by
        upload_file(). The ETag cannot be relied on here: objects encrypted
        with SSE-KMS and objects uploaded via multipart do not have an ETag
        equal to the MD5 of their content. The ETag is still used as a
        fallback for objects that carry no checksum metadata.

        Returns False when the object does not exist. Any other ClientError
        (403 Forbidden, etc.) is re-raised.
        """
        try:
            response = self.s3.head_object(Bucket=self.bucket, Key=s3_key)
        except ClientError as e:
            if e.response["Error"]["Code"] in self._NOT_FOUND_CODES:
                return False
            raise
        stored_md5 = (response.get("Metadata") or {}).get(self._MD5_METADATA_KEY)
        if stored_md5:
            return stored_md5 == local_md5
        # No checksum metadata: fall back to the ETag (quotes stripped).
        return response.get("ETag", "").strip('"') == local_md5

    # ── Single file upload ────────────────────────────────────────
    def upload_file(self, local_path: Path, s3_key: str) -> str:
        """
        Upload one file with SSE-KMS unless S3 already has the same content.

        Returns "skipped" when file_exists_in_s3() reports a match,
        otherwise "uploaded". Exceptions from hashing, the existence check
        or the upload propagate to the caller.

        ExtraArgs passed to the upload:
          ServerSideEncryption: "aws:kms" asks S3 to encrypt with KMS.
          SSEKMSKeyId:          which KMS key to use.
          Metadata:             stores the content MD5 alongside the object
                                so later runs can detect unchanged files
                                without downloading them.
        """
        local_md5 = self.get_file_md5(local_path)
        if self.file_exists_in_s3(s3_key, local_md5):
            print(f" [SKIP] {s3_key} (unchanged)")
            return "skipped"
        # Read the size before uploading so a file that disappears afterwards
        # cannot turn a successful upload into an error.
        size_bytes = local_path.stat().st_size
        self.s3.upload_file(
            Filename=str(local_path),
            Bucket=self.bucket,
            Key=s3_key,
            ExtraArgs={
                "ServerSideEncryption": "aws:kms",
                "SSEKMSKeyId": self.kms_key_id,
                "Metadata": {self._MD5_METADATA_KEY: local_md5},
            },
        )
        print(f" [UPLOAD] {s3_key} ({size_bytes:,} bytes)")
        return "uploaded"

    # ── Directory sync ────────────────────────────────────────────
    def sync_directory(self, local_dir: str) -> dict:
        """
        Upload every file under local_dir and record the outcome of each.

        Path.rglob("*") recursively yields files and directories; is_file()
        filters the directories out. relative_to() strips the base path and
        as_posix() renders the remainder with forward slashes, which is what
        S3 keys use regardless of the local OS.

        A failure on one file is logged and recorded in the manifest; it
        does not stop the remaining files from being processed.

        Returns a dict with "uploaded", "skipped" and "failed" counts.
        """
        local_dir_path = Path(local_dir)
        stats = {"uploaded": 0, "skipped": 0, "failed": 0}
        for file_path in local_dir_path.rglob("*"):
            if not file_path.is_file():
                continue
            relative = file_path.relative_to(local_dir_path).as_posix()
            s3_key = self._s3_key(relative)
            try:
                result = self.upload_file(file_path, s3_key)
                entry = {
                    "local_path": str(file_path),
                    "s3_key": s3_key,
                    "s3_uri": f"s3://{self.bucket}/{s3_key}",
                    "size_bytes": file_path.stat().st_size,
                    "status": result,
                    "timestamp": self._utc_now().isoformat().replace("+00:00", "Z"),
                }
            except Exception as e:
                # Per-file boundary: record the failure and keep going.
                print(f" [ERROR] {file_path}: {e}")
                stats["failed"] += 1
                self.manifest.append({
                    "local_path": str(file_path),
                    "s3_key": s3_key,
                    "status": "failed",
                    "error": str(e),
                    "timestamp": self._utc_now().isoformat().replace("+00:00", "Z"),
                })
            else:
                # Count only after the entry was built, so one file is never
                # counted both as uploaded/skipped and as failed.
                stats[result] += 1
                self.manifest.append(entry)
        return stats

    # ── Save manifest ─────────────────────────────────────────────
    def save_manifest(self, local_output: str = "manifest.json") -> None:
        """
        Write the manifest to local_output and upload a copy to S3.

        The S3 copy goes under "<prefix>/_manifests/" with a UTC timestamp
        (second resolution) in its name, so manifests from earlier runs are
        not overwritten. It is encrypted with the same KMS key as the data.
        """
        with open(local_output, "w", encoding="utf-8") as f:
            json.dump(self.manifest, f, indent=2)
        manifest_key = self._s3_key(
            f"_manifests/manifest_{self._utc_now().strftime('%Y%m%d_%H%M%S')}.json"
        )
        self.s3.upload_file(
            Filename=local_output,
            Bucket=self.bucket,
            Key=manifest_key,
            ExtraArgs={
                "ServerSideEncryption": "aws:kms",
                "SSEKMSKeyId": self.kms_key_id,
            },
        )
        print(f"\nManifest uploaded → s3://{self.bucket}/{manifest_key}")
# ── Entry point ───────────────────────────────────────────────────
if __name__ == "__main__":
    # Example configuration — replace with the values for your environment.
    config = {
        "bucket": "my-company-data",
        "prefix": "backups/app-data",
        "kms_key_id": "arn:aws:kms:us-east-1:123456789012:key/abc-123",
        "region": "us-east-1",
    }
    syncer = S3Syncer(**config)

    # Sync first, then persist the manifest so the run is always auditable.
    totals = syncer.sync_directory("/opt/app/data")
    syncer.save_manifest()

    # Print the per-status counters returned by sync_directory().
    summary_lines = (
        "\nSync complete:",
        f" Uploaded: {totals['uploaded']}",
        f" Skipped: {totals['skipped']} (unchanged)",
        f" Failed: {totals['failed']}",
    )
    for line in summary_lines:
        print(line)
Sample Manifest Output
[
{
"local_path": "/opt/app/data/report-2025-01.csv",
"s3_key": "backups/app-data/report-2025-01.csv",
"s3_uri": "s3://my-company-data/backups/app-data/report-2025-01.csv",
"size_bytes": 204800,
"status": "uploaded",
"timestamp": "2025-01-20T10:30:00Z"
},
{
"local_path": "/opt/app/data/config.json",
"s3_key": "backups/app-data/config.json",
"s3_uri": "s3://my-company-data/backups/app-data/config.json",
"size_bytes": 1024,
"status": "skipped",
"timestamp": "2025-01-20T10:30:01Z"
}
]
Key Commands Explained
| Command | What it does |
|---|---|
hashlib.md5() | Creates an MD5 hash object for content fingerprinting |
iter(lambda: f.read(8192), b"") | Reads file in 8 KB chunks until EOF — memory efficient |
head_object(Bucket, Key) | Fetches S3 object metadata without downloading the body |
response["ETag"].strip('"') | ETag comes with surrounding quotes — strip them for comparison |
upload_file(Filename, Bucket, Key, ExtraArgs={...}) | Streams file to S3; ExtraArgs passes encryption settings |
ServerSideEncryption: "aws:kms" | Tells S3 to use KMS for at-rest encryption |
SSEKMSKeyId | Specifies the KMS key ARN, ID, or alias to use |
Path.rglob("*") | Recursively yields every file and directory under the path |
Common Issues
AccessDenied on KMS — The IAM role must have kms:GenerateDataKey on the specific key ARN. Check the KMS key policy too — it must allow the role.
ETag mismatch for large or KMS-encrypted files — Files uploaded via multipart (> 8 MB default) get a composite ETag (md5-N), not a plain MD5, and objects encrypted with SSE-KMS never have an MD5 ETag regardless of size. For those, compare using Metadata["md5checksum"] instead.
Slow syncs — Use concurrent.futures.ThreadPoolExecutor to upload multiple files in parallel. boto3’s upload_file is thread-safe.
🔍 Line-by-Line Code Walkthrough
Imports
| Line | Why It’s Used |
|---|---|
import boto3 | AWS SDK for Python — needed for S3 client calls |
import os | Imported but not actually used by the script — all path handling goes through pathlib.Path |
import hashlib | Python standard library for cryptographic hashes. We use hashlib.md5() to compute file fingerprints for deduplication |
import json | Standard library for serializing the manifest dict to a JSON file |
from pathlib import Path | Modern Python path handling. Path.rglob("*") recursively finds all files. More readable than os.walk() |
from datetime import datetime | Used to timestamp the manifest file name and each manifest entry |
from botocore.exceptions import ClientError | AWS SDK error class. We catch it to distinguish “object not found (404)” from real errors |
S3Syncer.__init__
def __init__(self, bucket: str, prefix: str, kms_key_id: str, region: str = "us-east-1"):
self.s3 = boto3.client("s3", region_name=region)
self.bucket = bucket
self.prefix = prefix.rstrip("/")
self.kms_key_id = kms_key_id
self.manifest: list[dict] = []
| Line | Explanation |
|---|---|
boto3.client("s3", region_name=region) | Creates a low-level S3 client. We use client (not resource) because upload_file() on the client accepts ExtraArgs for KMS settings |
self.bucket | The S3 bucket name. Stored once — used in every API call |
prefix.rstrip("/") | Removes trailing slashes from the S3 key prefix. Prevents double-slash keys like backups//file.txt |
self.kms_key_id | The KMS key identifier (ID, ARN, or alias). Passed to every upload_file() call |
self.manifest: list[dict] = [] | Accumulates one record per file processed. Written to JSON at the end of the sync run |
get_file_md5(filepath)
md5 = hashlib.md5()
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
md5.update(chunk)
return md5.hexdigest()
| Line | Explanation |
|---|---|
hashlib.md5() | Creates an MD5 hash object. MD5 produces a 32-character hex string that fingerprints file content — good enough for change detection, but not collision-resistant, so don't rely on it for security |
open(filepath, "rb") | Opens the file in binary read mode. Required for accurate MD5 — text mode can alter line endings on Windows |
iter(lambda: f.read(8192), b"") | This is a sentinel-based iterator. iter(callable, sentinel) calls the callable repeatedly until it returns the sentinel value. f.read(8192) reads 8 KB at a time. When the file ends, read() returns b"" (empty bytes) which is the sentinel — the loop stops |
md5.update(chunk) | Feeds each 8 KB chunk into the MD5 computation. Chunk-based hashing means the full file is never loaded into RAM |
md5.hexdigest() | Returns the final MD5 as a 32-character hex string like "d41d8cd98f00b204e9800998ecf8427e" |
file_exists_in_s3(s3_key, local_md5)
response = self.s3.head_object(Bucket=self.bucket, Key=s3_key)
s3_etag = response.get("ETag", "").strip('"')
return s3_etag == local_md5
| Line | Explanation |
|---|---|
head_object(Bucket=..., Key=...) | Fetches only the metadata of an S3 object — not its content. This is a cheap HTTP HEAD request (~1 ms, zero data transfer cost). Used to check if the object exists and get its ETag |
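response.get("ETag", "") | ETag is the entity tag (fingerprint) of the S3 object. It equals the MD5 of the object only for single-part uploads that are unencrypted or SSE-S3 encrypted; multipart uploads (upload_file's default above 8 MB) and SSE-KMS-encrypted objects get an ETag that is not the MD5 |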
.strip('"') | AWS returns ETags wrapped in double-quotes: '"d41d8cd9..."'. We strip the quotes before comparing |
return s3_etag == local_md5 | If the ETag matches our local MD5, the file is identical — skip the upload |
except ClientError as e: | head_object raises ClientError for any error |
e.response["Error"]["Code"] == "404" | HTTP 404 means the object doesn’t exist yet — return False to trigger an upload. Any other error (403 Forbidden, etc.) is re-raised |
upload_file(local_path, s3_key)
self.s3.upload_file(
Filename=str(local_path),
Bucket=self.bucket,
Key=s3_key,
ExtraArgs={
"ServerSideEncryption": "aws:kms",
"SSEKMSKeyId": self.kms_key_id,
"Metadata": {"md5checksum": local_md5},
},
)
| Line | Explanation |
|---|---|
upload_file(Filename=..., Bucket=..., Key=...) | boto3’s managed upload — automatically uses multipart upload for files > 8 MB, retries on failure, and tracks upload progress |
Filename=str(local_path) | upload_file expects a string path (not a Path object), so we convert with str() |
Key=s3_key | The full S3 object key (path within the bucket), e.g., backups/app-data/report-2025-01.csv |
ExtraArgs={"ServerSideEncryption": "aws:kms"} | Passes KMS encryption parameters to the underlying PutObject API call. Without this, the object falls back to the bucket's default encryption (SSE-S3 unless configured otherwise) instead of the required customer-managed key |
"ServerSideEncryption": "aws:kms" | Tells S3 to encrypt this object using a KMS key. Alternatives: "AES256" (S3-managed key) or omit to use the bucket's default encryption |
"SSEKMSKeyId": self.kms_key_id | Specifies WHICH KMS key to use. Can be key ID, full ARN, or alias (alias/my-key). If omitted, uses the S3 default KMS key |
"Metadata": {"md5checksum": local_md5} | Custom metadata stored alongside the object. Can later be retrieved with head_object() to verify integrity without downloading the file |
sync_directory(local_dir)
local_dir_path = Path(local_dir)
for file_path in local_dir_path.rglob("*"):
if not file_path.is_file():
continue
relative = file_path.relative_to(local_dir_path)
s3_key = f"{self.prefix}/{relative}".replace("\\", "/")
| Line | Explanation |
|---|---|
Path(local_dir) | Converts the string path to a pathlib.Path object. Enables cross-platform path manipulation |
local_dir_path.rglob("*") | Recursively yields every file and directory under local_dir. rglob is shorthand for glob("**/*") |
file_path.is_file() | Filters out directories — we only want actual files |
file_path.relative_to(local_dir_path) | Strips the base directory prefix. If the file is /opt/app/data/reports/q1.csv and the base is /opt/app/data, this returns reports/q1.csv |
.replace("\\", "/") | On Windows, Path uses backslashes. S3 keys must use forward slashes. This normalizes the separator |
stats = {"uploaded": 0, "skipped": 0, "failed": 0} | Tracks counts across all files. Updated after each upload_file() call |
save_manifest()
with open(local_output, "w") as f:
json.dump(self.manifest, f, indent=2)
manifest_key = (
f"{self.prefix}/_manifests/manifest_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
)
self.s3.upload_file(Filename=local_output, Bucket=self.bucket, Key=manifest_key, ...)
| Line | Explanation |
|---|---|
json.dump(self.manifest, f, indent=2) | Writes the manifest list to a local JSON file. indent=2 makes it human-readable with 2-space indentation |
datetime.utcnow().strftime('%Y%m%d_%H%M%S') | Timestamp formatted as 20250120_103000. Used in the key name so each manifest run has a unique key — old manifests are never overwritten |
manifest_key | The S3 key for the manifest file. Stored inside a _manifests/ prefix so it stays separate from data files |
Second upload_file(...) | Uploads the manifest to S3 with the same KMS encryption. This creates a persistent, searchable audit trail of every sync run |
- S3 upload with ExtraArgs
- KMS server-side encryption
- MD5 content hashing for deduplication
- upload_file vs put_object
- Recursive directory traversal with pathlib
Have a similar scenario to share?
Production incidents are the best teachers. Submit your real-world scenario and help others learn.
Open Google Form
Related Scenarios
S3 Bucket Security Audit — Public Access, Versioning & Encryption
Problem Statement A misconfigured S3 bucket was the #1 cause of cloud data breaches in 2023. Common mistakes: public access not blocked, no …
Auto Stop/Start EC2 Instances Using Schedule Tags with Python
Problem Statement Your team has 20 dev/staging EC2 instances that run 24/7 but are only used during business hours (8 AM – 8 PM). Each …
Clean Up Unused AWS Resources — EBS Volumes, EIPs, Old AMIs with Cost Report
Resource Cost Overview Resource Approx. Cost When it wastes money EBS gp3 volume $0.08/GB/month When not attached to any instance Elastic IP …