
Python Async Architecture: Real-World Experience

By Xavier Collantes

9/3/2025


Python Async Architecture
Multitasking is a myth. The term "multitask" was coined in a 1965 paper about the IBM System/360 computer. Since then, we've applied the term to how people work, but ironically, people cannot truly multitask any more than a single processor core can. Instead, both accomplish apparent multitasking by switching between tasks very quickly.
This principle applies directly to Python. Despite its reputation for being slow, Python powers production systems that handle millions of banking transactions, social media posts, and billions of internet actions daily. The secret lies in advanced software engineering patterns that we'll explore in this article.
Dazed and Confused

In this article, I may say 'Python and parallel execution', but to be clear, the standard Python interpreter (CPython) can't run Python code on multiple threads in parallel. It runs threads in pseudo-parallel, switching back and forth between them.

Async Python Changes Everything

Think of async programming like a smart restaurant waiter. A regular waiter takes your order, stands by the kitchen waiting for your food, then brings it to you before helping the next table. That's synchronous programming: one thing at a time.
But an async waiter is different. They take your order, give it to the kitchen, then immediately help other customers while your food cooks. When your meal is ready, they bring it over. The waiter helps many people at once without anyone waiting longer than they need to. That's exactly how async Python works. It starts tasks and moves on to help others while waiting for results.
Traditional synchronous Python: One request blocks everything until complete.
Async Python: Thousands of operations run concurrently, with automatic task switching when waiting for data.
The result? Applications that feel lightning-fast to users while using minimal server resources.
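Here is a minimal sketch of the waiter analogy using asyncio. The function name `serve_table`, the table numbers, and the cook times are made up for illustration; `asyncio.sleep` stands in for the kitchen.

```python
import asyncio

events: list[str] = []


async def serve_table(table: int, cook_time: float) -> None:
    events.append(f"order taken: table {table}")
    # The kitchen is cooking; the waiter is free to help other tables.
    await asyncio.sleep(cook_time)
    events.append(f"food served: table {table}")


async def main() -> None:
    # One waiter (the event loop) looks after three tables at once.
    await asyncio.gather(
        serve_table(1, 0.15),
        serve_table(2, 0.05),
        serve_table(3, 0.10),
    )


asyncio.run(main())
print("\n".join(events))
```

All three orders are taken before any food is served, and the meals arrive in the order they finish cooking (tables 2, 3, 1), not the order they were placed.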

At Faxion AI, async architecture enabled us to process hundreds of AI image requests concurrently while maintaining adequate API response times. This was crucial for our user experience and cost optimization.

How Processes and Threads Work in Python

Understanding Python's concurrency model is crucial for grasping why async programming is so useful. Python offers three main approaches to concurrent execution: processes, threads, and asynchronous programming.

Processes And Threads

[Diagram: each box is a process; each line is a thread.]
In computing, a single process can contain multiple threads. A thread is a sequence of instructions that can be executed independently of other threads. A process is a collection of resources and threads.
In Python, a process is a single instance of the Python interpreter. So when you run python3, you are running a single process. The standard library's threading module lets you run multiple threads within that process.
HOWEVER! Standard CPython can't actually run multiple threads in parallel. It uses what's sometimes called pseudo-multithreading, where the interpreter switches between threads in a multitasking-type fashion.
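A quick way to see the process/thread relationship is to have several threads report their process ID and thread ID. This is only a sketch; the names `report`, `records`, and `NUM_THREADS` are illustrative.

```python
import os
import threading

NUM_THREADS = 3
records: list[tuple[int, int]] = []
records_lock = threading.Lock()
# The barrier keeps all threads alive at the same moment,
# so their thread IDs cannot be recycled between them.
barrier = threading.Barrier(NUM_THREADS)


def report() -> None:
    barrier.wait()
    with records_lock:
        records.append((os.getpid(), threading.get_ident()))


threads = [threading.Thread(target=report) for _ in range(NUM_THREADS)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

process_ids = {pid for pid, _ in records}
thread_ids = {tid for _, tid in records}
print(f"{len(process_ids)} process, {len(thread_ids)} threads")
```

Every thread reports the same process ID but a different thread ID: one box, several lines.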

The Global Interpreter Lock (GIL) Problem

The defining constraint of Python's concurrency model is the Global Interpreter Lock (GIL), a mutex that allows only one thread to execute Python bytecode at a time, even on machines with multiple cores.

A 'mutex' means 'mutual exclusion'. It is a synchronization mechanism that allows only one thread to access a resource at a time. If one thread holds the resource, other threads must wait until it is released.
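To make the idea concrete, here is a small sketch of a mutex in application code using `threading.Lock`. This is not the GIL itself, just the same mutual-exclusion idea applied to a shared counter; the class name `SafeCounter` is made up for this example.

```python
import threading


class SafeCounter:
    """A counter whose increments are protected by a mutex."""

    def __init__(self) -> None:
        self.value = 0
        self._lock = threading.Lock()

    def increment(self) -> None:
        # Only one thread at a time can be inside this block.
        with self._lock:
            self.value += 1


def add_many(counter: SafeCounter, times: int) -> None:
    for _ in range(times):
        counter.increment()


counter = SafeCounter()
threads = [
    threading.Thread(target=add_many, args=(counter, 10_000)) for _ in range(4)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(counter.value)  # 40000
```

Because the read-modify-write in `increment` happens while the lock is held, no update is lost regardless of how the threads interleave.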

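The GIL exists because CPython's memory management isn't thread-safe. It prevents race conditions inside the interpreter but creates a significant limitation: true parallel execution of CPU-bound Python code is impossible with threads.
To set a baseline for the thread examples that follow, this script fetches four slow URLs one after another, with no threads involved: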
```python
import time

import requests


def fetch_url(url: str, name: str):
    """I/O-bound task: most of the time is spent waiting on the network"""
    start = time.time()

    # Blocks until the response arrives
    response = requests.get(url)

    end = time.time()
    print(f"{name}: {response.status_code} in {end - start:.2f}s")


urls = [
    "https://httpbin.org/delay/3",
    "https://httpbin.org/delay/3",
    "https://httpbin.org/delay/3",
    "https://httpbin.org/delay/3",
]

full_start = time.time()

# Sequential baseline: no threads are created, so each request
# starts only after the previous one has finished.
# The "Thread-N" labels are kept only to line up with the threaded version.
for i, url in enumerate(urls):
    fetch_url(url, f"Thread-{i}")

full_end = time.time()
print(f"Full time: {full_end - full_start:.2f}s")
```
The result: the four requests run one after another, and the full end-to-end time is 16.18s.
```txt
Thread-0: 200 in 3.46s
Thread-1: 200 in 3.43s
Thread-2: 200 in 4.78s
Thread-3: 200 in 4.50s
Full time: 16.18s
```
Cool visualization of GIL: blogspot.com

For CPU-bound work, threads under the GIL still execute one at a time, not in parallel, so a multithreaded version can end up slower than single-threaded execution because of context-switching overhead.
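To check the GIL's effect on CPU-bound work yourself, you can time the same pure-Python loop run sequentially and then across threads. This is a rough sketch, not a benchmark: `count_squares`, the worker count, and the loop size are arbitrary choices, and the numbers will vary by machine and Python build (free-threaded builds behave differently).

```python
import threading
import time
from typing import Callable


def count_squares(n: int) -> int:
    """Pure-Python CPU-bound loop; it holds the GIL while it runs."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def timed(fn: Callable[[], None]) -> float:
    start = time.perf_counter()
    fn()
    return time.perf_counter() - start


def run_sequential(workers: int, n: int) -> None:
    for _ in range(workers):
        count_squares(n)


def run_threaded(workers: int, n: int) -> None:
    threads = [
        threading.Thread(target=count_squares, args=(n,)) for _ in range(workers)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


sequential_time = timed(lambda: run_sequential(4, 300_000))
threaded_time = timed(lambda: run_threaded(4, 300_000))
print(f"Sequential: {sequential_time:.3f}s")
print(f"Threaded:   {threaded_time:.3f}s")
```

On a standard CPython build, expect the two timings to land close together, since only one thread can execute bytecode at any moment.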

When Threads Actually Work: I/O-Bound Operations

Threads become useful for I/O-bound (input/output) work because the GIL is released while a thread waits on I/O:
```python
import threading
import time

import requests


def fetch_url(url: str, name: str):
    """I/O-bound task - GIL is released during network calls"""
    start = time.time()

    # GIL is released during this network call
    response = requests.get(url)

    end = time.time()
    print(f"{name}: {response.status_code} in {end - start:.2f}s")


# Dummy URLs to simulate I/O-bound tasks.
# Each URL will delay for 3 seconds.
urls = [
    "https://httpbin.org/delay/3",
    "https://httpbin.org/delay/3",
    "https://httpbin.org/delay/3",
    "https://httpbin.org/delay/3",
]

# These WILL run concurrently because GIL is released during I/O
threads: list[threading.Thread] = []

full_start = time.time()

for i, url in enumerate(urls):
    thread = threading.Thread(target=fetch_url, args=(url, f"Thread-{i}"))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

full_end = time.time()
print(f"Full time: {full_end - full_start:.2f}s")
```
Result: all requests run concurrently and finish at different times. The full end-to-end time is 4.17s, compared with 16.18s in the previous example.
```txt
Thread-2: 200 in 3.52s
Thread-0: 200 in 3.81s
Thread-1: 200 in 4.00s
Thread-3: 200 in 4.17s
Full time: 4.17s
```
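If you would rather not manage `Thread` objects by hand, the standard library's `concurrent.futures.ThreadPoolExecutor` offers the same concurrency with less bookkeeping. The sketch below swaps the network call for `time.sleep` so it runs offline; `fake_fetch` and the delay values are stand-ins, not part of the example above.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_fetch(delay: float) -> float:
    """Stands in for a blocking network call; sleeping releases the GIL."""
    time.sleep(delay)
    return delay


delays = [0.2, 0.2, 0.2, 0.2]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    # map() returns results in the same order as the inputs
    results = list(pool.map(fake_fetch, delays))
elapsed = time.perf_counter() - start

print(f"Fetched {len(results)} items in {elapsed:.2f}s")
```

With four workers, the total time is close to the longest single delay rather than the sum of all four.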

True Parallelism with Multiprocessing

For CPU-bound tasks, multiprocessing bypasses the GIL by creating separate Python interpreter processes:
```python
import multiprocessing
import time


def cpu_intensive_task(name: str):
    """CPU-bound task that can run in parallel with multiprocessing"""
    start = time.time()

    # This WILL run in parallel across processes
    total = 0
    for i in range(10_000_000):
        total += i * i

    end = time.time()
    print(f"{name}: {end - start:.2f} seconds")
    return total


if __name__ == "__main__":
    # This WILL run in parallel on multiple CPU cores
    with multiprocessing.Pool(processes=4) as pool:
        tasks = [f"Process-{i}" for i in range(4)]
        results = pool.map(cpu_intensive_task, tasks)
```
Benefits of multiprocessing:
  • True parallel execution on multiple CPU cores
  • Each process has its own memory space and GIL
  • Ideal for CPU-intensive computations
Drawbacks of multiprocessing:
  • High memory overhead (each process is a full Python interpreter)
  • Inter-process communication requires serialization
  • Slower startup time
  • More complex error handling
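The serialization drawback is easy to probe: arguments and return values sent between processes are pickled, so anything that cannot be pickled cannot cross the boundary. A small sketch, where `can_cross_process_boundary` is a helper name invented for this example:

```python
import pickle


def can_cross_process_boundary(obj) -> bool:
    """Return True if obj survives a pickle round trip."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# Plain data such as strings and numbers pickles fine.
plain_ok = can_cross_process_boundary(("Process-0", 10_000_000))

# A lambda has no importable name, so the default pickler rejects it.
lambda_ok = can_cross_process_boundary(lambda x: x * x)

print(plain_ok, lambda_ok)  # True False
```

This is why the multiprocessing example uses a module-level function as the worker: the pool has to look it up by name inside each child process.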

When Async Python Transforms Your Application

1. API Endpoints That Don't Block

FastAPI's async capabilities shine when you need to make external API calls without blocking other requests:
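```python
import asyncio
import uuid

from fastapi import FastAPI

app = FastAPI()

# In-memory job state; a stand-in for a real database or cache
job_status: dict[str, str] = {}
# Hold references so running tasks are not garbage collected
background_tasks: set[asyncio.Task] = set()


@app.post("/process-image")
async def process_image(image_url: str):
    # Start the AI processing (this takes 30+ seconds)
    task_id = await start_ai_processing(image_url)

    # Return immediately - don't make user wait
    return {"task_id": task_id, "status": "processing"}


@app.get("/status/{task_id}")
async def check_status(task_id: str):
    # Quick status check - returns in milliseconds
    status = await get_processing_status(task_id)
    return {"task_id": task_id, "status": status}


async def start_ai_processing(image_url: str) -> str:
    """Start background AI processing without blocking the API"""
    task_id = uuid.uuid4().hex
    job_status[task_id] = "processing"

    # Schedule the work on the event loop; the API response is sent
    # while this task keeps running in the background
    task = asyncio.create_task(process_image_with_ai(task_id, image_url))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

    return task_id


async def get_processing_status(task_id: str) -> str:
    return job_status.get(task_id, "unknown")


async def process_image_with_ai(task_id: str, image_url: str) -> None:
    """Placeholder for the real model call"""
    await asyncio.sleep(30)
    job_status[task_id] = "done"
```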
This pattern is everywhere in modern APIs:
  • Image/Video processing (Midjourney, DALL-E style workflows)
  • Data analysis jobs (Report generation, ML training)
  • Email/notification sending (Newsletter dispatching)

This is exactly how we built Faxion AI's image generation pipeline. Users get instant feedback while AI models work in the background.

2. The AI API Polling Pattern

Many AI services (OpenAI, Replicate, Stability AI) offer submit-then-poll endpoints for long-running jobs, which maps naturally onto this async pattern:
```python
import asyncio

from httpx import AsyncClient

API_URL = "https://api.replicate.com/v1/predictions"


class AIImageService:
    def __init__(self):
        # Real calls also need an auth header; see the provider's docs
        self.client = AsyncClient()
        self.processing_jobs: dict[str, dict] = {}
        # Keep references so polling tasks are not garbage collected
        self._tasks: set[asyncio.Task] = set()

    async def submit_image_generation(self, prompt: str) -> str:
        """Submit job and return immediately"""
        # Call external AI service
        response = await self.client.post(
            API_URL,
            json={
                "version": "stable-diffusion-xl",  # placeholder model version
                "input": {"prompt": prompt},
            },
        )
        response.raise_for_status()

        job_id = response.json()["id"]
        self.processing_jobs[job_id] = {"status": "processing"}

        # Start background polling - don't wait for result
        task = asyncio.create_task(self._poll_for_completion(job_id))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

        return job_id

    async def _poll_for_completion(self, job_id: str):
        """Background task that polls AI service until complete"""
        while True:
            status_response = await self.client.get(f"{API_URL}/{job_id}")
            data = status_response.json()

            if data["status"] == "succeeded":
                # Save result to database/cache
                await self._save_result(job_id, data["output"])
                break
            elif data["status"] in ("failed", "canceled"):
                await self._handle_failure(job_id, data.get("error"))
                break

            # Wait before next poll - don't spam the API
            await asyncio.sleep(5)

    async def _save_result(self, job_id: str, output) -> None:
        # Stand-in for a database/cache write
        self.processing_jobs[job_id] = {"status": "succeeded", "output": output}

    async def _handle_failure(self, job_id: str, error) -> None:
        self.processing_jobs[job_id] = {"status": "failed", "error": error}
```
Real-world benefits:
  • User experience: Instant API responses instead of 30+ second waits
  • Resource efficiency: One server handles hundreds of concurrent jobs
  • Error resilience: Failed jobs don't crash your entire API
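One refinement worth considering for any polling loop is a hard deadline, so a job that never reaches a terminal state cannot keep a task alive forever. Below is a hedged sketch using `asyncio.wait_for`. The helper `poll_until_done`, the `TERMINAL_STATES` set, and the fake status functions are illustrative and not tied to any provider's API.

```python
import asyncio
from typing import Awaitable, Callable

TERMINAL_STATES = {"succeeded", "failed", "canceled"}


async def poll_until_done(
    check_status: Callable[[], Awaitable[str]],
    interval: float = 5.0,
    timeout: float = 300.0,
) -> str:
    """Poll check_status until a terminal state or until timeout expires."""

    async def _loop() -> str:
        while True:
            status = await check_status()
            if status in TERMINAL_STATES:
                return status
            await asyncio.sleep(interval)

    # wait_for cancels the loop and raises asyncio.TimeoutError on timeout
    return await asyncio.wait_for(_loop(), timeout=timeout)


# Demo 1: a fake job that succeeds on the third poll
_responses = iter(["starting", "processing", "succeeded"])


async def fake_status() -> str:
    return next(_responses)


final_status = asyncio.run(poll_until_done(fake_status, interval=0.01, timeout=1.0))


# Demo 2: a fake job that never finishes hits the deadline
async def stuck_status() -> str:
    return "processing"


try:
    asyncio.run(poll_until_done(stuck_status, interval=0.01, timeout=0.05))
    timed_out = False
except asyncio.TimeoutError:
    timed_out = True

print(final_status, timed_out)  # succeeded True
```

In a real service you would pick the interval and timeout based on how long jobs typically take and on the provider's rate limits.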

Check out our FastAPI article for more patterns on building production APIs with async capabilities.

3. Database Operations That Scale

Async database operations prevent one slow query from blocking your entire application:
```python
import asyncio
import os
from typing import Optional

import asyncpg
from fastapi import FastAPI, HTTPException

# Placeholder default; point this at your own database
DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://localhost/app")


class AsyncUserService:
    def __init__(self, database_url: str):
        self.database_url = database_url
        self.pool = None

    async def initialize(self):
        """Create connection pool on startup"""
        self.pool = await asyncpg.create_pool(self.database_url)

    async def get_user_dashboard_data(self, user_id: str) -> Optional[dict]:
        """Fetch multiple data sources concurrently"""
        # A single asyncpg connection runs one query at a time, so each
        # query goes through the pool and gets its own connection.
        user_data, order_history, preferences = await asyncio.gather(
            self.pool.fetchrow("SELECT * FROM users WHERE id = $1", user_id),
            self.pool.fetch("SELECT * FROM orders WHERE user_id = $1 LIMIT 10", user_id),
            self.pool.fetchrow("SELECT * FROM user_preferences WHERE user_id = $1", user_id),
        )

        if user_data is None:
            return None

        return {
            "user": dict(user_data),
            "recent_orders": [dict(order) for order in order_history],
            "preferences": dict(preferences) if preferences else {},
        }


# FastAPI integration
app = FastAPI()
service = AsyncUserService(DATABASE_URL)


@app.on_event("startup")
async def startup_event():
    # Create the pool once, not on every request
    await service.initialize()


@app.get("/dashboard/{user_id}")
async def user_dashboard(user_id: str):
    # This could take 300ms instead of 900ms with sync queries
    data = await service.get_user_dashboard_data(user_id)
    if data is None:
        raise HTTPException(status_code=404, detail="User not found")
    return data
```
Performance impact:
  • Sequential queries: 300ms + 300ms + 300ms = 900ms
  • Concurrent queries: max(300ms, 300ms, 300ms) = 300ms
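The sum-versus-max arithmetic can be checked without a database. In this sketch, `fake_query` is a stand-in that sleeps instead of hitting Postgres, and the 0.1s delay is arbitrary.

```python
import asyncio
import time

QUERY_NAMES = ["user", "orders", "preferences"]


async def fake_query(name: str, delay: float = 0.1) -> str:
    """Stands in for a database round trip."""
    await asyncio.sleep(delay)
    return name


async def run_sequential() -> tuple[list[str], float]:
    start = time.perf_counter()
    rows = [await fake_query(name) for name in QUERY_NAMES]
    return rows, time.perf_counter() - start


async def run_concurrent() -> tuple[list[str], float]:
    start = time.perf_counter()
    rows = await asyncio.gather(*(fake_query(name) for name in QUERY_NAMES))
    return list(rows), time.perf_counter() - start


sequential_rows, sequential_time = asyncio.run(run_sequential())
concurrent_rows, concurrent_time = asyncio.run(run_concurrent())

print(f"Sequential: {sequential_time:.2f}s")
print(f"Concurrent: {concurrent_time:.2f}s")
```

Both versions return the same rows in the same order; only the wall-clock time differs, with the sequential run taking roughly the sum of the delays and the concurrent run roughly the longest one.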

Async Patterns That Actually Matter in Production

Background Task Processing

```python
import asyncio
import logging
from typing import Callable

from fastapi import FastAPI

app = FastAPI()


class BackgroundTaskProcessor:
    def __init__(self):
        self.task_queue = asyncio.Queue()
        self.running = False

    async def start_worker(self):
        """Start background worker that processes tasks"""
        self.running = True

        while self.running:
            # Get next task from queue
            task_func, args, kwargs = await self.task_queue.get()
            try:
                # Execute task
                await task_func(*args, **kwargs)
            except Exception:
                logging.exception("Background task failed")
            finally:
                # Always mark the task as done, even on failure,
                # otherwise shutdown() would wait on join() forever
                self.task_queue.task_done()

    async def add_task(self, func: Callable, *args, **kwargs):
        """Add task to background processing queue"""
        await self.task_queue.put((func, args, kwargs))

    async def shutdown(self):
        """Graceful shutdown - wait for queued tasks to complete"""
        await self.task_queue.join()
        self.running = False


# Usage in FastAPI
task_processor = BackgroundTaskProcessor()
worker_task = None


@app.on_event("startup")
async def startup_event():
    global worker_task
    # Keep a reference so the worker task is not garbage collected
    worker_task = asyncio.create_task(task_processor.start_worker())


@app.on_event("shutdown")
async def shutdown_event():
    await task_processor.shutdown()
    if worker_task:
        # The worker is idle in queue.get() by now, so cancel it
        worker_task.cancel()


@app.post("/send-newsletter")
async def send_newsletter(email_list: list[str]):
    # Add to background processing - return immediately
    await task_processor.add_task(send_email_batch, email_list)

    return {"status": "Newsletter queued for sending"}


async def send_email_batch(email_list: list[str]):
    """This runs in background without blocking API"""
    for email in email_list:
        await send_single_email(email)
        # Small delay to avoid rate limits
        await asyncio.sleep(0.1)


async def send_single_email(email: str):
    """Placeholder for a real email client call"""
    logging.info("Sending to %s", email)
```

Common Async Pitfalls (And How to Avoid Them)

Blocking Operations in Async Functions

```python
import aiofiles


# ❌ Wrong - blocks the entire event loop
async def bad_file_processing():
    with open("huge_file.txt") as f:  # Synchronous I/O - blocks everything
        content = f.read()
    return process_content(content)


# ✅ Right - use async file operations
async def good_file_processing():
    async with aiofiles.open("huge_file.txt") as f:
        content = await f.read()
    return await process_content_async(content)
```
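When no async version of a library exists, another option is to push the blocking call onto a worker thread with `asyncio.to_thread` (Python 3.9+). The sketch below is an illustration under made-up names: `blocking_read` simulates slow synchronous I/O with `time.sleep`, and `heartbeat` shows that the event loop keeps running in the meantime.

```python
import asyncio
import time


def blocking_read() -> str:
    """Stands in for slow, synchronous I/O."""
    time.sleep(0.2)
    return "content"


async def heartbeat(ticks: list[int]) -> None:
    # Keeps ticking only if the event loop is not blocked
    while True:
        ticks.append(1)
        await asyncio.sleep(0.01)


async def main() -> tuple[str, int]:
    ticks: list[int] = []
    beat = asyncio.create_task(heartbeat(ticks))

    # The blocking call runs in a worker thread; the loop stays responsive
    content = await asyncio.to_thread(blocking_read)

    beat.cancel()
    return content, len(ticks)


content, tick_count = asyncio.run(main())
print(f"Read {content!r} while the loop ticked {tick_count} times")
```

If `blocking_read()` were called directly inside `main`, the heartbeat would not tick at all until the call returned.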

Not Properly Awaiting Tasks

```python
import asyncio

# Module-level set that keeps strong references to running tasks
background_tasks: set[asyncio.Task] = set()


# ❌ Wrong - creates task but doesn't wait for completion
async def incomplete_processing():
    asyncio.create_task(important_background_work())
    return "Done"  # But background work might still be running!


# ✅ Right - either await or properly manage task lifecycle
async def complete_processing():
    task = asyncio.create_task(important_background_work())
    # Store task reference so it doesn't get garbage collected
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return "Processing started"
```
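A related trap is that exceptions raised inside fire-and-forget tasks are easy to miss. One possible approach, sketched here with invented names (`spawn`, `_on_done`, `flaky_job`), is to record failures in the same done callback that releases the task reference.

```python
import asyncio

background_tasks: set[asyncio.Task] = set()
errors: list[str] = []


def _on_done(task: asyncio.Task) -> None:
    background_tasks.discard(task)
    # Retrieve the exception so the failure is recorded, not lost
    if not task.cancelled() and task.exception() is not None:
        errors.append(repr(task.exception()))


def spawn(coro) -> asyncio.Task:
    """Create a background task, hold a reference, and watch for failure."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(_on_done)
    return task


async def flaky_job() -> None:
    await asyncio.sleep(0.01)
    raise RuntimeError("job failed")


async def main() -> None:
    spawn(flaky_job())
    # Drain outstanding tasks before exiting, e.g. in a shutdown hook
    await asyncio.gather(*background_tasks, return_exceptions=True)
    await asyncio.sleep(0)  # give done callbacks a chance to run


asyncio.run(main())
print(errors)
```

In production you would likely send these failures to your logger or error tracker instead of a list.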

Next Steps

  • Start small: Convert one synchronous endpoint to async
  • Add background processing: Implement task queues for heavy operations

Further Reading

  • FastAPI: Build your own APIs. Production experience with FastAPI with code examples. By Xavier Collantes, 5/30/2025.
  • Network Debugging: Ports to Packet Traces. My scripts and tools for diagnosing connectivity issues. By Xavier Collantes, 9/24/2025.