By Xavier Collantes
9/3/2025
In this article, I may say 'Python and parallel execution', but to be precise: the standard CPython interpreter cannot run multiple threads of Python bytecode in parallel. Its Global Interpreter Lock (GIL) allows only one thread to execute bytecode at a time, so threads run in pseudo-parallel, with the interpreter switching back and forth between them.
At Faxion AI, async architecture enabled us to process hundreds of AI image requests concurrently while maintaining adequate API response times. This was crucial for our user experience and cost optimization.
When you run `python3`, you are running a single process, and some libraries enable you to run multiple threads within that single process. A 'mutex' means 'mutual exclusion'. It is a synchronization primitive that allows only one thread to access a resource at a time: while one thread holds the mutex for a resource, other threads must wait until it is released.
import time
import requests


def fetch_url(url: str, name: str):
    """Fetch one URL and print its status code and elapsed time.

    The GIL would be released during the network call, but that only
    matters when multiple threads exist - this script never creates any.
    """
    start = time.time()
    response = requests.get(url)
    end = time.time()
    print(f"{name}: {response.status_code} in {end - start:.2f}s")


# Dummy URLs to simulate I/O-bound tasks; each delays ~3 seconds.
urls = [
    "https://httpbin.org/delay/3",
    "https://httpbin.org/delay/3",
    "https://httpbin.org/delay/3",
    "https://httpbin.org/delay/3",
]

full_start = time.time()

# Fix: the original comment claimed these "WILL run concurrently" and
# declared an unused `threads` list, but fetch_url is invoked directly
# on the main thread - the requests run strictly one after another, so
# the total time is roughly the sum of all four delays.
for i, url in enumerate(urls):
    fetch_url(url, f"Thread-{i}")

full_end = time.time()
print(f"Full time: {full_end - full_start:.2f}s")
1Thread-0: 200 in 3.46s
2Thread-1: 200 in 3.43s
3Thread-2: 200 in 4.78s
4Thread-3: 200 in 4.50s
5Full time: 16.18s
6
Despite the "Thread-N" labels, no threads are ever started here — each request runs sequentially on the main thread, so the total time is roughly the sum of the individual request times.
import threading
import requests
import time


def fetch_url(url: str, name: str):
    """I/O-bound worker: the GIL is dropped while waiting on the network."""
    started = time.time()
    # requests releases the GIL during the socket wait, letting the
    # other threads make progress at the same time.
    response = requests.get(url)
    finished = time.time()
    print(f"{name}: {response.status_code} in {finished - started:.2f}s")


# Dummy URLs to simulate I/O-bound tasks.
# Each URL will delay for 3 seconds.
urls = [
    "https://httpbin.org/delay/3",
    "https://httpbin.org/delay/3",
    "https://httpbin.org/delay/3",
    "https://httpbin.org/delay/3",
]

full_start = time.time()

# These run concurrently: all four threads are started, then joined,
# so the wall-clock time is roughly one request's latency rather than
# the sum of all four.
threads: list[threading.Thread] = [
    threading.Thread(target=fetch_url, args=[url, f"Thread-{i}"])
    for i, url in enumerate(urls)
]
for worker in threads:
    worker.start()
for worker in threads:
    worker.join()

full_end = time.time()
print(f"Full time: {full_end - full_start:.2f}s")
1Thread-2: 200 in 3.52s
2Thread-0: 200 in 3.81s
3Thread-1: 200 in 4.00s
4Thread-3: 200 in 4.17s
5Full time: 4.17s
6
import multiprocessing
import time


def cpu_intensive_task(name: str, iterations: int = 10_000_000) -> int:
    """CPU-bound task: sum of squares over ``iterations`` numbers.

    Pure-Python arithmetic holds the GIL, so threads cannot parallelize
    it - but each process has its own interpreter and its own GIL, so a
    process pool achieves true parallelism across cores.

    Args:
        name: Label used in the timing printout.
        iterations: Work size. Parameterized (was hard-coded) so callers
            and tests can scale the load; the default preserves the
            original behavior.

    Returns:
        The computed total: sum of i*i for i in range(iterations).
    """
    start = time.time()
    total = 0
    for i in range(iterations):
        total += i * i
    end = time.time()
    print(f"{name}: {end - start:.2f} seconds")
    return total


if __name__ == "__main__":
    # Four worker processes run the task in parallel on separate CPU cores.
    with multiprocessing.Pool(processes=4) as pool:
        tasks = [f"Process-{i}" for i in range(4)]
        results = pool.map(cpu_intensive_task, tasks)
from fastapi import FastAPI
from httpx import AsyncClient
import asyncio

app = FastAPI()

# Keep strong references to fire-and-forget tasks: the event loop only
# holds a weak reference to tasks, so an unreferenced task can be
# garbage-collected mid-flight (the exact pitfall covered later in this
# article).
_background_tasks: set[asyncio.Task] = set()


@app.post("/process-image")
async def process_image(image_url: str):
    """Kick off long-running AI processing (30+ seconds) and return at once."""
    task_id = await start_ai_processing(image_url)
    # Return immediately - don't make the user wait for the model.
    return {"task_id": task_id, "status": "processing"}


@app.get("/status/{task_id}")
async def check_status(task_id: str):
    """Lightweight poll endpoint - returns in milliseconds."""
    status = await get_processing_status(task_id)
    return {"task_id": task_id, "status": status}


async def start_ai_processing(image_url: str) -> str:
    """Start background AI processing without blocking the API response."""
    task_id = generate_unique_id()
    # Fix: the original dropped the task reference immediately; store it
    # so the background job cannot be garbage-collected before finishing.
    task = asyncio.create_task(process_image_with_ai(task_id, image_url))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task_id
This is exactly how we built Faxion AI's image generation pipeline. Users get instant feedback while AI models work in the background.
import asyncio
from httpx import AsyncClient
from typing import Optional


class AIImageService:
    """Submit image-generation jobs to an external AI API and poll them."""

    def __init__(self):
        self.client = AsyncClient()
        # job_id -> background polling task. Fix: the original declared
        # this dict but never used it; holding the task here keeps a
        # strong reference so the poller can't be garbage-collected
        # mid-flight, and lets callers inspect in-progress jobs.
        self.processing_jobs = {}

    async def submit_image_generation(self, prompt: str) -> str:
        """Submit a job and return the provider's job id immediately."""
        # Call external AI service
        response = await self.client.post(
            "https://api.replicate.com/v1/predictions",
            json={
                "version": "stable-diffusion-xl",
                "input": {"prompt": prompt}
            }
        )
        job_id = response.json()["id"]

        # Start background polling - don't wait for the result here.
        task = asyncio.create_task(self._poll_for_completion(job_id))
        self.processing_jobs[job_id] = task
        # Drop the bookkeeping entry automatically once polling ends.
        task.add_done_callback(
            lambda _t: self.processing_jobs.pop(job_id, None)
        )
        return job_id

    async def _poll_for_completion(self, job_id: str):
        """Background task that polls the AI service until it finishes."""
        while True:
            status_response = await self.client.get(
                f"https://api.replicate.com/v1/predictions/{job_id}"
            )
            data = status_response.json()

            if data["status"] == "succeeded":
                # Save result to database/cache
                await self._save_result(job_id, data["output"])
                break
            elif data["status"] == "failed":
                await self._handle_failure(job_id, data["error"])
                break

            # Wait before next poll - don't spam the API
            await asyncio.sleep(5)
Check out our FastAPI article for more patterns on building production APIs with async capabilities.
import asyncpg
import asyncio
from contextlib import asynccontextmanager


class AsyncUserService:
    """Read-side user service backed by an asyncpg connection pool."""

    def __init__(self, database_url: str):
        self.database_url = database_url
        self.pool = None  # created later by initialize()

    async def initialize(self):
        """Create the connection pool; call once on application startup."""
        self.pool = await asyncpg.create_pool(self.database_url)

    async def get_user_dashboard_data(self, user_id: str) -> dict:
        """Fetch user row, recent orders, and preferences concurrently."""
        async with self.pool.acquire() as conn:
            # Fan the three queries out at once instead of one-by-one.
            pending = asyncio.gather(
                conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id),
                conn.fetch("SELECT * FROM orders WHERE user_id = $1 LIMIT 10", user_id),
                conn.fetchrow("SELECT * FROM user_preferences WHERE user_id = $1", user_id),
            )
            user_row, orders, prefs = await pending

            return {
                "user": dict(user_row),
                "recent_orders": [dict(row) for row in orders],
                "preferences": dict(prefs) if prefs else {},
            }
# FastAPI integration
@app.get("/dashboard/{user_id}")
async def user_dashboard(user_id: str):
    """Dashboard endpoint: three queries fan out concurrently (~300ms vs ~900ms)."""
    service = AsyncUserService(DATABASE_URL)
    # Bug fix: the pool must exist before any query runs - without this
    # await, service.pool is None and .acquire() raises AttributeError.
    # (In production, create and initialize one service at startup
    # instead of building a pool per request.)
    await service.initialize()

    # This could take 300ms instead of 900ms with sync queries
    data = await service.get_user_dashboard_data(user_id)
    return data
import asyncio
from typing import Callable, Any
import logging


class BackgroundTaskProcessor:
    """In-process async task queue drained by a single worker coroutine."""

    def __init__(self):
        self.task_queue = asyncio.Queue()
        self.running = False

    async def start_worker(self):
        """Run the background worker loop until shutdown() flips `running`."""
        self.running = True

        while self.running:
            try:
                # Get next task from queue
                task_func, args, kwargs = await self.task_queue.get()
                try:
                    # Execute task
                    await task_func(*args, **kwargs)
                finally:
                    # Bug fix: always mark the item done, even when the
                    # task raises - otherwise queue.join() in shutdown()
                    # waits forever on the failed item.
                    self.task_queue.task_done()

            except Exception as e:
                logging.error(f"Background task failed: {e}")

    async def add_task(self, func: Callable, *args, **kwargs):
        """Add a task (coroutine function + arguments) to the queue."""
        await self.task_queue.put((func, args, kwargs))

    async def shutdown(self):
        """Graceful shutdown - wait for all queued tasks to complete."""
        self.running = False
        await self.task_queue.join()
# Usage in FastAPI
task_processor = BackgroundTaskProcessor()


@app.on_event("startup")
async def startup_event():
    # Spin the queue worker up alongside the web server.
    asyncio.create_task(task_processor.start_worker())


@app.post("/send-newsletter")
async def send_newsletter(email_list: list[str]):
    # Queue the batch and return immediately - no waiting on delivery.
    await task_processor.add_task(send_email_batch, email_list)
    return {"status": "Newsletter queued for sending"}


async def send_email_batch(email_list: list[str]):
    """Runs in the background without blocking API responses."""
    for recipient in email_list:
        await send_single_email(recipient)
        # Brief pause between sends to stay under provider rate limits.
        await asyncio.sleep(0.1)
# ❌ Wrong - blocks the entire event loop
async def bad_file_processing():
    # Plain open()/read() are synchronous: every other coroutine on the
    # loop stalls until this file is fully read.
    with open("huge_file.txt") as f:
        data = f.read()
    return process_content(data)

# ✅ Right - use async file operations
import aiofiles

async def good_file_processing():
    # aiofiles hands the blocking reads to a thread pool, so the event
    # loop keeps serving other coroutines while the file is read.
    async with aiofiles.open("huge_file.txt") as f:
        data = await f.read()
    return await process_content_async(data)
# ❌ Wrong - creates task but doesn't wait for completion
async def incomplete_processing():
    asyncio.create_task(important_background_work())
    return "Done"  # But background work might still be running!

# ✅ Right - either await or properly manage task lifecycle
async def complete_processing():
    task = asyncio.create_task(important_background_work())
    # Hold a strong reference so the task can't be garbage collected,
    # and drop it automatically once the task finishes.
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return "Processing started"
Related by topics: