
Python Scenario-Based Interview Questions

Real-world Python scenario questions covering debugging, performance, API design, asyncio concurrency, memory optimization, and production-grade patterns.

January 20, 2025 11 min read ~30 min to complete DB
The Situation

Senior Python developer / backend engineering interviews

10 Steps
5 Services Used
~30 min Duration
Advanced Difficulty

Scenario 1 — Memory Leak in a Long-Running Python Service

Context: You deployed a Python microservice that processes incoming webhook events. After a few hours of running in production, memory usage has grown from 80 MB to over 2 GB and the service becomes unresponsive.

Question: How would you diagnose and fix the memory leak?

Your webhook service stores processed event IDs in a module-level set to prevent duplicate processing. The service receives ~10,000 events/hour and has been running for 72 hours.

Investigation steps:

# Step 1 — Profile memory with tracemalloc
import tracemalloc

tracemalloc.start()

# ... run the service for a while ...

snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')

for stat in top_stats[:10]:
    print(stat)
# Output reveals: webhook_handler.py:45 grew by 1.2 GB — the 'seen_ids' set
# The bug — unbounded set accumulation
seen_ids = set()   # Module-level — never cleared!

def process_webhook(event_id: str, payload: dict):
    if event_id in seen_ids:
        return   # Deduplicate
    seen_ids.add(event_id)   # Grows forever!
    handle_payload(payload)
# Fix 1 — Use Redis with TTL for deduplication (production-grade)
import redis

r = redis.Redis()

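def process_webhook(event_id: str, payload: dict):
    key = f"webhook:seen:{event_id}"
    # SET with NX + EX is a single atomic command: only the first caller succeeds,
    # so two workers cannot both slip past a separate exists() check.
    if not r.set(key, 1, nx=True, ex=86400):  # TTL: 24 hours
        return
    handle_payload(payload)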

# Fix 2 — Use a bounded LRU cache (in-memory, simple)
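from collections import OrderedDict

class BoundedSet:
    """Size-capped set that evicts the least recently seen key first."""
    def __init__(self, maxsize=100_000):
        self._data = OrderedDict()
        self._maxsize = maxsize

    def add(self, key):
        if key in self._data:
            self._data.move_to_end(key)  # Already present: refresh, no eviction
            return
        if len(self._data) >= self._maxsize:
            self._data.popitem(last=False)  # Evict oldest
        self._data[key] = True

    def __contains__(self, key):
        if key in self._data:
            self._data.move_to_end(key)  # A hit counts as recent use (LRU)
            return True
        return False

seen_ids = BoundedSet(maxsize=100_000)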

Key takeaway: Never use unbounded module-level containers in long-running services. Use Redis with TTL for distributed deduplication, or a bounded in-memory structure with eviction.
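
As a rough illustration of the takeaway above, the sketch below mimics the Redis TTL behaviour in process memory: each ID is remembered for a fixed time and then forgotten, so the structure cannot grow without bound under a steady event rate. The class name `TTLDeduplicator`, its `first_time` method, and the injectable `clock` parameter are hypothetical names chosen for this example; they do not come from the original service.

```python
import time
from collections import OrderedDict


class TTLDeduplicator:
    """Remembers keys for `ttl` seconds (hypothetical helper for illustration)."""

    def __init__(self, ttl: float, clock=time.monotonic):
        self._ttl = ttl
        self._clock = clock
        self._expiry = OrderedDict()  # key -> expiry time, oldest entry first

    def first_time(self, key: str) -> bool:
        """Return True if `key` has not been seen within the last `ttl` seconds."""
        now = self._clock()
        # Every key gets the same ttl, so insertion order equals expiry order
        # and expired entries can be purged from the front.
        while self._expiry:
            oldest_key, expires_at = next(iter(self._expiry.items()))
            if expires_at > now:
                break
            del self._expiry[oldest_key]
        if key in self._expiry:
            return False
        self._expiry[key] = now + self._ttl
        return True

    def __len__(self) -> int:
        return len(self._expiry)
```

A handler would call `if not dedup.first_time(event_id): return` before processing. Unlike the Redis approach, this state is local to one process and is lost on restart, so it only suits single-instance deployments.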


Scenario 2 — Slow API Endpoint: N+1 Query Problem

Context: A Django REST API endpoint that lists all users and their order counts is responding in 8 seconds for 500 users. The database has proper indexes. How would you fix it?

# SLOW — N+1 queries (1 query for users + 1 query per user for orders)
class UserListView(APIView):
    def get(self, request):
        users = User.objects.all()   # Query 1: SELECT * FROM users
        result = []
        for user in users:
            result.append({
                "id": user.id,
                "name": user.name,
                "order_count": user.orders.count()  # Query 2...501: SELECT COUNT(*)
            })
        return Response(result)
# FAST — Single query with annotation
from django.db.models import Count

class UserListView(APIView):
    def get(self, request):
        users = User.objects.annotate(
            order_count=Count('orders')  # Single JOIN — one query total
        ).values('id', 'name', 'order_count')

        return Response(list(users))

# Alternatively, prefetch_related for complex nested data:
users = User.objects.prefetch_related('orders').all()
for user in users:
    # orders already loaded in 2 queries total, not N+1
    count = len(user.orders.all())

Debugging tool:

# Django Debug Toolbar or django-silk shows query count
from django.db import connection

def get_query_count():
    # connection.queries is only populated when settings.DEBUG is True
    return len(connection.queries)

# Or log all queries during development
import logging
logging.getLogger('django.db.backends').setLevel(logging.DEBUG)
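
To see the query-count difference without a Django project, the following sketch reproduces the same pattern with the standard-library `sqlite3` module. The table layout and the names `counts_n_plus_one`, `counts_single_query`, and `executed` are assumptions made for this example; the trace callback plays the role that Django Debug Toolbar plays above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER REFERENCES users(id));
    INSERT INTO users (id, name) VALUES (1, 'ada'), (2, 'bob'), (3, 'cy');
    INSERT INTO orders (user_id) VALUES (1), (1), (2);
""")

executed = []
conn.set_trace_callback(executed.append)  # records each SQL statement as it runs


def counts_n_plus_one() -> dict:
    """One query for the users, then one COUNT query per user."""
    result = {}
    for user_id, name in conn.execute("SELECT id, name FROM users").fetchall():
        (n,) = conn.execute(
            "SELECT COUNT(*) FROM orders WHERE user_id = ?", (user_id,)
        ).fetchone()
        result[name] = n
    return result


def counts_single_query() -> dict:
    """One aggregate query; LEFT JOIN keeps users that have no orders."""
    rows = conn.execute("""
        SELECT u.name, COUNT(o.id)
        FROM users u LEFT JOIN orders o ON o.user_id = u.id
        GROUP BY u.id
    """)
    return dict(rows.fetchall())


executed.clear()
slow = counts_n_plus_one()
slow_queries = len(executed)

executed.clear()
fast = counts_single_query()
fast_queries = len(executed)

print(slow_queries, fast_queries)
```

Both functions return the same mapping; only the number of round trips differs, and that number grows with the user count in the first version while staying constant in the second.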

Scenario 3 — Race Condition in an Async Python Service

Context: You have a FastAPI service where two concurrent requests can both check if a username is available and both register the same username, creating duplicate accounts. How do you fix this?

# BROKEN — race condition between check and insert
@app.post("/register")
async def register(username: str, db: AsyncSession = Depends(get_db)):
    # Request A and B both reach here simultaneously
    existing = await db.execute(select(User).where(User.username == username))
    if existing.scalar():
        raise HTTPException(400, "Username taken")

    # Both requests pass the check above and both insert!
    user = User(username=username)
    db.add(user)
    await db.commit()   # No unique constraint, so both commits succeed: duplicate accounts
# FIX 1 — Database-level unique constraint + handle IntegrityError
# migrations: unique=True on username column
from sqlalchemy.exc import IntegrityError

@app.post("/register")
async def register(username: str, db: AsyncSession = Depends(get_db)):
    try:
        user = User(username=username)
        db.add(user)
        await db.commit()
        return {"id": user.id}
    except IntegrityError:
        await db.rollback()
        raise HTTPException(400, "Username already taken")

# FIX 2 — Distributed lock with Redis (for more complex operations)
import redis.asyncio as aioredis

redis = aioredis.from_url("redis://localhost")

@app.post("/register")
async def register(username: str, db: AsyncSession = Depends(get_db)):
    lock_key = f"lock:register:{username}"
    async with redis.lock(lock_key, timeout=5):
        existing = await db.execute(select(User).where(User.username == username))
        if existing.scalar():
            raise HTTPException(400, "Username taken")
        user = User(username=username)
        db.add(user)
        await db.commit()

Best practice: Always rely on database-level constraints as the last line of defence against race conditions. Locks are for coordinating complex multi-step operations.
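
The insert-first pattern from Fix 1 can be tried in isolation with the standard-library `sqlite3` module. This is a minimal sketch rather than the FastAPI/SQLAlchemy code above: the in-memory database and the `register` helper returning a boolean are assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT NOT NULL UNIQUE)"
)


def register(username: str) -> bool:
    """Insert without a prior SELECT and let the UNIQUE constraint arbitrate."""
    try:
        with conn:  # commits on success, rolls back if an exception is raised
            conn.execute("INSERT INTO users (username) VALUES (?)", (username,))
        return True
    except sqlite3.IntegrityError:
        return False  # somebody else already holds this username
```

Because there is no separate check step, there is no window between check and insert for a second request to slip through; the database decides the winner.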


Scenario 4 — CPU-Bound Task Blocking Async Event Loop

Context: Your FastAPI service processes image thumbnails on upload. When a user uploads a large image, the API becomes unresponsive for all other users for ~3 seconds. Why and how do you fix it?

# BROKEN — CPU-bound work on the async event loop blocks everything
import io
from PIL import Image

@app.post("/upload")
async def upload_image(file: UploadFile):
    data = await file.read()
    img = Image.open(io.BytesIO(data))
    thumbnail = img.resize((200, 200))   # CPU-bound — blocks event loop!
    # During this 3 seconds, no other requests are served
    return save_thumbnail(thumbnail)
# FIX: run CPU-bound work in a process pool executor
import asyncio
import io
from concurrent.futures import ProcessPoolExecutor
from PIL import Image

process_pool = ProcessPoolExecutor(max_workers=4)

def generate_thumbnail(data: bytes) -> bytes:
    """Pure function — safe to run in subprocess."""
    img = Image.open(io.BytesIO(data)).convert("RGB")  # JPEG cannot store alpha or palette modes
    thumbnail = img.resize((200, 200), Image.LANCZOS)
    buf = io.BytesIO()
    thumbnail.save(buf, format="JPEG", quality=85)
    return buf.getvalue()

@app.post("/upload")
async def upload_image(file: UploadFile):
    data = await file.read()
    loop = asyncio.get_running_loop()
    # Run in process pool — event loop stays free
    thumbnail_data = await loop.run_in_executor(
        process_pool, generate_thumbnail, data
    )
    return store_thumbnail(thumbnail_data)
# For truly heavy workloads — use a task queue (Celery + Redis)
from celery import Celery
from celery.result import AsyncResult

# A result backend is required to look up task status and results later
celery = Celery("tasks", broker="redis://localhost/0", backend="redis://localhost/1")

@celery.task
def process_image_task(image_data: bytes):
    return generate_thumbnail(image_data)

@app.post("/upload")
async def upload_image(file: UploadFile):
    data = await file.read()
    task = process_image_task.delay(data)
    return {"task_id": task.id, "status": "processing"}

@app.get("/upload/{task_id}")
async def get_result(task_id: str):
    task = AsyncResult(task_id, app=celery)
    return {"status": task.status, "result": task.result}
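
The effect of blocking the event loop can be observed with a small heartbeat experiment. In this sketch `time.sleep` stands in for the slow call, and the default thread pool is used only to keep the example self-contained; for genuinely CPU-bound work such as image resizing, a process pool as shown above is the better fit because of the GIL. The names `heartbeat`, `blocking_work`, and `measure` are hypothetical.

```python
import asyncio
import time


async def heartbeat(counter: dict, interval: float = 0.01) -> None:
    """Increment a counter whenever the event loop gets a chance to run."""
    while True:
        await asyncio.sleep(interval)
        counter["ticks"] += 1


def blocking_work(seconds: float) -> None:
    time.sleep(seconds)  # stand-in for a slow synchronous call


async def measure(offload: bool, seconds: float = 0.2) -> int:
    """Return how many heartbeat ticks happened while the work was running."""
    counter = {"ticks": 0}
    task = asyncio.create_task(heartbeat(counter))
    await asyncio.sleep(0)  # let the heartbeat task start
    if offload:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, blocking_work, seconds)
    else:
        blocking_work(seconds)  # runs on the event loop thread
    ticks = counter["ticks"]
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return ticks


if __name__ == "__main__":
    print("ticks while blocking the loop:", asyncio.run(measure(offload=False)))
    print("ticks while offloaded:", asyncio.run(measure(offload=True)))
```

When the work runs directly inside the coroutine, the heartbeat never gets scheduled; when it is handed to an executor, the heartbeat keeps ticking, which is exactly what other requests experience.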

Scenario 5 — Designing a Rate Limiter in Python

Context: Your public API is getting hammered. You need to implement a rate limiter that allows 100 requests per minute per API key, returning HTTP 429 when exceeded.

# Sliding-window rate limiter using Redis
import time
import redis.asyncio as aioredis
from fastapi import Request, HTTPException

redis = aioredis.from_url("redis://localhost")

async def check_rate_limit(api_key: str, limit: int = 100, window: int = 60):
    """Sliding window rate limiter using Redis sorted sets."""
    key = f"rate:{api_key}"
    now = time.time()
    window_start = now - window

    pipe = redis.pipeline()
    # Remove old requests outside the window
    pipe.zremrangebyscore(key, 0, window_start)
    # Count requests in window
    pipe.zcard(key)
    # Add current request
    pipe.zadd(key, {str(now): now})
    # Set expiry
    pipe.expire(key, window)

    _, count, _, _ = await pipe.execute()

    if count >= limit:
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded",
            headers={"Retry-After": str(window), "X-RateLimit-Limit": str(limit)}
        )

    return {"remaining": limit - count - 1}

# FastAPI middleware
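from fastapi import FastAPI
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

class RateLimitMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        api_key = request.headers.get("X-API-Key", request.client.host)
        try:
            await check_rate_limit(api_key)
        except HTTPException as exc:
            # Exceptions raised inside middleware bypass FastAPI's exception
            # handlers, so convert the 429 into a response here.
            return JSONResponse(
                {"detail": exc.detail}, status_code=exc.status_code, headers=exc.headers
            )
        return await call_next(request)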

app = FastAPI()
app.add_middleware(RateLimitMiddleware)
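
The sliding-window logic is easier to reason about without Redis in the picture. The sketch below keeps the same idea (drop timestamps older than the window, count what is left, then record the new request) in process memory. `SlidingWindowLimiter` and its `allow` method are hypothetical names, and the caller passes `now` explicitly so the behaviour is deterministic. One deliberate difference from the Redis version above: rejected requests are not recorded here.

```python
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """In-memory sliding-window limiter (single process only)."""

    def __init__(self, limit: int = 100, window: float = 60.0):
        self.limit = limit
        self.window = window
        self._hits = defaultdict(deque)  # api_key -> timestamps inside the window

    def allow(self, api_key: str, now: float) -> bool:
        hits = self._hits[api_key]
        # Drop timestamps that have slid out of the window.
        while hits and hits[0] <= now - self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False  # over the limit; this request is not recorded
        hits.append(now)
        return True
```

For example, with `limit=3` and `window=60`, three calls at t=0, 1, 2 are allowed, a fourth at t=3 is refused, and a call at t=60 is allowed again because the hit from t=0 has left the window.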

Scenario 6 — Debugging a Circular Import Error

Context: Your Python application raises ImportError: cannot import name 'UserService' from partially initialized module 'app.services' at startup. How do you diagnose and fix it?

# The circular import:
# app/models.py imports from app/services.py
# app/services.py imports from app/models.py

# models.py
from app.services import UserService   # Imports services
class User:
    ...

# services.py
from app.models import User            # Imports models → circular!
class UserService:
    def create(self, name): return User(name=name)
# Fix 1 — Restructure: move shared types to a separate module
# app/types.py (no imports from models or services)
from dataclasses import dataclass

@dataclass
class UserDTO:
    name: str
    email: str

# models.py — imports from types only
from app.types import UserDTO

# services.py — imports from types only
from app.types import UserDTO

# Fix 2 — Lazy import (defer import to function level)
# services.py
class UserService:
    def create(self, name: str):
        from app.models import User   # Import at function call time, not module load
        return User(name=name)

# Fix 3 — Use TYPE_CHECKING guard (for type hints only)
from __future__ import annotations
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from app.models import User   # Only used for type checking, not at runtime

Scenario 7 — Optimising a Slow Data Processing Pipeline

Context: A batch job that processes 10 million CSV rows runs for 4 hours. Each row is parsed, validated, transformed, and inserted into a database. How would you reduce the run time to under 30 minutes?

# Current slow approach
import csv
import psycopg2

conn = psycopg2.connect(...)
cur = conn.cursor()

with open("data.csv") as f:
    reader = csv.DictReader(f)
    for row in reader:                    # 10M iterations
        validated = validate(row)
        transformed = transform(validated)
        cur.execute(                       # 10M individual INSERT statements!
            "INSERT INTO records VALUES (%s, %s, %s)",
            (transformed["id"], transformed["name"], transformed["value"])
        )
    conn.commit()
# Optimised approach — chunked bulk insert + multiprocessing

import csv
import psycopg2
from psycopg2.extras import execute_values
from concurrent.futures import ProcessPoolExecutor
import itertools

def process_chunk(rows: list[dict]) -> list[tuple]:
    """Validate and transform a chunk of rows — runs in subprocess."""
    result = []
    for row in rows:
        try:
            validated = validate(row)
            transformed = transform(validated)
            result.append((transformed["id"], transformed["name"], transformed["value"]))
        except ValueError:
            pass  # Skip invalid rows
    return result

def chunked(iterable, size):
    it = iter(iterable)
    while chunk := list(itertools.islice(it, size)):
        yield chunk

CHUNK_SIZE = 10_000

with open("data.csv") as f:
    reader = csv.DictReader(f)
    with ProcessPoolExecutor(max_workers=8) as pool:
        conn = psycopg2.connect(...)
        cur = conn.cursor()

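        # Note: by default Executor.map submits every chunk up front, so the whole
        # file is read and queued before results are consumed; watch memory usage.
        for processed_chunk in pool.map(process_chunk, chunked(reader, CHUNK_SIZE)):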
            execute_values(
                cur,
                "INSERT INTO records (id, name, value) VALUES %s ON CONFLICT DO NOTHING",
                processed_chunk,
                page_size=1000
            )
        conn.commit()

# Results:
# Before: 4 hours (10M individual inserts, single thread)
# After:  ~18 minutes (bulk insert + 8 parallel processes)
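
One detail worth knowing when feeding a large file through an executor: by default `Executor.map` submits every item from its input before yielding the first result, so all chunks end up in memory at once. A possible workaround, sketched below, keeps only a bounded number of chunks in flight. `bounded_map` and `max_in_flight` are hypothetical names, and a `ThreadPoolExecutor` is used in the usage example only to keep it easy to run; the helper works with any executor.

```python
import itertools
from collections import deque
from concurrent.futures import ThreadPoolExecutor


def chunked(iterable, size):
    """Yield successive lists of at most `size` items."""
    it = iter(iterable)
    while chunk := list(itertools.islice(it, size)):
        yield chunk


def bounded_map(executor, fn, iterable, max_in_flight):
    """Like executor.map, but with at most `max_in_flight` pending tasks.

    Results are yielded in input order; the input is consumed lazily.
    """
    pending = deque()
    for item in iterable:
        pending.append(executor.submit(fn, item))
        if len(pending) >= max_in_flight:
            yield pending.popleft().result()
    while pending:
        yield pending.popleft().result()


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=4) as pool:
        for total in bounded_map(pool, sum, chunked(range(100), 10), max_in_flight=8):
            print(total)
```

A reasonable starting point for `max_in_flight` is a small multiple of the worker count, so workers stay busy without buffering the whole input.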

Scenario 8 — Python Service Crashing with OOM in Kubernetes

Context: Your Python service deployed in Kubernetes (256 MB memory limit) is being OOM-killed every few hours. The service reads and processes JSON payloads from a message queue.

Diagnosis approach:

# Step 1 — Add memory tracking
import tracemalloc
import resource
import logging

def log_memory():
    # ru_maxrss is the peak resident set size: kilobytes on Linux, bytes on macOS
    usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    logging.info(f"Peak memory: {usage / 1024:.1f} MB")

# Step 2 — Profile what's growing
tracemalloc.start()
snapshot1 = tracemalloc.take_snapshot()

# ... process messages ...

snapshot2 = tracemalloc.take_snapshot()
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
for stat in top_stats[:5]:
    print(stat)
# Common cause: loading entire JSON payload into memory
import json

# MEMORY-HEAVY: loads entire file into memory
def process_large_json(path: str):
    with open(path) as f:
        data = json.load(f)   # Entire 500MB JSON in RAM!
    for record in data["records"]:
        process(record)

# FIX — use ijson for streaming JSON parsing
import ijson

def process_large_json(path: str):
    with open(path, "rb") as f:
        for record in ijson.items(f, "records.item"):
            process(record)    # One record at a time — minimal memory

# FIX 2 — for message queues, process and acknowledge immediately
async def consume_messages(queue):
    async for message in queue:
        await process_single_message(message.body)
        await message.ack()   # Don't batch large amounts in memory
        del message           # Help GC reclaim memory immediately
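
The memory difference between materialising all records and streaming them can be measured directly with `tracemalloc`. The sketch below is a simplified stand-in for the JSON case: `eager_total` builds the full list first, while `streaming_total` uses a generator so only one record is alive at a time. All three function names are hypothetical, and the record shape is invented for the example.

```python
import tracemalloc


def peak_bytes(fn) -> int:
    """Run `fn` and return the peak number of bytes allocated while it ran."""
    tracemalloc.start()
    try:
        fn()
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak


def eager_total(n: int) -> int:
    records = [{"id": i, "value": i * 2} for i in range(n)]  # all in RAM at once
    return sum(r["value"] for r in records)


def streaming_total(n: int) -> int:
    records = ({"id": i, "value": i * 2} for i in range(n))  # one at a time
    return sum(r["value"] for r in records)


if __name__ == "__main__":
    n = 50_000
    print("eager peak bytes:    ", peak_bytes(lambda: eager_total(n)))
    print("streaming peak bytes:", peak_bytes(lambda: streaming_total(n)))
```

Both functions compute the same total; the eager version's peak grows with the number of records, while the streaming version's peak stays roughly flat, which is the property that keeps a container under its memory limit.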

Scenario 9 — Writing Thread-Safe Python Code

Context: A Python service uses a shared in-memory cache (dict) that is accessed and updated by multiple threads. You’re seeing occasional KeyError and corrupted data. How do you fix it?

# BROKEN: check-then-act on a shared dict is not atomic across threads
import threading

cache = {}   # Shared across threads

def get_or_compute(key: str):
    if key not in cache:          # Thread A reads: not in cache
        value = expensive_compute(key)   # Both threads compute!
        cache[key] = value        # Race condition: both write
    return cache[key]

# Two threads can both see 'key not in cache' and both compute + write
# FIX — threading.Lock for mutual exclusion
import threading

cache = {}
lock = threading.Lock()

def get_or_compute(key: str):
    with lock:
        if key in cache:
            return cache[key]
        value = expensive_compute(key)  # Only one thread computes
        cache[key] = value
        return value

# FIX 2 — Use threading.RLock for reentrant code (same thread can acquire multiple times)
rlock = threading.RLock()

# FIX 3 — Use concurrent.futures or thread-safe collections
from queue import Queue

task_queue = Queue()   # Thread-safe FIFO

# FIX 4: cachetools TTLCache guarded by a lock (cachetools caches are not thread-safe on their own)
from cachetools import TTLCache
import threading

cache = TTLCache(maxsize=1000, ttl=300)  # LRU + TTL
cache_lock = threading.Lock()

def get_or_compute(key: str):
    with cache_lock:
        try:
            return cache[key]
        except KeyError:  # Missing or expired
            value = cache[key] = expensive_compute(key)
            return value
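
A quick way to convince yourself that the lock works is to release several threads at the same instant and count how often the expensive function runs. This is a minimal sketch: `stats`, `run_concurrently`, and the trivial `expensive_compute` body are assumptions made for the example.

```python
import threading

cache = {}
lock = threading.Lock()
stats = {"compute_calls": 0}


def expensive_compute(key: str) -> str:
    stats["compute_calls"] += 1  # only ever runs while the lock is held
    return key.upper()


def get_or_compute(key: str) -> str:
    with lock:
        if key not in cache:
            cache[key] = expensive_compute(key)
        return cache[key]


def run_concurrently(key: str, n_threads: int = 8) -> list:
    """Start all threads together and collect what each one got back."""
    barrier = threading.Barrier(n_threads)
    results = []

    def worker():
        barrier.wait()  # line all threads up before touching the cache
        results.append(get_or_compute(key))

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

The trade-off is that a single global lock also serialises computations for unrelated keys. If `expensive_compute` is slow and keys are independent, a lock per key is a common refinement.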

Scenario 10 — Implementing Retry Logic with Exponential Backoff

Context: Your Python service calls an external payment API that occasionally returns HTTP 429 (rate limited) or 503 (service unavailable). How do you implement robust retry logic?

import time
import random
import functools
import logging
from typing import Type

logger = logging.getLogger(__name__)

def retry_with_backoff(
    retryable_exceptions: tuple[Type[Exception], ...],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
):
    """Decorator for exponential backoff with jitter."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            delay = base_delay
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except retryable_exceptions as e:
                    if attempt == max_retries:
                        logger.error(f"{func.__name__} failed after {max_retries} attempts: {e}")
                        raise

                    actual_delay = min(delay, max_delay)
                    if jitter:
                        # Add ±25% jitter to prevent thundering herd
                        actual_delay *= (0.75 + random.random() * 0.5)

                    logger.warning(
                        f"{func.__name__} attempt {attempt}/{max_retries} failed: {e}. "
                        f"Retrying in {actual_delay:.1f}s"
                    )
                    time.sleep(actual_delay)
                    delay *= 2  # Exponential backoff
        return wrapper
    return decorator

# Usage
import requests

class PaymentAPIError(Exception): pass
class RateLimitError(PaymentAPIError): pass
class ServerError(PaymentAPIError): pass

@retry_with_backoff(
    retryable_exceptions=(RateLimitError, ServerError, requests.Timeout, requests.ConnectionError),
    max_retries=5,
    base_delay=1.0,
)
def charge_payment(amount: float, card_token: str) -> dict:
    response = requests.post(
        "https://api.payment.com/charge",
        json={"amount": amount, "token": card_token},
        timeout=10,
    )
    if response.status_code == 429:
        raise RateLimitError("Payment API rate limited")
    if response.status_code >= 500:
        # requests.HTTPError is not in the retry list, so raise a retryable error
        raise ServerError(f"Payment API returned {response.status_code}")
    response.raise_for_status()  # Remaining 4xx errors are raised without retrying
    return response.json()
# Async version with tenacity library (production-grade)
import aiohttp
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    wait_random,
    retry_if_exception_type,
)

@retry(
    retry=retry_if_exception_type((RateLimitError, aiohttp.ClientError)),
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=60) + wait_random(0, 2),
)
async def charge_payment_async(amount: float, card_token: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.payment.com/charge",
            json={"amount": amount, "token": card_token},
        ) as resp:
            if resp.status == 429:
                raise RateLimitError("Rate limited")
            resp.raise_for_status()
            return await resp.json()
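
To make the timing of the hand-written decorator concrete, the sketch below computes the sequence of sleeps it would perform, using the same doubling, cap, and jitter formula. `backoff_delays` is a hypothetical helper written for this illustration, and the `rng` parameter exists only so the jitter can be made deterministic.

```python
import random


def backoff_delays(
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = False,
    rng=random.random,
) -> list:
    """Delays slept between attempts; there is no sleep after the final attempt."""
    delays = []
    delay = base_delay
    for _ in range(max_retries - 1):
        actual = min(delay, max_delay)  # cap before applying jitter
        if jitter:
            actual *= 0.75 + rng() * 0.5  # scale into [0.75, 1.25)
        delays.append(actual)
        delay *= 2  # exponential growth
    return delays
```

With the defaults and jitter disabled, the sleeps are 1, 2, 4, and 8 seconds, so a call that fails all five attempts spends about 15 seconds waiting in addition to the request time itself. That total is worth checking against any upstream timeout.
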
Services Used
  • Python
  • asyncio
  • FastAPI
  • Django
  • Redis
Prerequisites
  • Python 3.10+
  • Basic understanding of async programming
  • OOP concepts
What You Learned
  • Concurrency patterns
  • Performance optimization
  • Production debugging
  • API design best practices
