Introduction to Race Conditions
Race conditions occur when the behavior of software depends on the timing or sequence of uncontrollable events. In web applications, these arise when multiple requests access shared resources without proper synchronization, leading to unexpected and exploitable behavior.
Time-of-Check to Time-of-Use (TOCTOU)
TOCTOU vulnerabilities occur when there's a gap between checking a condition and using the result:
# VULNERABLE: Gap between check and use
def withdraw(user_id, amount):
balance = get_balance(user_id)
if balance >= amount:
# Window of vulnerability here!
new_balance = balance - amount
set_balance(user_id, new_balance)
return {"success": True}
return {"success": False}Attack scenario: Two requests check the $100 balance simultaneously, both see sufficient funds, and both withdraw $80—draining $160 from a $100 account.
Secure Implementation
# Solution 1: Database-level locking
def withdraw_secure(user_id, amount):
with db.transaction():
balance = db.execute(
"SELECT balance FROM accounts WHERE user_id = %s FOR UPDATE",
[user_id]
).fetchone()['balance']
if balance < amount:
raise InsufficientFunds()
db.execute(
"UPDATE accounts SET balance = balance - %s WHERE user_id = %s",
[amount, user_id]
)
# Solution 2: Atomic conditional update
def withdraw_atomic(user_id, amount):
result = db.execute("""
UPDATE accounts
SET balance = balance - %s
WHERE user_id = %s AND balance >= %s
RETURNING balance
""", [amount, user_id, amount])
if result.rowcount == 0:
raise InsufficientFunds()Business Logic Race Conditions
Double Spending in E-Commerce
# VULNERABLE: Coupon can be used multiple times
def apply_coupon(cart_id, coupon_code):
coupon = get_coupon(coupon_code)
if coupon.used:
raise CouponAlreadyUsed()
# Race window: multiple requests pass the check
cart.discount = coupon.discount_amount
coupon.used = True
save_coupon(coupon)
# SECURE: Atomic coupon redemption
def apply_coupon_secure(cart_id, coupon_code):
with db.transaction():
result = db.execute("""
UPDATE coupons
SET used = TRUE, used_at = NOW(), used_by_cart = %s
WHERE code = %s AND used = FALSE AND expires_at > NOW()
RETURNING discount_amount
""", [cart_id, coupon_code])
if result.rowcount == 0:
raise InvalidCoupon("Coupon invalid or already used")
discount = result.fetchone()['discount_amount']
db.execute("UPDATE carts SET discount = %s WHERE id = %s", [discount, cart_id])Inventory Overselling
# SECURE: Atomic inventory decrement
def purchase_item_secure(user_id, item_id, quantity):
with db.transaction():
result = db.execute("""
UPDATE inventory
SET stock = stock - %s
WHERE item_id = %s AND stock >= %s
RETURNING stock
""", [quantity, item_id, quantity])
if result.rowcount == 0:
raise OutOfStock()
db.execute("""
INSERT INTO orders (user_id, item_id, quantity, status)
VALUES (%s, %s, %s, 'confirmed')
""", [user_id, item_id, quantity])Distributed Locking
For distributed systems without shared database connections:
import redis
from contextlib import contextmanager
class DistributedLock:
def __init__(self, redis_client):
self.redis = redis_client
@contextmanager
def lock(self, resource_id, ttl_seconds=30):
lock_key = f"lock:{resource_id}"
lock_value = secrets.token_urlsafe(16)
acquired = self.redis.set(lock_key, lock_value, nx=True, ex=ttl_seconds)
if not acquired:
raise LockNotAcquired(f"Resource {resource_id} is locked")
try:
yield
finally:
self.redis.eval("""
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
end
""", 1, lock_key, lock_value)
# Usage
def process_order(order_id):
with lock.lock(f"order:{order_id}"):
order = get_order(order_id)
if order.status != 'pending':
return
process_payment(order)
order.status = 'completed'Idempotency for API Safety
class IdempotencyHandler:
def __init__(self, redis_client):
self.redis = redis_client
def process_request(self, idempotency_key, handler_func):
cache_key = f"idempotency:{idempotency_key}"
cached = self.redis.get(cache_key)
if cached:
return json.loads(cached)
with self.redis.lock(f"idempotency_lock:{idempotency_key}", timeout=30):
cached = self.redis.get(cache_key)
if cached:
return json.loads(cached)
result = handler_func()
self.redis.setex(cache_key, 86400, json.dumps(result))
return resultTesting for Race Conditions
import asyncio
import aiohttp
async def race_condition_test(url, data, num_requests=50):
"""Send parallel requests to test for race conditions."""
async with aiohttp.ClientSession() as session:
tasks = [session.post(url, json=data) for _ in range(num_requests)]
responses = await asyncio.gather(*tasks, return_exceptions=True)
success_count = sum(
1 for r in responses
if not isinstance(r, Exception) and r.status == 200
)
if success_count > 1:
print("VULNERABLE: Race condition detected!")
return responses
# Test coupon redemption
asyncio.run(race_condition_test(
"https://shop.example.com/api/redeem-coupon",
{"coupon_code": "DISCOUNT50", "cart_id": "12345"}
))Defense Patterns Summary
| Pattern | Use Case | Trade-offs |
|---|---|---|
Database FOR UPDATE |
Single database, row-level locks | Can cause lock contention |
| Atomic Updates | Counter/balance operations | Limited to simple operations |
| Distributed Locks | Microservices, multi-DB | Adds complexity |
| Unique Constraints | One-time operations | Database-specific syntax |
| Idempotency Keys | API operations | Requires client cooperation |
Automated Race Condition Detection
Race conditions are difficult to find through manual testing because they depend on precise timing. Traditional security scanners often miss these vulnerabilities since they test endpoints sequentially.
RedVeil's AI-powered penetration testing platform identifies race condition vulnerabilities by analyzing how your application handles concurrent requests, testing for double-spending scenarios, and validating that proper synchronization mechanisms are in place.
Start testing your application for race conditions with RedVeil →