Python Scenario-Based Interview Questions
Real-world Python scenario questions covering debugging, performance, API design, asyncio concurrency, memory optimization, and production-grade patterns.
Senior Python developer / backend engineering interviews
Scenario 1 — Memory Leak in a Long-Running Python Service
Context: You deployed a Python microservice that processes incoming webhook events. After a few hours of running in production, memory usage has grown from 80 MB to over 2 GB and the service becomes unresponsive.
Question: How would you diagnose and fix the memory leak?
Investigation steps:
# Step 1 — Profile memory with tracemalloc
import tracemalloc
tracemalloc.start()
# ... run the service for a while ...
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
for stat in top_stats[:10]:
    print(stat)
# Output reveals: webhook_handler.py:45 grew by 1.2 GB — the 'seen_ids' set
# The bug — unbounded set accumulation
seen_ids = set()  # Module-level, never cleared!

def process_webhook(event_id: str, payload: dict):
    if event_id in seen_ids:
        return  # Deduplicate
    seen_ids.add(event_id)  # Grows forever!
    handle_payload(payload)
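# Fix 1: Use Redis with a TTL for deduplication (production-grade)
import redis

r = redis.Redis()

def process_webhook(event_id: str, payload: dict):
    key = f"webhook:seen:{event_id}"
    # SET with NX and EX is atomic: it returns None when the key already exists,
    # so two workers cannot both pass a separate exists() check.
    if not r.set(key, 1, nx=True, ex=86400):  # TTL: 24 hours
        return  # Duplicate
    handle_payload(payload)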
# Fix 2: Use a bounded in-memory set with FIFO eviction (simple, single process)
from collections import OrderedDict

class BoundedSet:
    def __init__(self, maxsize=100_000):
        self._data = OrderedDict()
        self._maxsize = maxsize

    def add(self, key):
        if key in self._data:
            return  # Already tracked; do not evict for a duplicate
        if len(self._data) >= self._maxsize:
            self._data.popitem(last=False)  # Evict the oldest entry
        self._data[key] = True

    def __contains__(self, key):
        return key in self._data

seen_ids = BoundedSet(maxsize=100_000)
Key takeaway: Never use unbounded module-level containers in long-running services. Use Redis with TTL for distributed deduplication, or a bounded in-memory structure with eviction.
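The two ideas in the takeaway (a size bound and a TTL) can be combined in memory. The following is a minimal sketch, not the service's actual code: the class name `ExpiringSeenSet`, its `check_and_add` method, and the injectable `clock` parameter are all hypothetical names chosen for illustration.

```python
import time
from collections import OrderedDict

class ExpiringSeenSet:
    """Remembers keys for `ttl` seconds and never holds more than `maxsize` keys."""

    def __init__(self, maxsize=100_000, ttl=86_400.0, clock=time.monotonic):
        self._expiry = OrderedDict()  # key -> expiry time, oldest first
        self._maxsize = maxsize
        self._ttl = ttl
        self._clock = clock  # injectable so the behaviour can be tested

    def check_and_add(self, key) -> bool:
        """Return True if the key is new (and record it), False for a duplicate."""
        now = self._clock()
        # The TTL is constant, so insertion order equals expiry order:
        # expired keys are always at the old end.
        while self._expiry and next(iter(self._expiry.values())) <= now:
            self._expiry.popitem(last=False)
        if key in self._expiry:
            return False
        if len(self._expiry) >= self._maxsize:
            self._expiry.popitem(last=False)  # evict the oldest key
        self._expiry[key] = now + self._ttl
        return True

    def __len__(self):
        return len(self._expiry)
```

Unlike the Redis approach, this only deduplicates within one process, so it fits a single-worker deployment rather than a horizontally scaled one.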
Scenario 2 — Slow API Endpoint: N+1 Query Problem
Context: A Django REST API endpoint that lists all users and their order counts is responding in 8 seconds for 500 users. The database has proper indexes. How would you fix it?
# SLOW: N+1 queries (1 query for users + 1 query per user for orders)
class UserListView(APIView):
    def get(self, request):
        users = User.objects.all()  # Query 1: SELECT * FROM users
        result = []
        for user in users:
            result.append({
                "id": user.id,
                "name": user.name,
                "order_count": user.orders.count()  # Queries 2...501: SELECT COUNT(*)
            })
        return Response(result)

# FAST: single query with annotation
from django.db.models import Count

class UserListView(APIView):
    def get(self, request):
        users = User.objects.annotate(
            order_count=Count('orders')  # Single JOIN, one query total
        ).values('id', 'name', 'order_count')
        return Response(list(users))

# Alternatively, prefetch_related for complex nested data:
users = User.objects.prefetch_related('orders').all()
for user in users:
    # Orders are already loaded: 2 queries total, not N+1
    count = len(user.orders.all())
Debugging tool:
# Django Debug Toolbar or django-silk shows the query count per request
from django.db import connection

def get_query_count():
    # connection.queries is only populated when settings.DEBUG is True
    return len(connection.queries)

# Or log all queries during development
import logging
logging.getLogger('django.db.backends').setLevel(logging.DEBUG)
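To see the N+1 pattern without a Django project, the same comparison can be sketched with the standard-library `sqlite3` module. This is an illustration under assumptions: the two-table schema and the helper names (`build_db`, `count_selects`, `n_plus_one`, `single_query`) are invented here, and the statement counter relies on `Connection.set_trace_callback`.

```python
import sqlite3

def build_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
        CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER REFERENCES users(id));
        """
    )
    conn.executemany("INSERT INTO users (id, name) VALUES (?, ?)",
                     [(i, f"user{i}") for i in range(1, 6)])
    conn.executemany("INSERT INTO orders (user_id) VALUES (?)", [(1,), (1,), (2,)])
    conn.commit()
    return conn

def count_selects(conn, fn):
    """Run fn(conn) and return (result, number of SELECT statements executed)."""
    statements = []
    conn.set_trace_callback(statements.append)
    try:
        result = fn(conn)
    finally:
        conn.set_trace_callback(None)
    selects = [s for s in statements if s.lstrip().upper().startswith("SELECT")]
    return result, len(selects)

def n_plus_one(conn):
    users = conn.execute("SELECT id, name FROM users ORDER BY id").fetchall()
    return [
        (uid, name, conn.execute(
            "SELECT COUNT(*) FROM orders WHERE user_id = ?", (uid,)).fetchone()[0])
        for uid, name in users
    ]

def single_query(conn):
    return conn.execute(
        """SELECT u.id, u.name, COUNT(o.id)
           FROM users u LEFT JOIN orders o ON o.user_id = u.id
           GROUP BY u.id, u.name ORDER BY u.id"""
    ).fetchall()

conn = build_db()
slow_rows, slow_queries = count_selects(conn, n_plus_one)
fast_rows, fast_queries = count_selects(conn, single_query)
print(slow_queries, fast_queries)
```

Both functions return the same rows; only the number of round trips differs, which is what `annotate(Count(...))` achieves in the ORM.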
Scenario 3 — Race Condition in an Async Python Service
Context: You have a FastAPI service where two concurrent requests can both check if a username is available and both register the same username, creating duplicate accounts. How do you fix this?
# BROKEN: race condition between check and insert
@app.post("/register")
async def register(username: str, db: AsyncSession = Depends(get_db)):
    # Requests A and B both reach here simultaneously
    existing = await db.execute(select(User).where(User.username == username))
    if existing.scalar():
        raise HTTPException(400, "Username taken")
    # Both requests pass the check above and both insert!
    user = User(username=username)
    db.add(user)
    await db.commit()  # With no unique constraint, both commits succeed: duplicate accounts
# FIX 1: database-level unique constraint + handle IntegrityError
# migrations: unique=True on username column
from sqlalchemy.exc import IntegrityError

@app.post("/register")
async def register(username: str, db: AsyncSession = Depends(get_db)):
    try:
        user = User(username=username)
        db.add(user)
        await db.commit()  # The constraint lets only one of the racing inserts succeed
        return {"id": user.id}
    except IntegrityError:
        await db.rollback()
        raise HTTPException(400, "Username already taken")
# FIX 2: distributed lock with Redis (for more complex operations)
import redis.asyncio as aioredis

redis = aioredis.from_url("redis://localhost")

@app.post("/register")
async def register(username: str, db: AsyncSession = Depends(get_db)):
    lock_key = f"lock:register:{username}"
    async with redis.lock(lock_key, timeout=5):
        existing = await db.execute(select(User).where(User.username == username))
        if existing.scalar():
            raise HTTPException(400, "Username taken")
        user = User(username=username)
        db.add(user)
        await db.commit()
Best practice: Always rely on database-level constraints as the last line of defence against race conditions. Locks are for coordinating complex multi-step operations.
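The "insert first, let the constraint decide" pattern can be tried in isolation with the standard-library `sqlite3` module. This is a minimal sketch rather than the FastAPI/SQLAlchemy code above; the table layout and the `register` helper are assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT NOT NULL UNIQUE)"
)

def register(conn: sqlite3.Connection, username: str) -> bool:
    """Insert without a prior existence check; the UNIQUE constraint arbitrates."""
    try:
        with conn:  # commits on success, rolls back if an exception is raised
            conn.execute("INSERT INTO users (username) VALUES (?)", (username,))
        return True
    except sqlite3.IntegrityError:
        return False  # someone else already holds this username

first = register(conn, "alice")
second = register(conn, "alice")
total = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(first, second, total)
```

Because there is no separate check step, there is no window between "check" and "insert" for a second request to slip through.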
Scenario 4 — CPU-Bound Task Blocking Async Event Loop
Context: Your FastAPI service processes image thumbnails on upload. When a user uploads a large image, the API becomes unresponsive for all other users for ~3 seconds. Why and how do you fix it?
# BROKEN: CPU-bound work on the async event loop blocks everything
import io
from PIL import Image

@app.post("/upload")
async def upload_image(file: UploadFile):
    data = await file.read()
    img = Image.open(io.BytesIO(data))
    thumbnail = img.resize((200, 200))  # CPU-bound: blocks the event loop!
    # During these 3 seconds, no other requests are served
    return save_thumbnail(thumbnail)
# FIX: run CPU-bound work in a process pool executor
import asyncio
import io
from concurrent.futures import ProcessPoolExecutor
from PIL import Image

process_pool = ProcessPoolExecutor(max_workers=4)

def generate_thumbnail(data: bytes) -> bytes:
    """Pure function: safe to run in a subprocess."""
    img = Image.open(io.BytesIO(data))
    thumbnail = img.resize((200, 200), Image.LANCZOS)
    buf = io.BytesIO()
    thumbnail.save(buf, format="JPEG", quality=85)
    return buf.getvalue()

@app.post("/upload")
async def upload_image(file: UploadFile):
    data = await file.read()
    loop = asyncio.get_running_loop()
    # Run in the process pool so the event loop stays free
    thumbnail_data = await loop.run_in_executor(
        process_pool, generate_thumbnail, data
    )
    return store_thumbnail(thumbnail_data)
# For truly heavy workloads: use a task queue (Celery + Redis)
from celery import Celery
from celery.result import AsyncResult

# A result backend is required to read task status and results later
celery = Celery("tasks", broker="redis://localhost/0", backend="redis://localhost/1")

@celery.task
def process_image_task(image_data: bytes):
    return generate_thumbnail(image_data)

@app.post("/upload")
async def upload_image(file: UploadFile):
    data = await file.read()
    # Note: large payloads inflate broker traffic; passing a storage key
    # instead of raw bytes is a common alternative.
    task = process_image_task.delay(data)
    return {"task_id": task.id, "status": "processing"}

@app.get("/upload/{task_id}")
async def get_result(task_id: str):
    task = AsyncResult(task_id, app=celery)
    return {"status": task.status, "result": task.result}
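The effect of blocking the event loop can be observed directly with a small ticker task. This is a sketch under assumptions: `time.sleep` stands in for the blocking thumbnail call, `asyncio.to_thread` (Python 3.9+) is used for brevity instead of the process pool shown above, and the helper names are hypothetical.

```python
import asyncio
import time

def blocking_work(seconds: float = 0.2) -> None:
    time.sleep(seconds)  # stand-in for a blocking call such as image resizing

async def ticks_while(run_work) -> int:
    """Count how often a 10 ms ticker task runs while run_work() is awaited."""
    ticks = 0

    async def ticker():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # yield once so the ticker starts
    await run_work()
    task.cancel()
    return ticks

async def inline():
    blocking_work()  # runs on the event loop thread: nothing else can run

async def offloaded():
    await asyncio.to_thread(blocking_work)  # runs in a worker thread

async def main():
    return await ticks_while(inline), await ticks_while(offloaded)

inline_ticks, offloaded_ticks = asyncio.run(main())
print(inline_ticks, offloaded_ticks)
```

The ticker plays the role of "all other users": it gets no turns while the inline version runs, and keeps ticking when the work is moved off the loop. For pure-Python CPU-bound code a thread still competes for the GIL, which is why the fix above reaches for a process pool.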
Scenario 5 — Designing a Rate Limiter in Python
Context: Your public API is getting hammered. You need to implement a rate limiter that allows 100 requests per minute per API key, returning HTTP 429 when exceeded.
# Sliding window rate limiter using Redis
import time
import uuid
import redis.asyncio as aioredis
from fastapi import Request, HTTPException

redis = aioredis.from_url("redis://localhost")

async def check_rate_limit(api_key: str, limit: int = 100, window: int = 60):
    """Sliding window rate limiter using Redis sorted sets."""
    key = f"rate:{api_key}"
    now = time.time()
    window_start = now - window
    pipe = redis.pipeline()
    # Remove old requests outside the window
    pipe.zremrangebyscore(key, 0, window_start)
    # Count requests in window
    pipe.zcard(key)
    # Add current request; the random suffix keeps members unique when two
    # requests share the same timestamp
    pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
    # Set expiry
    pipe.expire(key, window)
    _, count, _, _ = await pipe.execute()
    if count >= limit:
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded",
            headers={"Retry-After": str(window), "X-RateLimit-Limit": str(limit)}
        )
    return {"remaining": limit - count - 1}
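# FastAPI middleware
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

class RateLimitMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        api_key = request.headers.get("X-API-Key", request.client.host)
        try:
            await check_rate_limit(api_key)
        except HTTPException as exc:
            # Exceptions raised in middleware bypass FastAPI's handlers, so build the 429 here
            return JSONResponse({"detail": exc.detail}, status_code=exc.status_code, headers=exc.headers)
        return await call_next(request)

app = FastAPI()
app.add_middleware(RateLimitMiddleware)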
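The sliding-window logic itself does not depend on Redis. A minimal in-process sketch, useful for unit tests or a single-worker service, is shown below; the class name `SlidingWindowLimiter` and the injectable `clock` parameter are assumptions made for illustration, and unlike the Redis version this one records only accepted requests.

```python
import time
from collections import defaultdict, deque

class SlidingWindowLimiter:
    def __init__(self, limit: int = 100, window: float = 60.0, clock=time.monotonic):
        self._limit = limit
        self._window = window
        self._clock = clock
        self._hits = defaultdict(deque)  # api_key -> timestamps of accepted requests

    def allow(self, api_key: str) -> bool:
        """Return True if the request fits in the window, False if it should get a 429."""
        now = self._clock()
        hits = self._hits[api_key]
        # Drop timestamps that have slid out of the window
        while hits and hits[0] <= now - self._window:
            hits.popleft()
        if len(hits) >= self._limit:
            return False
        hits.append(now)
        return True

# Usage with a fake clock: 3 requests per 60 seconds
now = [0.0]
limiter = SlidingWindowLimiter(limit=3, window=60.0, clock=lambda: now[0])
decisions = []
for t in (0.0, 1.0, 2.0, 3.0):
    now[0] = t
    decisions.append(limiter.allow("key-1"))
print(decisions)
```

State lives in one process only, so with several workers or pods each would enforce its own limit; that is the reason the section stores the window in Redis.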
Scenario 6 — Debugging a Circular Import Error
Context: Your Python application raises ImportError: cannot import name 'UserService' from partially initialized module 'app.services' at startup. How do you diagnose and fix it?
# The circular import:
# app/models.py imports from app/services.py
# app/services.py imports from app/models.py
# models.py
from app.services import UserService  # Imports services

class User:
    ...

# services.py
from app.models import User  # Imports models: circular!

class UserService:
    def create(self, name):
        return User(name=name)

# Fix 1: restructure by moving shared types to a separate module
# app/types.py (no imports from models or services)
from dataclasses import dataclass

@dataclass
class UserDTO:
    name: str
    email: str

# models.py: imports from types only
from app.types import UserDTO

# services.py: imports from types only
from app.types import UserDTO

# Fix 2: lazy import (defer the import to function level)
# services.py
class UserService:
    def create(self, name: str):
        from app.models import User  # Imported at call time, not at module load
        return User(name=name)

# Fix 3: use a TYPE_CHECKING guard (for type hints only)
from __future__ import annotations
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from app.models import User  # Only used for type checking, not at runtime
Scenario 7 — Optimising a Slow Data Processing Pipeline
Context: A batch job that processes 10 million CSV rows runs for 4 hours. Each row is parsed, validated, transformed, and inserted into a database. How would you reduce the run time to under 30 minutes?
# Current slow approach
import csv
import psycopg2

conn = psycopg2.connect(...)
cur = conn.cursor()
with open("data.csv") as f:
    reader = csv.DictReader(f)
    for row in reader:  # 10M iterations
        validated = validate(row)
        transformed = transform(validated)
        cur.execute(  # 10M individual INSERT statements!
            "INSERT INTO records VALUES (%s, %s, %s)",
            (transformed["id"], transformed["name"], transformed["value"])
        )
conn.commit()
# Optimised approach: chunked bulk insert + multiprocessing
import csv
import itertools
from concurrent.futures import ProcessPoolExecutor

import psycopg2
from psycopg2.extras import execute_values

def process_chunk(rows: list[dict]) -> list[tuple]:
    """Validate and transform a chunk of rows; runs in a subprocess."""
    result = []
    for row in rows:
        try:
            validated = validate(row)
            transformed = transform(validated)
            result.append((transformed["id"], transformed["name"], transformed["value"]))
        except ValueError:
            pass  # Skip invalid rows
    return result

def chunked(iterable, size):
    it = iter(iterable)
    while chunk := list(itertools.islice(it, size)):
        yield chunk

CHUNK_SIZE = 10_000

# Note: on platforms that spawn worker processes (Windows, macOS), run the
# block below under `if __name__ == "__main__":`.
with open("data.csv", newline="") as f:
    reader = csv.DictReader(f)
    with ProcessPoolExecutor(max_workers=8) as pool:
        conn = psycopg2.connect(...)
        cur = conn.cursor()
        # Caveat: pool.map collects its input immediately, so every chunk is read
        # and queued up front; bound the in-flight chunks if the file exceeds RAM.
        for processed_chunk in pool.map(process_chunk, chunked(reader, CHUNK_SIZE)):
            execute_values(
                cur,
                "INSERT INTO records (id, name, value) VALUES %s ON CONFLICT DO NOTHING",
                processed_chunk,
                page_size=1000
            )
            conn.commit()
# Results:
# Before: 4 hours (10M individual inserts, single thread)
# After: ~18 minutes (bulk insert + 8 parallel processes)
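One caveat with `Executor.map` is that it collects its input iterable immediately rather than lazily, so for a 10M-row file all chunks would be read and queued before the first result comes back. The sketch below shows one way to cap the number of in-flight chunks; `bounded_map` is a hypothetical helper written for this example, demonstrated with a thread pool so it runs anywhere, though the same function accepts a `ProcessPoolExecutor`.

```python
import itertools
from collections import deque
from concurrent.futures import Executor, ThreadPoolExecutor

def bounded_map(executor: Executor, fn, iterable, max_in_flight: int):
    """Like executor.map, but keeps at most max_in_flight inputs submitted at once."""
    it = iter(iterable)
    pending = deque(
        executor.submit(fn, item) for item in itertools.islice(it, max_in_flight)
    )
    while pending:
        result = pending.popleft().result()  # oldest first, so order is preserved
        for item in itertools.islice(it, 1):  # refill the slot that just freed up
            pending.append(executor.submit(fn, item))
        yield result

# Usage: record how far the source has been consumed when the first result arrives
pulled = []

def source():
    for i in range(10):
        pulled.append(i)
        yield i

with ThreadPoolExecutor(max_workers=2) as pool:
    results = bounded_map(pool, lambda x: x * x, source(), max_in_flight=3)
    first = next(results)
    consumed_at_first_result = len(pulled)
    rest = list(results)
print(first, consumed_at_first_result)
```

In the pipeline above, `fn` would be `process_chunk` and the iterable would be `chunked(reader, CHUNK_SIZE)`, with `max_in_flight` set to a small multiple of the worker count.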
Scenario 8 — Python Service Crashing with OOM in Kubernetes
Context: Your Python service deployed in Kubernetes (256 MB memory limit) is being OOM-killed every few hours. The service reads and processes JSON payloads from a message queue.
Diagnosis approach:
# Step 1: add memory tracking
import tracemalloc
import resource
import logging

def log_memory():
    # ru_maxrss is the peak resident set size: kilobytes on Linux, bytes on macOS
    usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    logging.info(f"Peak memory: {usage / 1024:.1f} MB")

# Step 2: profile what's growing
tracemalloc.start()
snapshot1 = tracemalloc.take_snapshot()
# ... process messages ...
snapshot2 = tracemalloc.take_snapshot()
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
for stat in top_stats[:5]:
    print(stat)
# Common cause: loading entire JSON payload into memory
import json

# MEMORY-HUNGRY: loads the entire file into memory
def process_large_json(path: str):
    with open(path) as f:
        data = json.load(f)  # Entire 500 MB JSON in RAM!
    for record in data["records"]:
        process(record)

# FIX: use ijson for streaming JSON parsing
import ijson

def process_large_json(path: str):
    with open(path, "rb") as f:
        for record in ijson.items(f, "records.item"):
            process(record)  # One record at a time, minimal memory

# FIX 2: for message queues, process and acknowledge one message at a time
async def consume_messages(queue):
    async for message in queue:
        await process_single_message(message.body)
        await message.ack()  # Don't batch large amounts in memory
        del message  # Drop the reference before waiting for the next message
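The snapshot comparison from the diagnosis step can be wrapped in a small helper and tried on a deliberately leaky workload. This is a sketch only: `top_growth` and `leaky_workload` are hypothetical names, and the roughly 2 MB allocation is chosen just to make the growth easy to spot.

```python
import tracemalloc

def top_growth(workload, limit: int = 3):
    """Run workload() and return (its result, the source lines that grew the most)."""
    tracemalloc.start()
    try:
        before = tracemalloc.take_snapshot()
        retained = workload()  # keep a reference so the memory is still live
        after = tracemalloc.take_snapshot()
    finally:
        tracemalloc.stop()
    # compare_to sorts by the absolute size difference, largest first
    stats = after.compare_to(before, "lineno")
    return retained, stats[:limit]

def leaky_workload():
    # Roughly 2 MB of small objects that stay referenced, like an unbounded buffer
    return [bytes(1024) for _ in range(2000)]

retained, stats = top_growth(leaky_workload)
for stat in stats:
    print(stat)
```

Each printed entry names a file and line number with its size delta, which is usually enough to locate the container that keeps growing between two points in the service's lifetime.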
Scenario 9 — Writing Thread-Safe Python Code
Context: A Python service uses a shared in-memory cache (dict) that is accessed and updated by multiple threads. You’re seeing occasional KeyError and corrupted data. How do you fix it?
# BROKEN: check-then-act on a shared dict is not atomic across threads
import threading
cache = {}  # Shared across threads

def get_or_compute(key: str):
    if key not in cache:  # Threads A and B both see: not in cache
        value = expensive_compute(key)  # Both threads compute!
        cache[key] = value  # Race condition: both write
    return cache[key]  # Can raise KeyError if another thread removed the key in between
# Two threads can both see 'key not in cache' and both compute + write
# FIX: threading.Lock for mutual exclusion
import threading

cache = {}
lock = threading.Lock()

def get_or_compute(key: str):
    with lock:
        if key in cache:
            return cache[key]
        value = expensive_compute(key)  # Only one thread computes
        cache[key] = value
        return value
# FIX 2 — Use threading.RLock for reentrant code (same thread can acquire multiple times)
rlock = threading.RLock()
# FIX 3 — Use concurrent.futures or thread-safe collections
from queue import Queue
task_queue = Queue() # Thread-safe FIFO
# FIX 4: cachetools TTLCache guarded by a lock
# (cachetools caches are not thread-safe on their own, so the lock is required)
from cachetools import TTLCache
import threading

cache = TTLCache(maxsize=1000, ttl=300)  # LRU + TTL
cache_lock = threading.Lock()

def get_or_compute(key: str):
    with cache_lock:
        if key not in cache:
            cache[key] = expensive_compute(key)
        return cache[key]
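A quick way to check that the lock really removes the duplicate computation is to hammer one key from several threads and count how often the expensive function runs. The sketch below assumes a stand-in `expensive_compute` that sleeps briefly; the counter and the `run_threads` helper are hypothetical additions for the demonstration.

```python
import threading
import time

cache = {}
cache_lock = threading.Lock()
compute_calls = 0

def expensive_compute(key: str) -> str:
    global compute_calls
    compute_calls += 1  # only ever called while cache_lock is held
    time.sleep(0.05)    # stand-in for slow work
    return key.upper()

def get_or_compute(key: str) -> str:
    with cache_lock:
        if key not in cache:
            cache[key] = expensive_compute(key)
        return cache[key]

def run_threads(n: int = 8) -> list:
    results = []
    threads = [
        threading.Thread(target=lambda: results.append(get_or_compute("user:1")))
        for _ in range(n)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results

results = run_threads()
print(compute_calls, results)
```

The trade-off is that a single global lock serializes computations for unrelated keys as well; if that becomes a bottleneck, a lock per key is a common refinement.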
Scenario 10 — Implementing Retry Logic with Exponential Backoff
Context: Your Python service calls an external payment API that occasionally returns HTTP 429 (rate limited) or 503 (service unavailable). How do you implement robust retry logic?
import time
import random
import functools
import logging
from typing import Type

logger = logging.getLogger(__name__)

def retry_with_backoff(
    retryable_exceptions: tuple[Type[Exception], ...],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
):
    """Decorator for exponential backoff with jitter."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            delay = base_delay
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except retryable_exceptions as e:
                    if attempt == max_retries:
                        logger.error(f"{func.__name__} failed after {max_retries} attempts: {e}")
                        raise
                    actual_delay = min(delay, max_delay)
                    if jitter:
                        # Add ±25% jitter to prevent thundering herd
                        actual_delay *= (0.75 + random.random() * 0.5)
                    logger.warning(
                        f"{func.__name__} attempt {attempt}/{max_retries} failed: {e}. "
                        f"Retrying in {actual_delay:.1f}s"
                    )
                    time.sleep(actual_delay)
                    delay *= 2  # Exponential backoff
        return wrapper
    return decorator
# Usage
import requests

class PaymentAPIError(Exception): pass
class RateLimitError(PaymentAPIError): pass
class ServerUnavailableError(PaymentAPIError): pass

@retry_with_backoff(
    retryable_exceptions=(
        RateLimitError, ServerUnavailableError, requests.Timeout, requests.ConnectionError,
    ),
    max_retries=5,
    base_delay=1.0,
)
def charge_payment(amount: float, card_token: str) -> dict:
    response = requests.post(
        "https://api.payment.com/charge",
        json={"amount": amount, "token": card_token},
        timeout=10,
    )
    if response.status_code == 429:
        raise RateLimitError("Payment API rate limited")
    if response.status_code >= 500:
        # Raise a retryable error; requests.HTTPError is not in the retry list
        raise ServerUnavailableError(f"Payment API returned {response.status_code}")
    response.raise_for_status()  # Other 4xx errors are not retried
    return response.json()
# Async version with the tenacity library (production-grade)
import aiohttp
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    wait_random,
    retry_if_exception_type,
)

@retry(
    retry=retry_if_exception_type((RateLimitError, aiohttp.ClientError)),
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=60) + wait_random(0, 2),
)
async def charge_payment_async(amount: float, card_token: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.payment.com/charge",
            json={"amount": amount, "token": card_token},
        ) as resp:
            if resp.status == 429:
                raise RateLimitError("Rate limited")
            # Note: raise_for_status() raises ClientResponseError, a ClientError,
            # so other 4xx/5xx responses are retried as well
            resp.raise_for_status()
            return await resp.json()
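It can help to work out the delays the hand-written decorator produces before choosing `max_retries` and `max_delay`. The sketch below mirrors its arithmetic (double each time, cap at `max_delay`, sleep after every failed attempt except the last); `backoff_schedule` and `jitter_bounds` are hypothetical helpers written for this illustration.

```python
def backoff_schedule(max_retries: int = 5, base_delay: float = 1.0,
                     max_delay: float = 60.0) -> list:
    """Un-jittered delays slept between attempts (one fewer than the attempts)."""
    return [min(base_delay * 2 ** i, max_delay) for i in range(max_retries - 1)]

def jitter_bounds(delay: float, spread: float = 0.25) -> tuple:
    """Range a delay can land in after applying +/- spread jitter."""
    return (delay * (1 - spread), delay * (1 + spread))

default_schedule = backoff_schedule()
long_schedule = backoff_schedule(max_retries=8)
print(default_schedule)        # [1.0, 2.0, 4.0, 8.0]
print(long_schedule)           # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
print(sum(default_schedule))   # 15.0
print(jitter_bounds(8.0))      # (6.0, 10.0)
```

With the defaults, a call that fails all five attempts spends about 15 seconds sleeping (before jitter) on top of the request timeouts, which is worth comparing against the caller's own deadline.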
- Concurrency patterns
- Performance optimization
- Production debugging
- API design best practices