Python Context Managers: The with Statement Beyond File Handling
You already know with open("file.txt") as f:. That's a context manager.
What most tutorials skip: you can write your own, and the pattern applies to far more than files — timers, database connections, temporary directories, logging, locks, and any resource that needs setup and guaranteed cleanup.
🎁 Free: AI Publishing Checklist — 7 steps in Python · Full pipeline: germy5.gumroad.com/l/xhxkzz (pay what you want, min $9.99)
What with actually does
# This:
with open("data.json") as f:
    content = f.read()

# Is equivalent to:
f = open("data.json")
try:
    content = f.read()
finally:
    f.close()  # always runs, even if an exception occurs
The with block guarantees cleanup runs — whether the block exits normally, raises an exception, or is interrupted with Ctrl-C (KeyboardInterrupt is an exception too; only a hard kill of the process skips cleanup). That's the entire point.
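You can verify the guarantee directly. A minimal sketch that forces an exception inside the block and checks the file still got closed:

with open("probe.txt", "w") as f:
    f.write("x")

try:
    with open("probe.txt") as f:
        raise ValueError("boom")
except ValueError:
    pass

print(f.closed)  # True: cleanup ran despite the exception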
Built-in context managers you already have
import threading
import tempfile
import os

# Files (most common)
with open("output.txt", "w") as f:
    f.write("hello")

# Temporary directories — deleted automatically on exit
with tempfile.TemporaryDirectory() as tmpdir:
    script = os.path.join(tmpdir, "task.py")
    with open(script, "w") as f:
        f.write("print('hi')")
# tmpdir and all its contents are deleted here

# Temporary files
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as tmp:
    tmp.write("print('hello')")
    path = tmp.name
# file persists (delete=False); remove it yourself when done

# Thread locks
shared_counter = 0
lock = threading.Lock()
with lock:
    # only one thread runs this at a time
    shared_counter += 1
# lock released automatically

# Suppress specific exceptions
from contextlib import suppress
with suppress(FileNotFoundError):
    os.remove("might_not_exist.txt")
# continues normally if file doesn't exist
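contextlib has several more. One handy example is redirect_stdout(), which temporarily reroutes print output (a minimal sketch, not tied to any particular pipeline):

import io
from contextlib import redirect_stdout

buf = io.StringIO()
with redirect_stdout(buf):
    print("captured")  # written to buf, not the terminal
assert buf.getvalue() == "captured\n"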
Writing your own: contextlib.contextmanager
The easiest way to create a context manager is with the @contextmanager decorator:
from contextlib import contextmanager
import time

@contextmanager
def timer(label: str):
    """Measure and print execution time of a block."""
    start = time.perf_counter()
    try:
        yield  # code inside `with` block runs here
    finally:
        elapsed = time.perf_counter() - start
        print(f"[{label}] {elapsed:.3f}s")

# Usage
with timer("LLM call"):
    # ... call the API ...
    time.sleep(0.1)  # simulate work
# prints something like: [LLM call] 0.100s
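A lesser-known bonus: managers built with @contextmanager can also be used as decorators, so the same timer can wrap a whole function:

@timer("generate")
def generate():
    time.sleep(0.05)

generate()  # prints something like: [generate] 0.050s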
The structure is always: setup → yield → cleanup. The yield can also pass a value to the block, which becomes the as variable:
@contextmanager
def temp_script(code: str):
    """Write code to a temp file, yield the path, delete on exit."""
    import tempfile, os
    tmp = tempfile.NamedTemporaryFile(
        mode="w", suffix=".py", delete=False, encoding="utf-8"
    )
    try:
        tmp.write(code)
        tmp.close()
        yield tmp.name  # <-- this becomes the `as` variable
    finally:
        os.unlink(tmp.name)  # always deleted

# Usage
with temp_script("print('hello world')") as path:
    import subprocess
    result = subprocess.run(["python3", path], capture_output=True, text=True)
    print(result.stdout)  # hello world
# file deleted here automatically
Class-based context managers
For more complex setup/teardown, implement __enter__ and __exit__:
class ManagedDB:
    """Database connection that always closes, even on error."""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self.conn = None

    def __enter__(self):
        import sqlite3
        self.conn = sqlite3.connect(self.db_path)
        return self.conn  # becomes the `as` variable

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.conn:
            if exc_type is None:
                self.conn.commit()  # commit on success
            else:
                self.conn.rollback()  # rollback on error
            self.conn.close()
        return False  # don't suppress exceptions

# Usage (assumes the tasks table already exists)
with ManagedDB("pipeline.db") as db:
    db.execute("INSERT INTO tasks VALUES (?, ?)", ("t01", "done"))
# committed and closed automatically
__exit__ receives the exception info if one occurred. Return True to suppress it, False (or None) to let it propagate.
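Returning True is how suppression works. A minimal sketch (deliberately narrow; swallowing exceptions broadly is a bad idea):

class IgnoreKeyErrors:
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # True only for KeyError: suppress it; everything else propagates
        return exc_type is not None and issubclass(exc_type, KeyError)

with IgnoreKeyErrors():
    {}["missing"]  # raises KeyError...
print("still running")  # ...which was suppressed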
Pattern: logging context
from contextlib import contextmanager
import logging

log = logging.getLogger(__name__)

@contextmanager
def task_context(task_id: str, task_name: str):
    """Log task start/end, catch and log errors."""
    log.info(f"[{task_id}] Starting: {task_name}")
    try:
        yield
        log.info(f"[{task_id}] ✅ Complete: {task_name}")
    except Exception as e:
        log.error(f"[{task_id}] ❌ Failed: {task_name} — {e}")
        raise  # re-raise so the caller can handle it

# Usage
with task_context("ch03", "Generate chapter"):
    content = generate_chapter(outline["ch03"])
    save_to_file(content, "chapters/ch03.md")
Pattern: atomic file writes
from contextlib import contextmanager
import os
import tempfile

@contextmanager
def atomic_write(path: str, mode: str = "w", **kwargs):
    """
    Write to a temp file; rename to target on success.
    If an error occurs, the original file is untouched.
    """
    dir_name = os.path.dirname(os.path.abspath(path))
    tmp = tempfile.NamedTemporaryFile(
        mode=mode, dir=dir_name, delete=False, **kwargs
    )
    try:
        yield tmp
        tmp.close()
        os.replace(tmp.name, path)  # atomic on POSIX
    except Exception:
        tmp.close()
        os.unlink(tmp.name)  # discard incomplete file
        raise

# Usage
import json
with atomic_write("state.json", encoding="utf-8") as f:
    json.dump({"task-01": "done"}, f, indent=2)
# state.json is only updated if no exception occurred
Pattern: retrying context
A caveat first: the body of a with block runs exactly once, so a @contextmanager generator that loops and yields again to retry is broken; the first retry fails with RuntimeError("generator didn't stop after throw()"). The fix is to drive retries from a loop in the caller, with one small context manager per attempt:

import time

class _Attempt:
    """One attempt; swallows the listed exceptions unless it's the final try."""

    def __init__(self, number: int, exceptions, final: bool):
        self.number, self.exceptions, self.final = number, exceptions, final
        self.error = None

    def __enter__(self):
        return self.number

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None or self.final or not issubclass(exc_type, self.exceptions):
            return False  # success, final attempt, or unexpected error: propagate
        self.error = exc_val
        return True  # suppress so the driving loop can retry

def retry_on_error(max_attempts: int = 3, delay: float = 1.0, exceptions=(Exception,)):
    """Yield per-attempt context managers; stop after the first success."""
    for number in range(1, max_attempts + 1):
        attempt = _Attempt(number, exceptions, final=(number == max_attempts))
        yield attempt
        if attempt.error is None:
            return  # the block succeeded
        print(f"Attempt {number} failed: {attempt.error}. Retrying in {delay}s...")
        time.sleep(delay)
        delay *= 2  # exponential backoff

# Usage
for attempt in retry_on_error(max_attempts=3, exceptions=(TimeoutError, ConnectionError)):
    with attempt as n:
        print(f"Attempt {n}")
        result = call_api()  # retried on TimeoutError or ConnectionError
Multiple context managers in one with
# Old style (nested)
with open("input.txt") as src:
    with open("output.txt", "w") as dst:
        dst.write(src.read().upper())

# Modern style (single with, comma-separated)
with open("input.txt") as src, open("output.txt", "w") as dst:
    dst.write(src.read().upper())

# With your custom managers
with timer("full pipeline"), task_context("run", "Pipeline"):
    run_pipeline()
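Since Python 3.10, the managers can also be wrapped in parentheses, which helps when the line gets long:

with (
    open("input.txt") as src,
    open("output.txt", "w") as dst,
):
    dst.write(src.read().upper())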
contextlib.ExitStack — dynamic context managers
When you don't know ahead of time how many context managers you'll need:
from contextlib import ExitStack

chapters = ["ch01.md", "ch02.md", "ch03.md"]

with ExitStack() as stack:
    # Open all files, register them all for cleanup
    file_handles = [
        stack.enter_context(open(f"chapters/{ch}"))
        for ch in chapters
    ]
    # all files open here
    for fh in file_handles:
        print(fh.read()[:50])
# all files closed here, even if some failed
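ExitStack can also register arbitrary cleanup callables with callback(), handy for resources that don't implement the protocol themselves. A minimal sketch with a dict standing in for a real resource:

from contextlib import ExitStack

with ExitStack() as stack:
    resource = {"open": True}                    # stand-in for a real resource
    stack.callback(resource.update, open=False)  # runs on exit, LIFO order
    assert resource["open"]
assert not resource["open"]  # cleanup ran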
Real pipeline usage
This is how the publishing pipeline uses context managers in practice:
from contextlib import contextmanager
import tempfile, os, subprocess, logging

log = logging.getLogger(__name__)

@contextmanager
def isolated_execution(code: str, timeout: int = 30):
    """
    Run code in a temp directory. Yields (stdout, stderr, returncode).
    Cleans up temp dir regardless of outcome.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        script = os.path.join(tmpdir, "task.py")
        with open(script, "w", encoding="utf-8") as f:
            f.write(code)
        result = subprocess.run(
            ["python3", script],
            capture_output=True, text=True,
            timeout=timeout, cwd=tmpdir,
        )
        yield result.stdout, result.stderr, result.returncode
    # tmpdir deleted here

# Clean calling code:
code = "for i in range(5): print(i)"
with isolated_execution(code) as (stdout, stderr, rc):
    if rc == 0:
        log.info(f"Output: {stdout.strip()}")
    else:
        log.error(f"Failed: {stderr.strip()}")
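These pieces compose. A sketch (reusing timer and task_context from above, with a hypothetical chapter ID) of stacking them around one pipeline step:

code = "print('chapter draft')"
with timer("ch01"), task_context("ch01", "Draft chapter"):
    with isolated_execution(code) as (stdout, stderr, rc):
        if rc != 0:
            raise RuntimeError(stderr.strip())
        draft = stdout.strip()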
The pipeline wraps every subprocess execution and file write in context managers — zero leaked resources across a 10-chapter ebook generation run: germy5.gumroad.com/l/xhxkzz — pay what you want, min $9.99.