Write-heavy
When to reach for this
Reach for this when…
- Write QPS is 10K+ per region and climbing
- Writes are append-only in shape (events, logs, metrics, timeseries, clicks)
- Reads are aggregate or historical rather than point-lookup on the latest write
- A single OLTP primary would blow up under the write load
- The interviewer mentions "ingestion rate", "fan-in", or "clickstream"
Not really this pattern when…
- Writes are low-volume but high-value with strict transactional guarantees (double-entry accounting)
- Every write must be strongly consistent with every read immediately (inventory reservations)
- Read QPS dwarfs write QPS by 10:1 or more (that is the read-heavy pattern)
- The write volume fits comfortably on a single well-tuned Postgres primary
Good vs bad answer
Interviewer probe
“You are receiving 200K ad click events per second. How do you store and aggregate them for real-time dashboards and end-of-day reconciliation?”
Weak answer
"Shard Postgres into 20 shards at 10K writes each. Run aggregate queries directly on the shards."
Strong answer
"Postgres is not the right shape for 200K append-only events per second. Click events go to Kafka first — append-only, partitioned by advertiser_id for locality, with 7-day retention. From there, three consumer groups:
1. A Flink job does windowed aggregation (1-minute tumbling windows) and writes pre-computed counts to Druid for real-time dashboards. Dashboard queries hit Druid, not the raw stream.
2. A second consumer writes raw events to S3 in Parquet format, partitioned by date and advertiser. This is the reconciliation dataset — immutable, auditable, queryable via Athena or Spark.
3. A third consumer feeds a fraud-detection pipeline that looks for click patterns (same IP, rapid-fire clicks on one ad) in near-real-time.
Reads never touch Kafka. The dashboards read Druid (sub-second aggregates), reconciliation reads S3 (batch), and fraud reads a streaming processor. If I need a new query shape, I add a consumer and replay the Kafka topic — no migration, no schema change across 20 shards."
Why it wins: Names the log (Kafka), the stream processor (Flink), the real-time OLAP store (Druid), the cold archive (S3 Parquet), and the replay story. This is a purpose-built write-heavy pipeline, not a scaled-up OLTP design.
Cheat sheet
- Write-heavy starts at ~10K writes/sec per region — below that, tune one primary.
- Log first (Kafka/Kinesis). Everything downstream derives from the log.
- LSM storage (Cassandra, ScyllaDB, RocksDB) for write-optimised persistence.
- Materialised views are disposable — replay the log to rebuild or reshape.
- Write amplification is the hidden multiplier: 1 logical write = N physical writes (WAL + indexes + replication).
- Partition on high-cardinality key; monitor per-partition write rates and lag.
- Sharded counters (N sub-keys) for any entity receiving disproportionate writes.
- Batch at every tier: client linger, app micro-batch, DB group commit. They compound.
- At-least-once + idempotent consumers > exactly-once semantics you cannot operate.
- DLQ every consumer. Alert on its depth. Never silently drop poison messages.
- Consumer lag is your primary SLO metric, not database latency.
- Hierarchical aggregation reduces fan-in by 100–1000x before touching durable storage.
Core concept
Write-heavy designs invert the usual OLTP assumption that writes are the minority. In a write-heavy system, the write path is the critical path, and every architectural decision flows from a single question: how do I absorb N writes per second without losing data, without blocking reads, and without melting the storage tier?
The danger is the write amplification cascade. A single logical write — say, a user clicking "Like" — can become many physical writes: an update to the counter row, an index update, a WAL flush, a replication event to each follower, a CDC event to Kafka, a cache invalidation. If the raw event rate is 50K/sec and each logical write fans out to 6 physical writes, the storage engine sees 300K IOPS. Miss this multiplication and your capacity plan is off by an order of magnitude on day one.
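The multiplication above is easy to make explicit in a capacity plan. The following is a minimal sketch, assuming one physical write per step and a single follower; the `LIKE_FANOUT` breakdown and the function name are illustrative, not measurements from a real system.

```python
def physical_write_rate(logical_per_sec: int, fanout: dict) -> int:
    """Multiply the logical write rate by the physical writes each one triggers."""
    return logical_per_sec * sum(fanout.values())


# Hypothetical breakdown of one "Like", mirroring the six steps listed in the text.
LIKE_FANOUT = {
    "counter_row_update": 1,
    "index_update": 1,
    "wal_flush": 1,
    "replication_event": 1,  # assumes a single follower
    "cdc_event": 1,
    "cache_invalidation": 1,
}

if __name__ == "__main__":
    print(physical_write_rate(50_000, LIKE_FANOUT))  # 300000
```

Adding a second follower or a second index changes one entry in the table and the total moves with it, which is the point of writing the fan-out down rather than estimating it.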
Producers write fast to a log; consumers asynchronously materialise views. Reads don't block writes.
The pattern has four progressive moves, each unlocking roughly an order of magnitude more throughput:
Move 1 — Vertical tuning. Before adding machines, exhaust what one machine can do. NVMe SSDs, battery-backed write cache, OS page-size alignment, Postgres commit_delay and synchronous_commit = off (when durability can tolerate it), InnoDB innodb_flush_log_at_trx_commit = 2. A single well-tuned Postgres on modern hardware can sustain 10K–15K write transactions per second. This is the right answer for most startups; jumping to sharding before saturating one node is premature complexity.
Move 2 — Horizontal sharding. When vertical is exhausted, split the keyspace. Each shard is a full primary absorbing its slice of writes. The shard key must have high cardinality and even distribution — user_id is usually good, timestamp alone is terrible (all recent writes hit the same shard). The operational cost is real: cross-shard queries, resharding when growth exceeds the original shard count, and the dual-write migration dance that resharding requires.
Move 3 — Log-first architecture. Instead of writing directly to a queryable store, write to an append-only log (Kafka, Kinesis, Pulsar). Sequential appends are 10–100x faster than random updates. Consumers read the log asynchronously and materialise purpose-built views — an LSM store for point lookups by device + time, a Druid cluster for aggregate dashboards, an S3 archive for compliance. The log is the source of truth; views are disposable projections you can rebuild by replaying. This is eventual consistency by design, but the log itself is durable within milliseconds.
Move 4 — Shedding, batching, and hierarchical aggregation. At true planet scale (millions of writes per second), even Kafka partitions saturate if each event is handled individually. The answer is to reduce the event count before it hits the durable layer. Load shedding drops best-effort traffic (analytics pings) during overload. Client-side and application-level batching collapse many events into one write. Hierarchical aggregation builds a tree of partial aggregators — edge nodes count locally, mid-tier nodes merge regional counts, the root produces the global number. YouTube live viewer counts, for example, would melt any single counter; a three-tier aggregation tree reduces the write fan-in by 1000x before it touches durable storage.
The write-heavy pattern is the mirror image of read-heavy (see the read-heavy pattern for the complementary view). Where read-heavy pushes reads away from the database with caches and CDNs, write-heavy pushes writes into logs, batches, and trees to reduce the number that ever reach a strongly-consistent store. Many real systems are write-heavy on the ingestion path and read-heavy on the query path — the canonical CQRS split — and recognising that dual nature in the first two minutes is what separates a senior design from a junior one.
Canonical examples
- Metrics / observability (Prometheus, Datadog, Grafana Mimir)
- Clickstream / analytics ingestion (ad impressions, page views)
- IoT device telemetry (Uber GPS pings: 1M+ events/sec)
- Activity feeds with fan-out-on-write (Twitter home timeline)
- Kafka-backed event pipelines (e-commerce order events)
- Ad click aggregation (Google Ads, Meta Ads: billions of clicks/day)
Variants
Shard-and-flush (direct sharding)
Shard the database directly; each shard absorbs its write slice.
Shard by a high-cardinality key. Each shard is a full primary absorbing its slice of writes.
Direct sharding is the most intuitive write-heavy move: split the keyspace across N database primaries, route each write to the correct shard, and let each shard handle its fraction of the total load. If one Postgres primary tops out at 12K writes/sec, four shards give you roughly 48K writes/sec with linear scaling — in theory.
A poorly chosen partition key concentrates traffic on one shard while the others idle. The hot shard becomes the bottleneck regardless of total cluster capacity.
The reality is messier. The shard key determines everything. A good key has high cardinality (millions of distinct values), even distribution (no single value dominates), and query alignment (the most common queries filter on the shard key, avoiding cross-shard scatter). user_id is the classic good key. country_code is a classic bad key — the US shard absorbs 40% of traffic while 150 other shards idle. timestamp is even worse for write-heavy: all current writes funnel into the single "now" partition.
When you detect skew, the fix depends on the shape. For moderate skew, composite keys work: shard on user_id + date to spread a single user's writes across days. For extreme skew (a celebrity post getting millions of likes), you need sharded counters — split the counter into N sub-keys (post:1:likes:0 through post:1:likes:15), route each increment to a random sub-key, and SUM on read. Write contention drops by a factor of N; the read cost rises from one lookup to N, which is almost always an acceptable trade because reads on a write-heavy path are infrequent.
Cross-shard queries are the operational tax. Any query that spans shards requires scatter-gather, which is slower and harder to paginate. The discipline is to design the schema so that 95%+ of queries are shard-local. For the remaining 5%, build a separate read model (a search index, a denormalised aggregate table) that is updated asynchronously.
Resharding — growing from N to M shards — is the hardest operational moment in a sharded system. The proven approach is dual-write: configure the application to write to both old and new shard maps, run a backfill job to copy historical data to the new map, verify consistency, then cut reads over. This is an online migration; the system stays live throughout, but it requires careful coordination and a verification step that compares rows across old and new shards.
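The dual-write sequence can be sketched in a few lines. This is a toy, in-memory model under stated assumptions: shards are plain dicts, routing is hash-modulo, and the names `DualWriteStore`, `backfill`, `verify`, and `cut_over` are hypothetical. A real migration would also have to handle deletes, in-flight writes during verification, and retries.

```python
import hashlib


def shard_for(key: str, shard_count: int) -> int:
    """Stable hash-modulo routing (Python's built-in hash() is salted per process)."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % shard_count


class DualWriteStore:
    def __init__(self, old_count: int, new_count: int):
        self.old = [dict() for _ in range(old_count)]
        self.new = [dict() for _ in range(new_count)]
        self.read_from_new = False

    def write(self, key, value):
        # Step 1: every live write goes to both shard maps.
        self.old[shard_for(key, len(self.old))][key] = value
        self.new[shard_for(key, len(self.new))][key] = value

    def read(self, key):
        shards = self.new if self.read_from_new else self.old
        return shards[shard_for(key, len(shards))].get(key)

    def backfill(self) -> int:
        # Step 2: copy historical rows, never overwriting a dual-written value.
        copied = 0
        for shard in self.old:
            for key, value in shard.items():
                target = self.new[shard_for(key, len(self.new))]
                if key not in target:
                    target[key] = value
                    copied += 1
        return copied

    def verify(self) -> bool:
        # Step 3: compare rows across the old and new maps.
        old_rows = {k: v for s in self.old for k, v in s.items()}
        new_rows = {k: v for s in self.new for k, v in s.items()}
        return old_rows == new_rows

    def cut_over(self):
        # Step 4: move reads only after verification passes.
        if not self.verify():
            raise RuntimeError("shard maps differ; refusing to cut over")
        self.read_from_new = True


store = DualWriteStore(old_count=4, new_count=8)
for i in range(100):  # rows that existed before dual-write was enabled
    key = f"user:{i}"
    store.old[shard_for(key, 4)][key] = f"v0-{i}"
store.write("user:7", "v1-7")  # live traffic during the migration
copied = store.backfill()
store.cut_over()
```

The `if key not in target` guard is the important design choice: once dual-write is on, anything already present in the new map is at least as fresh as the old copy, so the backfill must not clobber it.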
Pros
- Linear write scaling — add shards, add throughput
- Each shard is a standard RDBMS — familiar tooling and operations
- Strong consistency within each shard for transactional workloads
Cons
- Cross-shard queries are expensive scatter-gather operations
- Resharding is a complex online migration (dual-write + backfill + verify)
- Shard key skew can negate the scaling benefit entirely
Choose this variant when
- Writes need strong consistency and ACID guarantees per entity
- The workload has a natural high-cardinality partition key
- Read patterns are mostly shard-local (filter on the shard key)
- The team has operational experience with relational databases
Log-first, materialise later
Append to Kafka; consumers build read-optimised views asynchronously.
All writes go to an append-only log. Consumers materialise purpose-built views: LSM store, OLAP rollup, cold archive.
The log-first variant decouples write acceptance from write processing. Producers append events to Kafka (or Kinesis, Pulsar, Redpanda) — a sequential, partitioned, durable log. Consumers read events at their own pace and materialise whatever views the query layer needs. The log is the source of truth; views are disposable projections.
Writes land in a WAL + memtable (fast, in-memory). When the memtable is full it flushes to an immutable SSTable on disk. Background compaction merges SSTables to reclaim space and bound read amplification.
The storage engine behind most write-optimised stores is the LSM tree (Log-Structured Merge tree). Cassandra, ScyllaDB, RocksDB, LevelDB, and HBase all use variants of it. The idea is elegant: writes go to an in-memory buffer (the memtable) and a sequential WAL for durability. When the memtable fills, it flushes to an immutable sorted file on disk (an SSTable). Background compaction merges SSTables to reclaim space and keep read amplification bounded.
The LSM trade-off is explicit: writes are fast (sequential append, no random I/O), reads are slower (may need to check multiple SSTables + bloom filters), and compaction consumes background CPU and I/O. For write-heavy workloads where reads are aggregate or infrequent, this trade-off is ideal. For workloads with heavy point-read traffic on recent data, you may need to pair the LSM store with a cache or a B-tree index for the hot slice.
The operational model is different from RDBMS. Schema changes are cheap (add a column to a wide-column store without locking), but data modelling is query-driven: you design tables around your access patterns, not around entity relationships. Denormalisation is the norm, not the exception. If you need a new query shape, you add a new consumer that reads the log and builds a new table — no migration, no downtime, just replay.
The failure modes are consumer-centric. Consumer lag means the read model is stale; monitor lag as a primary SLO metric and autoscale consumers on it. Poison messages block a partition; use a dead-letter queue after N retries. Hot partitions in Kafka mirror the hot-shard problem in databases: pick a partition key with high cardinality, and if no natural key exists, add a random suffix or time-slice component.
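The dead-letter rule above can be illustrated without a broker. The sketch below assumes a partition is just an ordered list of strings and that a poison message surfaces as a `ValueError`; `MAX_ATTEMPTS`, `consume`, and `parse_click` are hypothetical names, and a real consumer would also commit offsets and back off between retries.

```python
MAX_ATTEMPTS = 3  # hypothetical retry budget ("N retries" in the text)


def consume(partition, handler, dead_letters):
    """Process messages in order; park any message that fails every attempt."""
    processed = []
    for message in partition:
        for attempt in range(1, MAX_ATTEMPTS + 1):
            try:
                handler(message)
                processed.append(message)
                break
            except ValueError as exc:
                if attempt == MAX_ATTEMPTS:
                    # Record the failure instead of dropping it or blocking the partition.
                    dead_letters.append(
                        {"message": message, "error": str(exc), "attempts": attempt}
                    )
    return processed


def parse_click(raw: str) -> None:
    """Toy handler: expects 'ad_id,count' and fails on a malformed count."""
    _ad_id, count = raw.split(",")
    int(count)  # raises ValueError for non-numeric input


dlq = []
done = consume(["ad1,3", "ad2,not-a-number", "ad3,5"], parse_click, dlq)
```

The third message is still processed even though the second one is poison, which is the behaviour the dead-letter queue exists to guarantee. The length of `dlq` is the depth to alert on.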
This variant shines for event-sourced systems, analytics ingestion, CDC pipelines, and any workload where the write shape (append) differs from the read shape (aggregate, time-range, or entity-centric lookup). It is the backbone of modern data platforms at companies like LinkedIn, Uber, and Netflix.
Pros
- Write throughput limited only by Kafka partition count — practically unbounded
- Views are disposable; replay the log to rebuild or reshape
- Decoupled producers and consumers scale independently
- Natural fit for event sourcing and CQRS
Cons
- Eventual consistency between log and read model (consumer lag)
- Operational complexity: Kafka cluster management, consumer group coordination
- Debugging requires correlating events across log and multiple views
- Log retention and GDPR deletion require careful engineering
Choose this variant when
- Writes are append-only events (clicks, metrics, telemetry, orders)
- Multiple consumers need different views of the same data
- Eventual consistency is acceptable (seconds of lag, not zero)
- The team is comfortable operating Kafka or a managed equivalent
Queue-and-batch (intermediate aggregation)
Buffer writes and flush in batches to amortise per-write overhead.
Batching can happen at three levels: client-side buffering, application-level micro-batching, and database group commit. Each tier trades latency for throughput.
Batching is the oldest trick in the write-heavy playbook and remains one of the most effective. The insight is that per-write overhead — network round-trip, WAL fsync, index update, replication — dominates at low batch sizes. A single Postgres INSERT takes ~1ms including commit. A batch of 100 INSERTs in a single transaction takes ~5ms total — a 20x throughput improvement from trivially grouping writes.
Batching can happen at three tiers, each with different latency-throughput trade-offs:
Tier 1 — Client-side buffering. The Kafka producer's linger.ms setting is the canonical example: instead of sending each record immediately, the producer waits up to N milliseconds to accumulate a batch, then sends them all in one request. This reduces network round-trips and enables compression. The trade-off is added latency equal to the linger window. For analytics events, 100ms of linger is invisible; for payment confirmations, it is unacceptable.
Tier 2 — Application-level micro-batching. The application server accumulates writes in a local buffer and flushes every 50ms or every 200 records, whichever comes first. This is common in metrics pipelines: instead of one INSERT per metric point, the app builds a batch and does a single multi-row INSERT or COPY statement. The complexity is managing the buffer lifecycle — flush on shutdown, flush on timeout, handle partial failures.
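Tier 3 — Database group commit. Postgres commit_delay and commit_siblings make a committing backend wait briefly for other commits before the WAL is flushed, amortising one fsync across multiple transactions. This is invisible to the application and costs only latency: a commit is not acknowledged until its WAL record is on disk. MySQL's innodb_flush_log_at_trx_commit = 2 goes further: the redo log is written at each commit but flushed to disk only about once per second, so an OS crash or power loss can lose up to roughly a second of acknowledged transactions. Use that setting only where such a window is acceptable for the durability guarantees you need.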
The three tiers are composable. A well-tuned write-heavy pipeline uses client-side linger (Kafka producer), application-level batching (Flink micro-batches), and database group commit (Cassandra's commitlog sync period) simultaneously. Each tier removes a multiplier from the per-write cost, and the cumulative effect is dramatic: 100x throughput improvement is routine.
The anti-pattern is unbounded batching. If the batch grows without a size or time cap, a slow consumer accumulates an enormous batch that overwhelms the database on flush. Always cap batches by both count and wall-clock time, and monitor batch sizes as a key operational metric.
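A micro-batcher capped by both count and wall-clock time, as described for Tier 2, might look like the sketch below. It is a minimal single-threaded illustration: the class name, the `tick` method, and the injectable `clock` are assumptions made so the behaviour is easy to test, and a failed flush is simply lost here, whereas a real pipeline would retry or dead-letter it.

```python
import time


class MicroBatcher:
    def __init__(self, flush, max_records=200, max_age_s=0.050, clock=time.monotonic):
        self._flush = flush              # callable that receives one list of records
        self._max_records = max_records  # count cap
        self._max_age_s = max_age_s      # wall-clock cap
        self._clock = clock
        self._buffer = []
        self._oldest = None              # arrival time of the oldest buffered record

    def add(self, record):
        if not self._buffer:
            self._oldest = self._clock()
        self._buffer.append(record)
        self._maybe_flush()

    def tick(self):
        """Call periodically so a quiet stream still flushes on the time cap."""
        self._maybe_flush()

    def close(self):
        """Flush whatever is left, for example on shutdown."""
        self._flush_now()

    def _maybe_flush(self):
        if not self._buffer:
            return
        too_big = len(self._buffer) >= self._max_records
        too_old = self._clock() - self._oldest >= self._max_age_s
        if too_big or too_old:
            self._flush_now()

    def _flush_now(self):
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self._flush(batch)


# Demo with a fake clock so the time cap is deterministic.
now = [0.0]
batches = []
batcher = MicroBatcher(batches.append, max_records=3, max_age_s=0.050, clock=lambda: now[0])
for i in range(3):
    batcher.add(i)   # third add hits the count cap
batcher.add(3)
now[0] = 0.060
batcher.tick()       # 60 ms later the time cap fires
batcher.add(4)
batcher.close()      # shutdown flushes the remainder
```

In production the `flush` callable would be a multi-row INSERT or COPY, and the sizes of the emitted batches are the operational metric worth graphing.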
Pros
- Dramatic throughput improvement (10-100x) with minimal architectural change
- Composable across tiers — client, app, and DB batching stack
- Reduces network round-trips, fsync calls, and index churn
Cons
- Adds latency equal to the batch window (milliseconds to seconds)
- Partial batch failure requires retry or dead-letter handling
- Unbounded batches can cause memory pressure and flush storms
Choose this variant when
- Per-write overhead (network, fsync, index) dominates throughput
- The workload tolerates milliseconds of additional latency
- You want a quick win before reaching for sharding or log-first
- The database supports efficient bulk operations (COPY, multi-row INSERT)
Hierarchical reduce (tree aggregation)
Fan-in tree of partial aggregators; only the root touches durable storage.
Edge nodes aggregate locally, mid-tier nodes merge regional results, the root produces the global answer. Each tier reduces cardinality by 10-100x.
When the write volume is so large that even Kafka partitions would be overwhelmed by per-event processing, the answer is to reduce the event count before it reaches the durable layer. Hierarchical aggregation builds a tree of stateless (or soft-state) aggregators that merge partial results at each level, reducing cardinality by 10–100x per tier.
The classic example is a live viewer count for a streaming platform. During a major event, millions of viewers send heartbeat pings every 10 seconds. Writing each ping individually to a database would require millions of writes per second — far beyond what any single store can handle. Instead:
- Edge aggregators (one per PoP or data center) count heartbeats locally over a 5-second window. 100 edge nodes each seeing 10K pings/sec (50,000 pings per node per window) produce 100 partial counts every 5 seconds — a 50,000x reduction.
- Mid-tier aggregators (one per region) merge the edge partials. 10 mid-tier nodes produce 10 regional counts — another 10x reduction.
- The root aggregator sums the regional counts and writes a single global number to the database. The write rate to durable storage: one write every 5 seconds, regardless of the viewer count.
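The three tiers above can be traced with plain arithmetic. The sketch below assumes the figures from the example (100 edge nodes, 10 regions, 10K pings/sec per edge node, a 5-second window) and uses SUM as the merge function; the constant and function names are illustrative.

```python
EDGE_NODES_PER_REGION = 10
REGIONS = 10
PINGS_PER_EDGE_PER_SEC = 10_000
WINDOW_S = 5


def edge_partial(pings_per_sec: int, window_s: int) -> int:
    """One edge node collapses a whole window of heartbeats into a single count."""
    return pings_per_sec * window_s


def merge(partials) -> int:
    """SUM is associative and commutative, so partials merge in any order."""
    return sum(partials)


# Tier 1: every edge node emits one partial count per window.
edge_partials = {
    f"region-{r}": [
        edge_partial(PINGS_PER_EDGE_PER_SEC, WINDOW_S)
        for _ in range(EDGE_NODES_PER_REGION)
    ]
    for r in range(REGIONS)
}

# Tier 2: each region merges its edge partials into one regional count.
regional_partials = {region: merge(parts) for region, parts in edge_partials.items()}

# Tier 3: the root merges the regional counts; this is the only durable write.
global_count = merge(regional_partials.values())

messages_per_window = {
    "raw_heartbeats": global_count,
    "edge_partials": sum(len(parts) for parts in edge_partials.values()),
    "regional_partials": len(regional_partials),
    "durable_writes": 1,
}
```

`messages_per_window` shows how the message count shrinks at each level while the global total is preserved.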
The properties that make aggregation composable are associativity and commutativity: SUM, COUNT, MIN, MAX, HyperLogLog unions. If your metric has these properties, you can aggregate at any level in any order and get the correct answer. Metrics that require exact ordering (median, percentile) need more sophisticated sketches (t-digest, DDSketch) that are still mergeable but approximate.
The trade-off is temporal resolution. Each aggregation window adds latency: if the edge window is 5 seconds and the mid-tier window is 10 seconds, the global count is up to 15 seconds stale. For a live viewer counter this is fine — users cannot perceive the difference between 1,234,567 and 1,234,892. For a real-time bidding system, it is not. Match the window size to the staleness budget.
Failure handling in a tree is graceful: if one edge node dies, the global count is slightly low for one window, then recovers. Compare this to a centralised counter where the single node's death means total blindness. The tree architecture is inherently fault-tolerant because each node is stateless (or holds only one window of soft state).
Pros
- Reduces write fan-in by 100–1000x before touching durable storage
- Inherently fault-tolerant — one node loss is a minor blip
- Scales horizontally at every tier
Cons
- Adds latency proportional to the aggregation window depth
- Only works for associative, commutative metrics (SUM, COUNT, HLL)
- Debugging requires tracing through multiple aggregation tiers
Choose this variant when
- Event rate exceeds what even Kafka can handle per-event (millions/sec)
- The metric is aggregatable (counts, sums, approximate distinct counts)
- Seconds of staleness are acceptable for the use case
- The system needs graceful degradation under partial node failure
Scaling path
v1 — Single primary, vertically tuned
Exhaust what one machine can do before adding complexity.
Vertical scaling: fast NVMe drives, group commit, WAL tuning. Good for 5K–15K writes/sec before the primary saturates.
Start with a single Postgres or MySQL primary on NVMe storage. Tune the WAL: set commit_delay to 10–50 microseconds to batch fsyncs, and consider synchronous_commit = off for workloads where losing the last few transactions on a crash is acceptable (metrics, analytics). On MySQL, innodb_flush_log_at_trx_commit = 2 together with sync_binlog = 0 relaxes durability in a comparable way. Use bulk INSERTs or COPY instead of row-at-a-time inserts. Drop unused indexes on write-hot tables — every index is a write amplifier.
With these tunings, a single modern server (64 cores, NVMe, 256 GB RAM) sustains 10K–15K write TPS for typical row sizes. This is the right architecture for most startups and many mid-stage companies. The bottleneck that forces you to v2 is either CPU saturation on the primary, WAL throughput hitting the NVMe ceiling, or replication lag to followers becoming unacceptable.
What triggers the next iteration
- Primary CPU saturation at ~15K TPS
- WAL throughput ceiling on NVMe (~2 GB/s sustained)
- Replication lag to followers grows under sustained write load
- Index maintenance becomes the dominant cost as table grows
v2 — Horizontal sharding
Distribute writes across N primaries for linear throughput scaling.
Shard by a high-cardinality key. Each shard is a full primary absorbing its slice of writes.
When vertical tuning is exhausted, shard the database. Pick a shard key with high cardinality and even distribution — user_id for social apps, device_id for IoT, ad_id for click tracking. Hash the key modulo N to select the shard. Each shard is a full primary with its own WAL, its own replicas, and its own capacity budget.
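Hash-modulo routing is short enough to show in full. This is a minimal sketch, assuming string keys and SHA-256 as the hash; the function name and the `device-N` keys are illustrative. A stable hash matters because Python's built-in `hash()` for strings is salted per process, so it would route the same key differently after a restart.

```python
import hashlib
from collections import Counter


def shard_for(key: str, shard_count: int) -> int:
    """Map a shard key to a shard index in [0, shard_count)."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % shard_count


# Route 10,000 hypothetical device IDs across 8 shards and inspect the spread.
SHARDS = 8
distribution = Counter(shard_for(f"device-{i}", SHARDS) for i in range(10_000))
```

With a high-cardinality key the per-shard counts in `distribution` come out close to 1,250 each. The same function applied to a low-cardinality key such as a country code would pile most rows onto a handful of indices.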
The operational cost of sharding is real. Cross-shard queries require scatter-gather. Schema migrations must be coordinated across all shards. Resharding (changing N) requires a dual-write migration: write to both old and new shard maps, backfill historical data, verify consistency, then cut over. Plan the initial shard count for 2–3 years of growth, because resharding is the most expensive operational event in a sharded system.
Monitor per-shard write rates and latency independently. Skew — one shard absorbing disproportionate traffic — negates the scaling benefit. If you detect skew, the fix is either a better shard key or sharded counters for the hot keys.
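One simple way to turn per-shard monitoring into a skew signal is the ratio of the hottest shard to the mean. The sketch below is an assumption about how such a check could be written; the sample rates and the 1.5x alert threshold are made up for illustration.

```python
SKEW_ALERT_RATIO = 1.5  # hypothetical threshold: hottest shard at 1.5x the mean


def skew_ratio(writes_per_shard: dict):
    """Return the hottest shard and its write rate relative to the mean."""
    mean = sum(writes_per_shard.values()) / len(writes_per_shard)
    hottest = max(writes_per_shard, key=writes_per_shard.get)
    return hottest, writes_per_shard[hottest] / mean


# Made-up writes/sec per shard; shard-2 is carrying a hot key.
rates = {"shard-0": 9_000, "shard-1": 10_000, "shard-2": 31_000, "shard-3": 10_000}
hottest, ratio = skew_ratio(rates)
skewed = ratio > SKEW_ALERT_RATIO
```

A ratio near 1.0 means the shard key is distributing well. A ratio that keeps climbing on one shard usually points at a small number of hot keys, which is the case sharded counters address.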
What triggers the next iteration
- Shard key skew concentrates traffic on a single shard
- Cross-shard queries are slow and operationally complex
- Resharding requires dual-write migration with careful coordination
- Operational overhead of N independent primaries
v3 — Log-first with materialised views
Decouple write acceptance from write processing; enable multiple read models.
All writes go to an append-only log. Consumers materialise purpose-built views: LSM store, OLAP rollup, cold archive.
At this stage, writes go to Kafka partitions — append-only, horizontally scalable, with configurable retention (7 days is a common default, with archival to S3 for longer). The database is no longer in the write path; it is in the consumer path, receiving pre-processed, batched, and possibly pre-aggregated data from consumer workers.
Consumer groups materialise purpose-built views: an LSM store (Cassandra, ScyllaDB) for point lookups by entity + time, an OLAP engine (Druid, Pinot, ClickHouse) for aggregate dashboards, and S3 Parquet files for offline analytics and compliance. If a view design turns out wrong, add a new consumer and replay the Kafka topic — no migration, no downtime.
The SLO metric shifts from database latency to consumer lag: how far behind the consumer group is from the log head. Alert on lag exceeding your staleness budget (e.g., 30 seconds for a metrics dashboard). Autoscale consumers based on lag, not CPU — lag is the signal that matters.
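Consumer lag is the difference between the log end offset and the group's committed offset, per partition. The sketch below works on plain dicts rather than a real Kafka client, and converts message lag to seconds by dividing by the produce rate, which is a rough approximation; all names and numbers other than the 30-second budget are assumptions.

```python
STALENESS_BUDGET_S = 30  # the example budget from the text


def partition_lag(end_offsets: dict, committed_offsets: dict) -> dict:
    """Messages not yet processed, per partition."""
    return {p: end - committed_offsets.get(p, 0) for p, end in end_offsets.items()}


def estimated_staleness_s(lag_by_partition: dict, produce_rate_per_sec: float) -> float:
    """Approximate how many seconds of traffic the backlog represents."""
    return sum(lag_by_partition.values()) / produce_rate_per_sec


# Hypothetical snapshot of a two-partition topic.
end_offsets = {0: 1_000_000, 1: 1_200_000}
committed = {0: 400_000, 1: 1_100_000}

lag = partition_lag(end_offsets, committed)
staleness = estimated_staleness_s(lag, produce_rate_per_sec=20_000)
should_alert = staleness > STALENESS_BUDGET_S
```

Keeping the per-partition breakdown matters: here partition 0 holds most of the backlog, which suggests a hot partition or a stuck consumer rather than a general capacity shortfall.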
What triggers the next iteration
- Consumer lag grows if consumer throughput < producer throughput
- Kafka partition count limits maximum parallelism (hard to increase later)
- Log retention vs storage cost vs GDPR compliance trade-off
- Debugging requires correlating events across log and multiple views
v4 — Multi-region with global replication
Accept writes in every region; replicate the log globally for reads and DR.
Each region has its own Kafka cluster. MirrorMaker replicates topics cross-region for global reads and DR.
For global-scale write-heavy systems, each region gets its own Kafka cluster. Producers write to the local cluster with single-digit-millisecond latency. MirrorMaker 2 (or Confluent Cluster Linking) replicates topics across regions asynchronously. Each region's consumers read from the local cluster, so consumer processing is also region-local.
The trade-off is cross-region consistency. Events written in US-East are visible to EU consumers after the replication lag (typically 50–200ms inter-region). For most write-heavy workloads (metrics, telemetry, clickstream), this is invisible. For workloads that require causal ordering across regions (multi-player games, collaborative editing), you need additional coordination — vector clocks, hybrid logical clocks, or a global sequencer.
The disaster recovery story is strong: if one region fails, the other regions have a near-complete copy of the log. Failover means redirecting producers to a surviving region and letting consumers catch up from the replicated topic. Data loss is bounded by the replication lag at the moment of failure — typically under one second.
What triggers the next iteration
- Cross-region replication lag limits global consistency
- MirrorMaker operational complexity and topic naming conventions
- Cost of running full Kafka clusters in every region
- Conflict resolution if the same entity is written in multiple regions
Deep dives
LSM trees vs B-trees: choosing the right storage engine
Writes land in a WAL + memtable (fast, in-memory). When the memtable is full it flushes to an immutable SSTable on disk. Background compaction merges SSTables to reclaim space and bound read amplification.
The storage engine is the single most consequential choice in a write-heavy system. B-trees (Postgres, MySQL InnoDB) and LSM trees (Cassandra, RocksDB, ScyllaDB) make opposite trade-offs, and picking the wrong one costs you an order of magnitude in throughput.
B-trees are read-optimised. Every write updates the tree in place: find the leaf page, modify it, write the dirty page back to disk. This means writes incur random I/O — seeking to the correct page, reading it, modifying it, writing it back. For a table with 10 indexes, a single row insert triggers 11 random writes (the heap page + 10 index pages). The upside is that reads are fast: one tree traversal finds the exact row, with no need to merge results from multiple files.
LSM trees are write-optimised. Writes go to an in-memory buffer (memtable) backed by a sequential WAL. No random I/O at write time — just an append to the WAL and an insert into a sorted in-memory structure. When the memtable fills (typically 64–256 MB), it flushes to an immutable SSTable file on disk. Background compaction merges overlapping SSTables to reclaim space and keep the number of files bounded.
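The write path, flush, read path, and compaction described above fit in a toy model. This sketch keeps everything in memory, uses a Python list as a stand-in for the on-disk WAL, and scans SSTables linearly; real engines use binary search, bloom filters, and levelled or tiered compaction strategies. `ToyLSM` and its tiny `memtable_limit` are illustrative only.

```python
class ToyLSM:
    def __init__(self, memtable_limit: int = 3):
        self.wal = []        # stand-in for the sequential on-disk log
        self.memtable = {}   # in-memory buffer of recent writes
        self.sstables = []   # immutable sorted runs, oldest first
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.wal.append((key, value))  # sequential append for durability
        self.memtable[key] = value     # no random I/O on the write path
        if len(self.memtable) >= self.memtable_limit:
            self._flush()

    def _flush(self):
        # Freeze the memtable into an immutable, sorted SSTable.
        self.sstables.append(tuple(sorted(self.memtable.items())))
        self.memtable = {}
        self.wal = []  # flushed entries no longer need WAL protection

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for table in reversed(self.sstables):  # newest run wins
            for k, v in table:                 # linear scan keeps the sketch short
                if k == key:
                    return v
        return None

    def compact(self):
        # Merge all runs into one; later (newer) values overwrite older ones.
        merged = {}
        for table in self.sstables:
            merged.update(table)
        self.sstables = [tuple(sorted(merged.items()))]


db = ToyLSM(memtable_limit=2)
db.put("a", 1)
db.put("b", 2)   # memtable full: flush run 1
db.put("a", 3)
db.put("c", 4)   # flush run 2, which shadows the older "a"
db.put("d", 5)   # stays in the memtable
runs_before = len(db.sstables)
value_before = db.get("a")
db.compact()
```

Before compaction a read for `"a"` has to consult two runs to find the newest value; afterwards there is one run and the stale `("a", 1)` entry is gone, which is how compaction bounds read amplification and reclaims space.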
The trade-offs are quantifiable:
- Write amplification: B-trees write each page at least twice (WAL + data file), plus index updates. LSM trees write each record once to the WAL + memtable, then again during each compaction level. Typical write amplification: B-tree ~10–30x, LSM ~10–30x (leveled compaction) or ~2–4x (tiered compaction). LSM with tiered compaction wins handily for pure append workloads.
- Read amplification: B-trees read one page per tree level (~3–4 for typical tables). LSM trees may need to check multiple SSTables across levels; bloom filters cut most false checks, but worst-case reads touch more I/O. For point lookups, B-trees are faster. For range scans on sorted data, LSM trees are competitive because SSTables are sorted.
- Space amplification: B-trees keep ~50% page utilisation after many updates. LSM trees have temporary space amplification during compaction (up to 2x for tiered compaction, which can rewrite most of the data in one pass; considerably less for leveled). Neither is clearly better; it depends on the workload's update vs append ratio.
When to pick LSM: append-heavy workloads (timeseries, events, metrics), write throughput > 10K TPS, reads are mostly range scans or aggregate queries, you can tolerate background compaction consuming 10–20% of disk I/O. When to pick B-tree: mixed read-write workloads with heavy point lookups, need for strong transactional semantics (SERIALIZABLE isolation), operational team is experienced with Postgres or MySQL, write throughput < 10K TPS.
The hybrid approach is increasingly common: use Postgres (B-tree) for the transactional core (accounts, orders, inventory) and Cassandra or ScyllaDB (LSM) for the high-volume append path (events, metrics, audit logs). Connect them with CDC so the LSM store stays in sync without the application managing dual writes.
Sharded counters: taming the hot write key
N sub-keys absorb concurrent increments without contention. Readers SUM all sub-keys. Trade exact point-in-time accuracy for write throughput.
A hot write key is the write-heavy equivalent of a hot cache key. When millions of users like the same post, all increments target the same row — post:1:likes. The database serialises those updates, and throughput collapses to whatever one row lock can sustain (typically 1–5K updates/sec on Postgres, less on Cassandra due to read-before-write for counters).
The sharded counter pattern spreads the write load across N sub-keys. Instead of one key post:1:likes, create N keys: post:1:likes:0 through post:1:likes:15 (for N=16). Each increment picks a random sub-key (or hashes the user ID to a sub-key for idempotency). Write contention drops by N because updates are distributed across N independent keys.
The read path sums all sub-keys: GET post:1:likes:0 through GET post:1:likes:15, then sum. This is N reads instead of one, but for write-heavy counters, this trade-off is almost always correct — the write bottleneck is the limiting factor, and N reads are cheap when N is small (16–64).
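The write and read paths of a sharded counter can be sketched against any key-value store. The version below uses an in-memory dict as the store and a seeded random generator for the demo; the class name and constructor parameters are assumptions, and with Redis or Cassandra the increment would be a native counter operation instead of a dict update.

```python
import random
from collections import defaultdict


class ShardedCounter:
    def __init__(self, name: str, shards: int = 16, store=None, rng=None):
        self.name = name
        self.shards = shards
        self.store = store if store is not None else defaultdict(int)
        self.rng = rng or random.Random()

    def _key(self, index: int) -> str:
        return f"{self.name}:{index}"  # e.g. post:1:likes:7

    def increment(self, amount: int = 1) -> None:
        # Write path: a blind increment on one randomly chosen sub-key.
        self.store[self._key(self.rng.randrange(self.shards))] += amount

    def value(self) -> int:
        # Read path: N lookups, then SUM.
        return sum(self.store.get(self._key(i), 0) for i in range(self.shards))


likes = ShardedCounter("post:1:likes", shards=16, rng=random.Random(42))
for _ in range(10_000):
    likes.increment()
total = likes.value()
```

Each sub-key ends up holding roughly 1/16 of the increments, so the per-key update rate falls accordingly while `value()` still returns the exact total.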
Choosing N. Start with N = number of database shards or Redis nodes, so each sub-key naturally lands on a different node. If you are on a single node, N = 16 is a reasonable starting point — it reduces contention by 16x while keeping the read fan-out manageable. Monitor per-sub-key write rates; if one sub-key is still hot (because of hash collisions), increase N.
Idempotent increments. If you need to prevent double-counting (a user liking a post twice), the sub-key selection must be deterministic: hash(user_id) % N selects the sub-key, and the sub-key stores a set of user IDs that have already incremented. This is more expensive (read-modify-write instead of blind increment) but necessary for correctness on engagement metrics.
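A sketch of the deterministic variant, assuming each sub-key can hold a set of user IDs. `shard_for` and `IdempotentShardedCounter` are illustrative names; the sets here live in process memory, whereas a real store would need an atomic check-and-add on the sub-key.

```python
import hashlib


def shard_for(user_id: str, num_shards: int) -> int:
    # Use a stable hash: Python's built-in hash() is salted per process for strings.
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


class IdempotentShardedCounter:
    def __init__(self, num_shards: int = 16):
        self.num_shards = num_shards
        # Each sub-key holds the set of user IDs that have already incremented.
        self.shards = [set() for _ in range(num_shards)]

    def like(self, user_id: str) -> bool:
        """Returns True if the like was counted, False if it was a duplicate."""
        members = self.shards[shard_for(user_id, self.num_shards)]
        if user_id in members:
            return False
        members.add(user_id)
        return True

    def total(self) -> int:
        return sum(len(members) for members in self.shards)


counter = IdempotentShardedCounter()
first = counter.like("alice")
repeat = counter.like("alice")   # same user, same sub-key, detected locally
counter.like("bob")
```

Because the same user always maps to the same sub-key, the duplicate check never has to look at the other N-1 sub-keys.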
Periodic compaction. Over time, you can compact the sharded counter back into a single value. A background job reads all sub-keys, sums them, writes the total to a post:1:likes:total key, and resets the sub-keys. This makes reads cheaper (one key instead of N) at the cost of a brief inconsistency window during compaction. Run compaction during off-peak hours.
Twitter uses this pattern for like and retweet counts, with N=16 sub-keys per tweet. At 300K+ likes per second during peak events, a single counter row would be a permanent bottleneck. The sharded counter reduces contention to ~19K writes per sub-key per second, well within what a single Cassandra partition can handle.
Online resharding without downtime
App writes to both old and new shard maps simultaneously. A backfill job copies historical data. A verifier confirms consistency before cutover.
Resharding — changing the number of shards from N to M — is the most operationally complex event in a sharded database's lifecycle. The naive approach (stop writes, rehash all data, restart) requires downtime proportional to data size, which is unacceptable for any production system doing thousands of writes per second.
The online approach has four phases:
Phase 1 — Prepare. Provision the new shard set (M shards). Configure the application to compute both old and new shard assignments for every write, but continue writing only to the old shards. Verify that the new shard assignment logic is correct by shadow-logging where each write would go.
Phase 2 — Dual-write. Enable dual-write: every new write goes to both the old shard (using the old hash function) and the new shard (using the new hash function). Reads still come from the old shards. This phase ensures that all new data exists in both shard sets. The write throughput cost roughly doubles, but it is temporary.
Phase 3 — Backfill. A background job scans the old shards and copies historical data to the new shards. This runs at throttled throughput to avoid impacting production writes. For a 10TB dataset at 100MB/sec throttled, the backfill takes roughly 28 hours. During this phase, dual-write continues, so any data modified during the backfill is handled by the dual-write path.
Phase 4 — Verify and cutover. A verification job compares rows between old and new shards (sample-based for large datasets, full-scan for small ones). Once verification passes, cut reads to the new shards. After a soak period (24–72 hours), disable dual-write and decommission the old shards.
The critical subtlety is ordering during dual-write. If a row is updated twice in quick succession, the old shard sees update A then B, but the new shard might see B then A due to network timing. The backfill job handles this by comparing the final state (not the update sequence) and copying the latest version. For workloads with true concurrent updates to the same row, use a last-write-wins timestamp or a conflict resolution function.
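The phases and the ordering subtlety can be illustrated with a toy router. This is a sketch under stated assumptions: shards are in-memory dicts, every write carries a writer-assigned timestamp, and `ReshardingRouter`, `lww_put`, and `Phase` are hypothetical names rather than the API of Vitess, Citus, or CockroachDB.

```python
import hashlib
from dataclasses import dataclass
from enum import Enum


class Phase(Enum):
    PREPARE = 1      # write old shards only
    DUAL_WRITE = 2   # write old and new, read old
    CUTOVER = 3      # read new, keep dual-writing during the soak period


@dataclass
class Versioned:
    value: str
    ts: int  # timestamp used for last-write-wins


def shard_of(key: str, num_shards: int) -> int:
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


def lww_put(shard: dict, key: str, record: Versioned) -> None:
    # Apply only if newer, so A-then-B and B-then-A converge to the same state.
    current = shard.get(key)
    if current is None or record.ts > current.ts:
        shard[key] = record


class ReshardingRouter:
    def __init__(self, old_count: int, new_count: int):
        self.old = [dict() for _ in range(old_count)]
        self.new = [dict() for _ in range(new_count)]
        self.phase = Phase.PREPARE

    def write(self, key: str, value: str, ts: int) -> None:
        record = Versioned(value, ts)
        lww_put(self.old[shard_of(key, len(self.old))], key, record)
        if self.phase is not Phase.PREPARE:
            lww_put(self.new[shard_of(key, len(self.new))], key, record)

    def read(self, key: str):
        shards = self.new if self.phase is Phase.CUTOVER else self.old
        return shards[shard_of(key, len(shards))].get(key)

    def backfill(self) -> None:
        # Copy historical rows; lww_put keeps any newer dual-written version.
        for shard in self.old:
            for key, record in shard.items():
                lww_put(self.new[shard_of(key, len(self.new))], key, record)


router = ReshardingRouter(old_count=4, new_count=6)
router.write("user:1", "v1", ts=1)      # before dual-write: old shards only
router.write("user:3", "only-old", ts=1)
router.phase = Phase.DUAL_WRITE
router.write("user:1", "v2", ts=3)      # lands on both shard sets
router.backfill()
router.phase = Phase.CUTOVER

reordered = {}
lww_put(reordered, "k", Versioned("B", 2))
lww_put(reordered, "k", Versioned("A", 1))  # late arrival of the older update
```

The backfill reuses the same last-write-wins rule as the write path, so it can run while dual-writes are in flight without clobbering newer data.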
Vitess, Citus, and CockroachDB automate parts of this process, but understanding the four phases is essential for interviews because interviewers frequently ask "what happens when you outgrow your shard count?" and expect the dual-write answer.
Load shedding: protecting the write path under overload
Admission controller classifies traffic by priority. During overload, low-priority requests are dropped immediately with a counter for observability. High-priority traffic proceeds to the queue.
Load shedding is the deliberate dropping of low-priority traffic to protect the system's ability to process high-priority traffic. In a write-heavy system, the write path is the bottleneck, and shedding is the last-resort defence when batching, sharding, and queuing are all saturated.
The key insight is priority classification. Not all writes are equal. In an ad-click pipeline, a billable click event is critical (it affects revenue reconciliation), while a debug-level impression event is best-effort. In a metrics pipeline, P0 alerts data is critical, while P2 dashboard data can be dropped for 60 seconds without anyone noticing. Classify your write traffic into 2–3 priority tiers before you need shedding, because doing it during an incident is too late.
Admission control strategies:
- 1. Queue depth threshold. Monitor the depth of the write queue (Kafka consumer lag, in-memory buffer size). When it exceeds a threshold, reject new writes for the lowest priority tier. Simple to implement, but the threshold requires tuning: too low and you shed unnecessarily, too high and you shed too late.
- 2. CoDel (Controlled Delay). Track the time each write spends in the queue. If recent items are consistently waiting longer than a target (e.g., 50ms), start dropping. CoDel adapts to sustained overload and ignores brief bursts, making it more robust than fixed thresholds.
- 3. Token bucket per priority. Each priority tier has its own token bucket with a configured rate and burst. High-priority writes get large buckets (effectively unlimited under normal load). Low-priority writes get smaller buckets that drain quickly under overload. This gives you fine-grained control over the relative admission rates.
- 4. Adaptive shedding with feedback. Monitor the downstream system's latency and error rate. When latency exceeds the target, reduce the admission rate for low-priority traffic proportionally. When latency recovers, gradually re-admit. This is the most sophisticated approach and is used by Google's Doorman and Uber's QALM.
What to do with shed traffic. Never silently drop. Log the dropped event count and type to a shedding counter. For recoverable shedding, write the dropped events to a cold-path store (S3, a separate low-priority Kafka topic) for later replay. For non-recoverable shedding (real-time metrics where stale data is useless), count and move on. The shedding counter is a critical operational metric — if it fires continuously, you need more capacity, not more shedding.
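A minimal sketch of the simplest strategy, a queue-depth threshold per priority tier with a shedding counter. The tier names, thresholds, and the `AdmissionController` class are assumptions made for illustration; a real controller would read queue depth from Kafka lag or buffer metrics and export the counter to a metrics system.

```python
from collections import Counter
from enum import IntEnum


class Priority(IntEnum):
    CRITICAL = 0     # e.g. billable click events
    NORMAL = 1
    BEST_EFFORT = 2  # e.g. debug-level impression events


class AdmissionController:
    def __init__(self, shed_thresholds: dict):
        # Queue depth at which each tier starts being shed.
        # Tiers missing from the map are never shed.
        self.shed_thresholds = shed_thresholds
        self.shed_counts = Counter()

    def admit(self, priority: Priority, queue_depth: int) -> bool:
        threshold = self.shed_thresholds.get(priority)
        if threshold is not None and queue_depth >= threshold:
            self.shed_counts[priority.name] += 1  # never drop silently
            return False
        return True


controller = AdmissionController({
    Priority.BEST_EFFORT: 10_000,   # hypothetical thresholds
    Priority.NORMAL: 50_000,
})
critical_ok = controller.admit(Priority.CRITICAL, queue_depth=1_000_000)
best_effort_ok = controller.admit(Priority.BEST_EFFORT, queue_depth=10_000)
normal_ok = controller.admit(Priority.NORMAL, queue_depth=10_000)
```

Lower tiers get lower thresholds, so they are shed first as the queue grows and critical traffic is the last to be affected.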
Batching: where, when, and how much
Batching can happen at three levels: client-side buffering, application-level micro-batching, and database group commit. Each tier trades latency for throughput.
Batching is deceptively simple in concept — group writes and flush together — but the implementation details determine whether you get a 100x throughput improvement or a system that occasionally loses data and always has unpredictable latency.
Where to batch. The three tiers (client, application, database) are not alternatives; they are composable layers. A well-tuned pipeline uses all three:
- Client (Kafka producer linger.ms=50, batch.size=64KB): reduces network round-trips from producer to broker.
- Application (micro-batch every 100ms or 500 records): converts per-record INSERT into multi-row COPY/INSERT.
- Database (Postgres commit_delay=10us, commit_siblings=5): amortises WAL fsync across concurrent transactions.
Each tier reduces the effective cost per write by a different multiplier. Client batching saves network. App batching saves SQL parsing and index updates. DB batching saves fsync calls. Together they compound.
When to flush. Every batch needs two triggers — a time cap and a size cap — and the flush fires on whichever comes first. Time cap alone risks tiny batches under low load (wasted flushes) and enormous batches under high load (memory pressure). Size cap alone risks unbounded latency under low load (the batch never fills). Both together give predictable latency (bounded by the time cap) and predictable memory (bounded by the size cap).
Typical values for a metrics pipeline: flush every 200ms or 1000 records. For a Kafka consumer writing to Cassandra: flush every 500ms or 5000 records. For a real-time dashboard: flush every 50ms or 100 records. The values depend on the downstream system's optimal batch size and the staleness budget.
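The two-trigger rule can be sketched as a small application-level batcher. This is an illustrative, single-threaded sketch: `Batcher`, `tick`, and the injected `clock` are hypothetical, and a real implementation would call `tick` from a timer and guard the buffer with a lock.

```python
import time


class Batcher:
    def __init__(self, flush_fn, max_records: int = 1000,
                 max_age_s: float = 0.2, clock=time.monotonic):
        self.flush_fn = flush_fn
        self.max_records = max_records   # size cap
        self.max_age_s = max_age_s       # time cap
        self.clock = clock
        self.buffer = []
        self.first_at = None             # arrival time of the oldest buffered record

    def add(self, record) -> None:
        if not self.buffer:
            self.first_at = self.clock()
        self.buffer.append(record)
        if len(self.buffer) >= self.max_records:
            self.flush()

    def tick(self) -> None:
        # Call periodically; flushes once the oldest record exceeds the time cap.
        if self.buffer and self.clock() - self.first_at >= self.max_age_s:
            self.flush()

    def flush(self) -> None:
        if self.buffer:
            batch, self.buffer = self.buffer, []
            self.first_at = None
            self.flush_fn(batch)


flushed = []
now = [0.0]  # fake clock so the example is deterministic
batcher = Batcher(flushed.append, max_records=3, max_age_s=0.2,
                  clock=lambda: now[0])
batcher.add("a")
batcher.add("b")
batcher.tick()        # no time has passed: nothing is flushed
now[0] = 0.25
batcher.tick()        # time cap reached: flushes ["a", "b"]
for record in ("c", "d", "e"):
    batcher.add(record)  # the third add reaches the size cap
```

Measuring age from the oldest buffered record, rather than from the last flush, is what bounds the latency any single record can experience.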
Error handling. When a batch write fails (network timeout, constraint violation, partial failure), you have three options:
- 1. Retry the entire batch. Simple but risks duplicates unless the writes are idempotent.
- 2. Binary-search the failure. Split the batch in half, retry each half, repeat. Isolates the bad record in O(log N) retries. Used when one poison record in a batch of 1000 should not block the other 999.
- 3. Dead-letter the batch. Write the entire failed batch to a DLQ for manual inspection. Used when the failure is systemic (schema mismatch, downstream outage) and retrying is pointless.
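The binary-search option can be sketched as a short recursive helper. It assumes the downstream write is all-or-nothing and raises on failure; `write_with_bisect`, `fake_write`, and the `"poison"` record are hypothetical names used only for the demonstration.

```python
def write_with_bisect(batch, write_fn, dead_letter):
    """Write a batch; on failure, split in half until bad records are isolated.

    Assumes write_fn(batch) either writes every record or raises.
    """
    if not batch:
        return
    try:
        write_fn(batch)
    except Exception:
        if len(batch) == 1:
            dead_letter.append(batch[0])  # isolated poison record
            return
        mid = len(batch) // 2
        write_with_bisect(batch[:mid], write_fn, dead_letter)
        write_with_bisect(batch[mid:], write_fn, dead_letter)


written = []


def fake_write(batch):
    # Stand-in for a multi-row INSERT that rejects the whole batch on one bad row.
    if "poison" in batch:
        raise ValueError("bad record")
    written.extend(batch)


dlq = []
records = [f"r{i}" for i in range(7)] + ["poison"] + [f"r{i}" for i in range(7, 15)]
write_with_bisect(records, fake_write, dlq)
```

If the failure is systemic, every sub-batch fails and this degenerates into one attempt per record, which is why the dead-letter option exists for that case.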
The `commit_delay` trick. Postgres's commit_delay is underappreciated. It makes a committing transaction wait N microseconds before flushing the WAL, hoping more commits arrive in that window and share the same fsync. With commit_delay=10 and commit_siblings=5 (only delay if at least 5 other transactions are open), a heavily concurrent write workload sees 3–5x throughput improvement at the cost of 10 microseconds of added latency per commit. This is invisible to the application and requires zero code changes, just a GUC setting.
Hierarchical aggregation: building the fan-in tree
Edge nodes aggregate locally, mid-tier nodes merge regional results, the root produces the global answer. Each tier reduces cardinality by 10-100x.
Hierarchical aggregation is the answer to the question "how does YouTube show a live viewer count when 50 million people are watching simultaneously?" The answer is: it does not count 50 million pings. It builds a tree that reduces 50 million pings to one write.
The tree structure. At the leaves, edge aggregators sit close to users (one per PoP, one per data center, or one per server). Each edge node maintains an in-memory counter and flushes a partial count upstream every W seconds (the window size). Mid-tier aggregators (one per region or availability zone) receive partial counts from their children and produce a regional total. The root aggregator sums the regional totals and writes the global count to a durable store.
Cardinality reduction. Suppose 100 edge nodes each receive 10,000 heartbeats per second. Without aggregation, the durable store sees 1,000,000 writes/sec. With a 5-second edge window, each edge node flushes one partial count per window, producing 100 writes per 5 seconds = 20 writes/sec to the mid tier. With 10 mid-tier nodes and a 10-second window, the root sees 10 writes per 10 seconds = 1 write/sec. The reduction factor is (event_rate × window) / nodes_per_tier, compounded across tiers.
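The arithmetic in this example can be checked with a few lines. The helper name `flush_rate` is illustrative; it encodes the assumption that each node emits exactly one partial count per window.

```python
def flush_rate(num_nodes: int, window_s: float) -> float:
    """Writes per second a tier sends upstream: one partial per node per window."""
    return num_nodes / window_s


raw_rate = 100 * 10_000          # 100 edge nodes x 10,000 heartbeats/sec
edge_out = flush_rate(100, 5)    # edge tier, 5-second window
mid_out = flush_rate(10, 10)     # mid tier, 10-second window

edge_reduction = raw_rate / edge_out   # events absorbed per upstream write
mid_reduction = edge_out / mid_out
total_reduction = raw_rate / mid_out
```

The output rate of a tier depends only on its node count and window, not on the incoming event rate, which is why the tree keeps working as traffic grows.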
Mergeable data structures. The tree works because the aggregation operation is associative and commutative. COUNT and SUM are trivially mergeable. For approximate distinct counts, HyperLogLog sketches are mergeable — each node maintains a local HLL, and the parent merges child HLLs into a combined estimate. For percentiles, t-digest or DDSketch sketches are mergeable with bounded error. For exact distinct counts, you would need to pass the full set upstream, which defeats the purpose — use an approximate structure instead.
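A small sketch of what "mergeable" means in practice, using exact count, sum, and max rather than a sketch structure such as HyperLogLog. `Partial` and `summarise` are hypothetical names; the point is only that merging in any order or grouping gives the same result.

```python
from dataclasses import dataclass
from functools import reduce


@dataclass(frozen=True)
class Partial:
    count: int = 0
    total: float = 0.0
    maximum: float = float("-inf")

    def add(self, value: float) -> "Partial":
        return Partial(self.count + 1, self.total + value,
                       max(self.maximum, value))

    def merge(self, other: "Partial") -> "Partial":
        # Associative and commutative, so merge order across the tree is irrelevant.
        return Partial(self.count + other.count,
                       self.total + other.total,
                       max(self.maximum, other.maximum))


def summarise(values) -> Partial:
    return reduce(Partial.add, values, Partial())


edge_a = summarise([1.0, 2.0, 3.0])
edge_b = summarise([10.0])
edge_c = summarise([4.0, 6.0])

global_1 = edge_a.merge(edge_b).merge(edge_c)
global_2 = edge_c.merge(edge_b).merge(edge_a)  # different order, same answer
```

A mean is not mergeable on its own, but it can be derived at the root as `total / count` because both of its inputs are.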
Window alignment. Edge windows do not need to be synchronized. If edge A flushes at t=5s and edge B flushes at t=7s, the mid-tier node simply accumulates whatever partials it receives and flushes on its own window. The global count is a lagging aggregate with a staleness bound equal to the sum of all window sizes. For a three-tier tree with 5s/10s/10s windows, the global count lags reality by at most 25 seconds. This is acceptable for viewer counts, ad impression dashboards, and most operational metrics.
Failure handling. If an edge node dies, its partial count is simply missing from the next mid-tier flush. The global count dips slightly for one window and then recovers when the edge restarts (or its traffic is redistributed to other edges). No data loss in the durable store, no cascade failure, no manual intervention. Compare this to a centralised counter: if the single counter node dies, you have zero visibility until it recovers. The tree architecture trades exact accuracy for resilience, which is the correct trade-off for all but financial accounting.
Implementation pattern. The simplest implementation uses Kafka topics as the communication layer between tiers. Edge nodes produce to a counts-edge topic (partitioned by region). Mid-tier Flink jobs consume, window-aggregate, and produce to a counts-regional topic. The root Flink job consumes regional counts and writes the global aggregate. Each tier is independently scalable, and Kafka handles backpressure and durability between tiers.
Case studies
Discord messages: MongoDB → Cassandra → ScyllaDB
Discord stores trillions of messages across millions of servers. The journey through three storage engines illustrates every write-heavy lesson in one company's history.
Phase 1 — MongoDB. Early Discord used MongoDB with a single replica set. At a few thousand messages per second, this worked fine. The B-tree storage engine handled the write load, and the working set fit in RAM for fast reads. The breaking point came around 100 million stored messages: the indexes no longer fit in memory, write latency became erratic due to lock contention on the WiredTiger storage engine, and compaction pauses caused visible latency spikes for users.
Phase 2 — Cassandra. Discord migrated to Cassandra, an LSM-based distributed database designed for write-heavy workloads. Messages were partitioned by (channel_id, bucket) where a bucket is a time window (e.g., 10 days). This gave good write distribution — active channels spread writes across many partitions — and efficient range reads for "load the last 50 messages in this channel."
Cassandra handled the write throughput beautifully (hundreds of thousands of messages per second with linear scaling), but two problems emerged at scale. First, garbage collection pauses in the JVM caused tail latency spikes. A 200ms GC pause on one node meant that all reads routed to that node stalled. Second, compaction resource competition became severe: during heavy compaction, read latency doubled because compaction consumed disk I/O that reads needed.
Phase 3 — ScyllaDB. Discord migrated to ScyllaDB, a C++ rewrite of Cassandra that eliminates JVM GC pauses and provides more predictable performance under compaction. The data model stayed the same (partition by channel + time bucket, LSM storage), but the p99 read latency dropped by 5x and write throughput increased by 3x on the same hardware.
The key takeaways: (1) LSM storage is the right engine for append-heavy message workloads, (2) the JVM's garbage collector is a liability at extreme scale, (3) the partition key (channel_id + time_bucket) matters more than the storage engine choice, and (4) migrating storage engines with the same data model is feasible if you designed the data model correctly in the first place.
Takeaway
LSM storage engines win for append-heavy workloads, but JVM GC becomes a liability at extreme scale — ScyllaDB (C++) eliminated the tail latency problem that Cassandra could not.
Uber GPS pings: 1M+ events/sec ingestion
Every active Uber driver sends a GPS ping every 4 seconds. During peak hours across global markets, this produces over 1 million GPS events per second — a pure write-heavy workload with real-time read requirements (the rider app needs to see the driver's current location with sub-second freshness).
Uber's architecture for this is a textbook log-first pipeline. GPS pings flow from the driver app to edge collectors (one per region), then into Kafka topics partitioned by city and driver zone. Kafka provides the durable log and decouples ingestion from processing.
Two consumer groups read the Kafka topics:
- 1. Real-time location service. Consumes GPS pings with minimal latency (target: < 1 second from driver phone to rider phone). Writes the latest location to a Redis cluster keyed by driver_id. The rider app polls this Redis cluster every 2 seconds. This is the "hot" path: freshness matters more than historical completeness.
- 2. Historical storage. Consumes the same Kafka topic at a slightly lower priority and writes to a Cassandra cluster partitioned by (driver_id, date). This data feeds ETA models, route optimization, fraud detection, and regulatory compliance. Eventual consistency is fine; the data can lag by 30 seconds without affecting any downstream consumer.
The scale numbers are instructive: 1M events/sec × 200 bytes/event = 200 MB/sec of raw ingestion. With Kafka replication factor 3, that is 600 MB/sec of disk write I/O across the Kafka cluster. With 3 consumer groups (the two above plus an analytics consumer), each reading the full 200 MB/sec stream, total read throughput from Kafka is another 600 MB/sec. This is within the capacity of a 20-node Kafka cluster with NVMe drives.
The lesson for interviews: separate the hot path (low-latency, latest value, Redis) from the cold path (historical, range queries, Cassandra). Do not try to serve both from the same store. The hot path is effectively a cache that is continuously refreshed by the consumer; the cold path is the authoritative historical record. This dual-store pattern appears in every large-scale write-heavy system.
Takeaway
Separate hot path (Redis for latest location, sub-second freshness) from cold path (Cassandra for historical queries) — do not try to serve both from one store.
Twitter sharded counters: 300K+ likes/sec
When a viral tweet from a celebrity accumulates millions of likes in minutes, the naive approach — a single row in a counter table — collapses under write contention. Twitter's solution is the sharded counter pattern, deployed at a scale that makes it one of the cleanest production examples of the technique.
Each tweet's engagement counters (likes, retweets, quote tweets, replies) are sharded across 16 sub-keys. For a tweet with ID 123, the like counter sub-keys are tweet:123:likes:0 through tweet:123:likes:15. When a user likes the tweet, the application hashes the user's ID modulo 16 to select a sub-key and issues an increment on that sub-key. Using the user ID as the hash input also makes idempotency tractable: the same user always hits the same sub-key, so duplicate detection is a local check against a set stored alongside the counter.
Write path. The user taps "like" → the app server computes hash(user_id) % 16 → issues an increment on tweet:123:likes:{shard} to the Cassandra cluster. The write touches a single partition and is horizontally scalable (the 16 sub-keys naturally distribute across multiple Cassandra nodes via consistent hashing). Note that Cassandra counter updates still involve a read on the replica, as mentioned earlier, so the gain comes from spreading that cost across 16 partitions rather than from avoiding it.
Read path. When rendering a tweet, the app server reads all 16 sub-keys in parallel: GET tweet:123:likes:0 through GET tweet:123:likes:15, sums them, and displays the total. 16 parallel reads at Cassandra's sub-millisecond p50 latency complete in under 2ms, fast enough for the rendering path. For the timeline view (which shows hundreds of tweets), the counts are cached in a Redis layer with 30-second TTL to avoid issuing 16 × hundreds of reads per timeline load.
At peak, Twitter sustains over 300,000 like events per second during global events (World Cup finals, major celebrity moments). Without sharding, a single counter row would need to handle 300K writes/sec — far beyond what any single partition can sustain. With 16 shards, each sub-key absorbs ~19K writes/sec, comfortably within Cassandra's per-partition throughput ceiling.
The technique generalises: any counter that can receive more writes per second than a single row or partition can handle benefits from sharding. The N value (number of sub-keys) should be tuned to the expected peak write rate divided by the per-partition write ceiling, with a 2x safety margin.
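The sizing rule in the last sentence is simple enough to write down. The per-partition ceiling of 37,500 writes/sec below is a placeholder chosen for illustration, not a measured figure for any database, and `choose_shard_count` is a hypothetical helper.

```python
import math


def choose_shard_count(peak_writes_per_s: float,
                       per_partition_ceiling: float,
                       safety_margin: float = 2.0) -> int:
    """Sub-keys needed so each stays under the per-partition write ceiling."""
    needed = peak_writes_per_s / per_partition_ceiling * safety_margin
    return max(1, math.ceil(needed))


# Assumption: 37,500 writes/sec per partition; measure this on your own cluster.
n = choose_shard_count(peak_writes_per_s=300_000, per_partition_ceiling=37_500)
per_shard_rate = 300_000 / n
```

With these inputs the rule yields 16 sub-keys at 18,750 writes/sec each, which lines up with the ~19K figure quoted above.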
Takeaway
Sharded counters (N=16 sub-keys, hash on user_id for idempotency) reduce per-partition contention by 16x — the difference between a 300K likes/sec system that works and one that melts.
Decision levers
Log retention
Short retention (hours) minimises storage but makes view rebuild impossible. Long retention (7–14 days) enables replay and reshaping. Beyond that, archive to S3 in Parquet format for compliance and offline analytics. Most production systems land at 7–14 days with explicit archival.
Consumer parallelism
Kafka partitions are the unit of parallelism. More partitions = more parallel consumers = more throughput, but also more coordination cost and more rebalance pain. Plan partition count for 2–5 years of growth: adding partitions later changes the key-to-partition mapping (breaking per-key ordering for existing keys), and reducing the count requires recreating the topic.
Exactly-once vs at-least-once
Exactly-once is real in Kafka (transactional producer plus consumers reading with isolation.level=read_committed) but adds operational complexity. At-least-once with idempotent consumers is simpler, more robust, and sufficient for most pipelines. Design consumers to be idempotent by default.
Shard count and key
Initial shard count should cover 2–3 years of growth. The shard key must have high cardinality and even distribution. Monitor per-shard write rates — skew above 2x the mean signals a bad key or the need for sharded counters on hot entities.
Batch window size
The batch window is the latency you trade for throughput. Always set both a time cap (milliseconds) and a size cap (record count). Flush on whichever triggers first. Typical values: 50–500ms time cap, 100–5000 records size cap, depending on downstream batch-write efficiency.
Failure modes
Producer throughput exceeds consumer throughput; lag grows unbounded; the read model gets arbitrarily stale. Fix: autoscale consumers on the lag metric, partition the work further, apply backpressure to producers via rate limiting.
One malformed or schema-incompatible message blocks its partition forever because the consumer retries indefinitely. Fix: dead-letter queue after N retries; alert on DLQ depth; never commit the offset past an unhandled failure.
Partition key skew sends 80% of traffic to one shard or partition; that node cannot keep up. Fix: composite key with time or random suffix for high-cardinality distribution; sharded counters for hot entities; monitor per-partition lag.
Multiple SSTables trigger compaction simultaneously, consuming all disk I/O bandwidth. Read and write latency spike together. Fix: rate-limit compaction I/O, use tiered compaction for write-heavy workloads, provision separate disks for compaction.
All application instances flush their batches simultaneously (e.g., on a synchronised timer). The database sees a burst equal to the full unbatched write rate. Fix: jitter the flush timer by a random offset per instance; cap batch size to prevent enormous flushes.
During the dual-write phase of resharding, writes to old and new shards can arrive out of order, producing divergent state. Fix: use last-write-wins with a monotonic timestamp; run a verification job that compares final state across shard sets before cutover.
Kafka retention grows without bound because no one set a policy. Storage costs escalate and GDPR deletion becomes impossible. Fix: set retention policy at topic creation (7–14 days), archive to S3/GCS for long-term, implement tombstone-based deletion for GDPR.
Decision table
Write-heavy variant comparison
| Dimension | Shard-and-flush | Log-first | Queue-and-batch | Hierarchical reduce |
|---|---|---|---|---|
| Throughput ceiling | ~100K TPS (N shards) | Practically unbounded (Kafka partitions) | 10-100x over baseline | Millions/sec (tree fan-in) |
| Consistency | Strong per shard | Eventual (consumer lag) | Eventual (flush delay) | Eventual (window delay) |
| Operational cost | High (resharding, cross-shard) | High (Kafka ops, consumer groups) | Low (config + buffer mgmt) | Medium (multi-tier deployment) |
| Latency impact | None (synchronous writes) | Low (async consumers) | Added = batch window | Added = sum of window sizes |
| Best for | ACID per entity | Event streams, CQRS | Quick throughput wins | Extreme fan-in (counters, metrics) |
- Variants are composable: a production system often uses sharding + log-first + batching simultaneously.
- Start with batching (lowest cost) and progress to log-first or sharding as throughput demands grow.
Worked example
Worked Example: Ad Click Aggregator
Prompt. "Design a system that ingests ad click events from web and mobile clients, provides real-time click counts per ad campaign for advertiser dashboards, and produces end-of-day reconciliation reports for billing."
Step 1 — Characterise the workload
Ad click volume for a mid-to-large ad network: 100K–500K clicks per second at peak, with 10x daily variation (peak during evening hours). Each click event is ~300 bytes (ad_id, campaign_id, user_id, timestamp, IP, user_agent, page_url). This is 30–150 MB/sec of raw ingestion. Reads are aggregate (clicks per campaign per minute for dashboards) and batch (full-day reconciliation for billing). This is a textbook write-heavy workload with a CQRS split: the write path is high-throughput append, the read path is aggregate queries on materialised views.
Browsers emit click events to edge collectors. Events flow through Kafka into Flink for windowed aggregation, then land in Druid for real-time dashboards and S3 for reconciliation.
Step 2 — Ingestion path
Clicks arrive from browsers (via a tracking pixel or JS beacon) and mobile SDKs. An edge collector fleet (stateless HTTP endpoints behind a global load balancer) receives the clicks, validates the schema, deduplicates by (click_id) using a Bloom filter (false positive rate ~0.1%, acceptable for click dedup), and publishes to a Kafka topic ad-clicks partitioned by campaign_id.
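A minimal Bloom filter sketch for the dedup step, assuming click IDs are strings. The bit-array size, hash count, and the `BloomFilter` class are illustrative choices, not tuned for the ~0.1% target; a false positive here means a genuinely new click is treated as a duplicate and dropped.

```python
import hashlib


class BloomFilter:
    def __init__(self, num_bits: int, num_hashes: int):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, item: str):
        # Double hashing: derive k positions from two 64-bit halves of one digest.
        digest = hashlib.sha256(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add_if_absent(self, item: str) -> bool:
        """Returns True if the item was (probably) new, False if seen before."""
        is_new = False
        for pos in self._positions(item):
            byte, mask = pos // 8, 1 << (pos % 8)
            if not self.bits[byte] & mask:
                is_new = True
                self.bits[byte] |= mask
        return is_new


seen = BloomFilter(num_bits=1 << 20, num_hashes=7)
clicks = ["c1", "c2", "c1", "c3", "c2"]
accepted = [c for c in clicks if seen.add_if_absent(c)]
```

The filter never reports a previously seen ID as new, so duplicates cannot slip through; the only error mode is dropping a small fraction of unique clicks.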
Why campaign_id as the partition key: it provides good cardinality (millions of campaigns), even distribution (no single campaign dominates for long), and query alignment (the primary dashboard query is "clicks per campaign per minute"). Partitioning by campaign_id means all clicks for a campaign land on the same partition, enabling partition-local aggregation in the stream processor.
Kafka configuration: 128 partitions (supports up to 128 parallel consumers), replication factor 3, retention 7 days, compression lz4 (2–3x reduction on click payloads). At 500K events/sec × 300 bytes × RF3, the Kafka cluster sustains ~450 MB/sec of disk write throughput — a 12-node cluster with NVMe drives handles this comfortably.
Step 3 — Real-time aggregation
A Flink job consumes the ad-clicks topic and performs windowed aggregation: 1-minute tumbling windows, grouped by (campaign_id, ad_id). The output is a stream of (campaign_id, ad_id, window_start, click_count, unique_users_hll) records — one record per ad per minute. This reduces the cardinality from 500K events/sec to ~100K aggregated records/minute (assuming 100K active ads), a 300x reduction.
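The grouping logic of a 1-minute tumbling window can be sketched in plain Python. This is a batch illustration of the aggregation shape only: it is not Flink code, it ignores late events and watermarks, and it uses an exact set where the pipeline described here would use a HyperLogLog sketch. The event field names follow the ones listed in Step 1.

```python
from collections import defaultdict

WINDOW_S = 60


def window_start(ts: int) -> int:
    # Tumbling windows: every timestamp belongs to exactly one window.
    return ts - (ts % WINDOW_S)


def aggregate(events):
    """events: dicts with campaign_id, ad_id, user_id, ts (epoch seconds)."""
    counts = defaultdict(int)
    users = defaultdict(set)  # exact set; stand-in for an HLL sketch
    for e in events:
        key = (e["campaign_id"], e["ad_id"], window_start(e["ts"]))
        counts[key] += 1
        users[key].add(e["user_id"])
    return [
        {"campaign_id": c, "ad_id": a, "window_start": w,
         "click_count": counts[(c, a, w)],
         "unique_users": len(users[(c, a, w)])}
        for (c, a, w) in sorted(counts)
    ]


events = [
    {"campaign_id": "camp1", "ad_id": "ad1", "user_id": "u1", "ts": 120},
    {"campaign_id": "camp1", "ad_id": "ad1", "user_id": "u1", "ts": 150},
    {"campaign_id": "camp1", "ad_id": "ad1", "user_id": "u2", "ts": 179},
    {"campaign_id": "camp1", "ad_id": "ad1", "user_id": "u2", "ts": 180},
]
rows = aggregate(events)
```

Four raw events collapse into two output rows, one per (campaign, ad, minute), which is the same reduction the stream job performs at scale.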
The aggregated records are written to Druid, configured with a 1-minute segment granularity. Druid's columnar storage and bitmap indexes make sub-second dashboard queries possible: "show me clicks per minute for campaign X over the last 24 hours" is a single Druid query that scans pre-aggregated segments.
For the unique user count, Flink maintains a HyperLogLog sketch per (campaign_id, ad_id, window). The sketch is serialised into the Druid record, and Druid can merge sketches across time windows for "unique users in the last hour" queries. Approximate distinct count with ~2% error is acceptable for dashboards; exact counts come from the reconciliation path.
Step 4 — Reconciliation path
A second Flink job (or a separate consumer group) reads the same ad-clicks topic and writes raw events to S3 in Parquet format, partitioned by date and campaign_id: s3://clicks/date=2026-04-27/campaign=abc123/part-00001.parquet. A daily Spark job reads the Parquet files, computes exact aggregates (total clicks, unique users via an exact distinct count over user_id, click-through rate), and writes the reconciliation report to a billing database.
The reconciliation report is the source of truth for billing. The real-time dashboard (Druid) may differ by up to 1 minute of lag plus the approximation error of HyperLogLog. The billing report is exact and auditable.
Step 5 — Fraud detection sidecar
A third consumer group feeds a fraud-detection streaming job that applies rules: more than 10 clicks from the same IP on the same ad in 60 seconds, clicks from known bot user-agents, clicks with impossible geographic transitions. Flagged clicks are tagged in the reconciliation dataset and excluded from billing.
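The first rule (more than 10 clicks from the same IP on the same ad in 60 seconds) can be sketched as a sliding window per (IP, ad) pair. `RapidClickRule` is a hypothetical name, state is kept in process memory, and timestamps are assumed to arrive in order; a streaming job would hold this in keyed state instead.

```python
from collections import defaultdict, deque

MAX_CLICKS = 10
RULE_WINDOW_S = 60


class RapidClickRule:
    def __init__(self):
        self.recent = defaultdict(deque)  # (ip, ad_id) -> recent click timestamps

    def is_suspicious(self, ip: str, ad_id: str, ts: float) -> bool:
        """Assumes timestamps are non-decreasing per (ip, ad_id)."""
        window = self.recent[(ip, ad_id)]
        window.append(ts)
        # Evict clicks that fell out of the 60-second window.
        while window and window[0] <= ts - RULE_WINDOW_S:
            window.popleft()
        return len(window) > MAX_CLICKS


rule = RapidClickRule()
# Twelve clicks, one per second, from the same IP on the same ad.
flags = [rule.is_suspicious("203.0.113.7", "ad1", t) for t in range(12)]
# A single click much later starts from an empty window.
late = rule.is_suspicious("203.0.113.7", "ad1", 200)
```

The first ten clicks pass and the eleventh onward are flagged; flagged clicks would then be tagged rather than dropped, so billing can exclude them later.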
Step 6 — Failure modes and SLOs
- Ingestion SLO: 99.9% of clicks ingested within 5 seconds of occurrence. Measured by comparing edge collector receipt timestamp to Kafka produce timestamp.
- Dashboard SLO: aggregates within 2 minutes of real-time. Measured by Flink consumer lag + Druid ingestion lag.
- Reconciliation SLO: daily report available by 06:00 UTC for the previous day. Measured by Spark job completion time.
- Key failure mode: Kafka consumer lag on the Flink aggregation job. If lag exceeds 5 minutes, the dashboard is useless. Autoscale Flink task managers based on lag, not CPU.
- Key failure mode: S3 write failures in the reconciliation path. Flink checkpoints ensure at-least-once delivery; writes are idempotent when rows are keyed by (click_id, partition), so retries do not produce duplicates.
Interview playbook
When it comes up
- The prompt mentions high write volume: "millions of events per second", "ingestion", "clickstream"
- The interviewer says "writes are the bottleneck" or "reads are secondary"
- You recognise an append-only shape: logs, metrics, telemetry, timeseries, clicks
- The ratio of writes to reads is 1:1 or higher (contrast with read-heavy at 10:1+ reads)
Order of reveal
- 1. Name the pattern. This is a write-heavy workload: writes dominate the design. I will shape the architecture around absorbing writes first and serving reads from materialised views.
- 2. Quantify the write rate. Let me estimate the write QPS. [Do the math: users × actions/sec × event size.] At this rate, a single OLTP primary will not survive; I need to spread or defer the writes.
- 3. Choose the write-path shape. I will use a log-first architecture: writes go to Kafka, consumers materialise purpose-built views. The log is the source of truth; views are disposable.
- 4. Name the storage engine. For the write-optimised store, I will use an LSM-based engine (Cassandra or ScyllaDB): sequential writes, no random I/O, background compaction.
- 5. Materialised views. Three consumer groups: one writes to the LSM store for point lookups, one feeds an OLAP engine for dashboards, one archives to S3 for compliance. Different queries, different stores.
- 6. Address hot keys. If any entity gets disproportionate writes (celebrity post, viral ad), I use sharded counters: N sub-keys, random or hash-based selection, SUM on read.
- 7. Failure modes. The key SLO is consumer lag, not database latency. I alert on lag, autoscale consumers on lag, and dead-letter poison messages after N retries.
Signature phrases
- “Log first, materialise later” — Names the architectural backbone and signals you know CQRS.
- “LSM over B-tree for this workload” — Shows you understand storage engine trade-offs, not just database brands.
- “Consumer lag is the SLO, not database latency” — Shifts the conversation from traditional DB metrics to the metric that actually matters.
- “Views are disposable — I can replay and rebuild” — Demonstrates confidence in the log-as-source-of-truth model.
- “Sharded counter with N sub-keys” — Immediately solves the hot-key problem that trips up most candidates.
- “Write amplification is the hidden multiplier” — Shows you think about physical I/O, not just logical operations.
Likely follow-ups
“What if you need the read model to be consistent with the latest write?”
Then I am not truly write-heavy in the CQRS sense — I need synchronous writes to a strongly-consistent store. I would shard the database directly (variant: shard-and-flush) and accept the cross-shard query cost. Or I would use a synchronous write-through path for the entity that needs consistency and keep the async path for everything else.
“How do you handle GDPR deletion in a log-based system?”
Two approaches. (1) Crypto-shredding: encrypt each user's data with a per-user key stored in a key management service; to "delete" a user, delete their key. The data is still in the log but unreadable. (2) Tombstone events: publish a deletion event to the log; consumers process it by deleting from their views; Kafka log compaction eventually removes the original events, provided the topic is compacted and keyed by user ID. Crypto-shredding is cleaner and does not require replaying consumers.
“How do you size the Kafka cluster?”
Back-of-envelope: event_rate × event_size × replication_factor = disk write throughput. Add consumer read throughput (each consumer group reads the full topic). For 500K events/sec × 300B × RF3 = 450 MB/sec writes + 3 consumer groups × 150 MB/sec reads = 900 MB/sec total I/O. A 12-node cluster with NVMe drives and 10 Gbps network handles this. Partition count: at least 2× the expected max consumer parallelism; 128 is a safe starting point for most topics.
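The arithmetic above can be reproduced with a short sketch. The function names are illustrative, and the model assumes 1 MB = 1,000,000 bytes, that every replica persists a full copy, and that each consumer group reads the whole topic once.

```python
def kafka_io_mb_per_sec(events_per_sec: int, event_bytes: int,
                        replication_factor: int, consumer_groups: int) -> dict:
    """Back-of-envelope cluster I/O in MB/sec (1 MB = 1,000,000 bytes)."""
    ingest = events_per_sec * event_bytes / 1_000_000  # producer traffic
    writes = ingest * replication_factor               # each replica stores a copy
    reads = ingest * consumer_groups                   # each group reads the full topic
    return {"ingest": ingest, "writes": writes, "reads": reads,
            "total": writes + reads}


def min_partitions(max_consumer_parallelism: int, headroom: int = 2) -> int:
    """Partition floor: headroom x the expected max consumer parallelism."""
    return headroom * max_consumer_parallelism


sizing = kafka_io_mb_per_sec(500_000, 300, replication_factor=3, consumer_groups=3)
print(sizing)               # ingest 150, writes 450, reads 450, total 900 MB/sec
print(min_partitions(64))   # 128
```

Node count and drive choice still depend on per-broker throughput limits, which this sketch does not model.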
“When does this pattern break down?”
Three cases. (1) When every write must be immediately readable — eventual consistency is a non-negotiable deal-breaker (financial postings, inventory reservations). (2) When the write volume fits comfortably on one tuned primary — the complexity of Kafka + consumers is not justified. (3) When writes require complex transactions across multiple entities — log-first works for independent events but struggles with multi-entity ACID.
“How do you monitor this system?”
Four key metrics. (1) Producer-to-broker latency: if producers are slow to publish, the edge is the bottleneck. (2) Consumer lag per consumer group: this is the primary SLO metric; alert if lag exceeds the staleness budget. (3) Per-partition lag: identifies hot partitions or slow consumers. (4) Dead-letter queue depth: rising DLQ means poison messages or schema drift.
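As a rough illustration of metrics (2) and (3), the sketch below derives per-partition lag from two offset maps and flags skewed partitions. The function names, the `skew_factor` threshold, and expressing the staleness budget in offsets are all assumptions made for the example; in practice the offsets would come from the broker or a lag exporter.

```python
from statistics import median


def partition_lag(end_offsets: dict[int, int],
                  committed: dict[int, int]) -> dict[int, int]:
    """Lag per partition = log end offset minus committed offset."""
    return {p: end_offsets[p] - committed.get(p, 0) for p in end_offsets}


def lag_report(lag: dict[int, int], staleness_budget: int,
               skew_factor: float = 5.0) -> dict:
    """Summarise group lag and flag partitions far above the typical lag."""
    total = sum(lag.values())
    typical = median(lag.values())
    # A partition is "hot" if its lag is well above the median partition.
    hot = sorted(p for p, l in lag.items() if l > skew_factor * max(typical, 1))
    return {"total_lag": total,
            "breach": total > staleness_budget,
            "hot_partitions": hot}


end = {0: 1_000, 1: 1_000, 2: 1_000, 3: 50_000}
done = {0: 990, 1: 980, 2: 995, 3: 1_000}
report = lag_report(partition_lag(end, done), staleness_budget=10_000)
print(report)
```

A single hot partition with an otherwise healthy group points at key skew or one slow task rather than a general capacity shortfall.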
“How would you migrate from a sharded OLTP design to a log-first design?”
CDC (Change Data Capture) is the bridge. Set up Debezium or a native CDC connector on each shard, publishing row changes to Kafka topics. Build new consumers that materialise the views from the CDC stream. Once the new views are verified, redirect reads to them. Then, over time, redirect writes to go to Kafka directly (producers write events) instead of through the OLTP shards. The shards become a legacy read store that you eventually decommission.
Code snippets
import zlib

import redis

NUM_SHARDS = 16


def increment_counter(r: redis.Redis, entity_id: str, user_id: str) -> None:
    """Increment a sharded counter.

    Shard selection is a stable hash of user_id, so a given user always
    lands on the same sub-key. This spreads load; it does not by itself
    make the increment idempotent.
    """
    # zlib.crc32 is stable across processes; built-in hash() is salted
    # per process for strings, so it would pick different shards.
    shard = zlib.crc32(user_id.encode()) % NUM_SHARDS
    key = f"{entity_id}:likes:{shard}"
    r.incr(key)


def read_counter(r: redis.Redis, entity_id: str) -> int:
    """Sum all shards to get the total count."""
    pipe = r.pipeline()
    for i in range(NUM_SHARDS):
        pipe.get(f"{entity_id}:likes:{i}")
    values = pipe.execute()
    return sum(int(v or 0) for v in values)

import time
import threading
from typing import List, Any


class MicroBatcher:
    """Accumulates writes and flushes on size cap or time cap,
    whichever comes first."""

    def __init__(self, flush_fn, max_size=500, max_wait_ms=200):
        self.flush_fn = flush_fn
        self.max_size = max_size
        self.max_wait = max_wait_ms / 1000.0
        self.buffer: List[Any] = []
        self.lock = threading.Lock()
        self.last_flush = time.monotonic()
        self._start_timer()

    def add(self, item: Any) -> None:
        with self.lock:
            self.buffer.append(item)
            if len(self.buffer) >= self.max_size:
                self._flush()

    def _flush(self) -> None:
        # Caller must hold self.lock. flush_fn runs under the lock,
        # so a slow flush blocks concurrent add() calls.
        if not self.buffer:
            return
        batch, self.buffer = self.buffer, []
        self.last_flush = time.monotonic()
        self.flush_fn(batch)

    def _start_timer(self) -> None:
        def tick():
            while True:
                time.sleep(self.max_wait)
                with self.lock:
                    if time.monotonic() - self.last_flush >= self.max_wait:
                        self._flush()

        # Daemon thread: does not keep the process alive on shutdown.
        t = threading.Thread(target=tick, daemon=True)
        t.start()

from enum import IntEnum
from collections import defaultdict
import time


class Priority(IntEnum):
    CRITICAL = 0     # billable events
    NORMAL = 1       # user-facing analytics
    BEST_EFFORT = 2  # debug telemetry


class LoadShedder:
    """Token-bucket shedder with per-priority limits."""

    def __init__(self, rates: dict[Priority, float]):
        self.rates = rates  # tokens per second per priority
        self.tokens: dict[Priority, float] = {p: r for p, r in rates.items()}
        self.last_refill = time.monotonic()
        self.shed_count: dict[Priority, int] = defaultdict(int)

    def admit(self, priority: Priority) -> bool:
        self._refill()
        if self.tokens[priority] >= 1.0:
            self.tokens[priority] -= 1.0
            return True
        self.shed_count[priority] += 1
        return False

    def _refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.last_refill = now
        for p, rate in self.rates.items():
            # Cap each bucket at two seconds' worth of tokens (burst limit).
            self.tokens[p] = min(self.tokens[p] + elapsed * rate, rate * 2)

from kafka import KafkaConsumer, KafkaProducer
import json

MAX_RETRIES = 3


def consume_with_dlq(topic: str, dlq_topic: str, process_fn) -> None:
    consumer = KafkaConsumer(
        topic,
        group_id="my-consumer-group",
        auto_offset_reset="earliest",
        enable_auto_commit=False,  # commit manually, only after processing
        value_deserializer=lambda m: json.loads(m.decode()),
    )
    producer = KafkaProducer(
        value_serializer=lambda v: json.dumps(v).encode()
    )
    for msg in consumer:
        retries = 0
        while retries < MAX_RETRIES:
            try:
                process_fn(msg.value)
                consumer.commit()
                break
            except Exception as e:
                retries += 1
                if retries >= MAX_RETRIES:
                    # Retries exhausted: park the message on the DLQ
                    # with enough context to replay it later.
                    producer.send(dlq_topic, value={
                        "original_topic": topic,
                        "partition": msg.partition,
                        "offset": msg.offset,
                        "value": msg.value,
                        "error": str(e),
                    })
                    producer.flush()
                    consumer.commit()  # move past poison message

-- WAL tuning for write-heavy Postgres
ALTER SYSTEM SET commit_delay = 10; -- microseconds; batch WAL fsyncs
ALTER SYSTEM SET commit_siblings = 5; -- only delay if 5+ txns in-flight
ALTER SYSTEM SET synchronous_commit = off; -- async commit (risk: the most recent commits can be lost on crash, up to roughly 3 x wal_writer_delay)
ALTER SYSTEM SET wal_buffers = '64MB'; -- larger WAL buffer for burst writes (takes effect only after a server restart)
ALTER SYSTEM SET checkpoint_completion_target = 0.9; -- spread checkpoint I/O
ALTER SYSTEM SET max_wal_size = '4GB'; -- less frequent checkpoints

-- Drop unused indexes on write-hot tables
-- Every index is a write amplifier: 1 row insert = 1 + N_indexes writes
DROP INDEX IF EXISTS idx_events_rarely_queried_column;

-- Use COPY for bulk inserts instead of row-at-a-time INSERT
-- COPY events FROM STDIN WITH (FORMAT csv);

SELECT pg_reload_conf(); -- applies the reloadable settings without a restart (not wal_buffers)

Drills
Your Kafka consumer lag is growing at 10K offsets per minute and not recovering. Walk through the diagnosis.
Three things to check in order. (1) Consumer throughput vs producer rate: compare the consumer's processing rate (offsets/sec) against the producer's publish rate. If the producer is faster, you need more consumer instances — up to the partition count. (2) Per-partition lag: one partition lagging while others are fine indicates a hot partition (key skew) or a slow consumer task. Rebalance or fix the partition key. (3) Processing bottleneck: if the consumer is CPU-bound (deserialization, transformation) or I/O-bound (writing to the downstream store), the fix is different. CPU-bound: optimise the processing logic or add consumers. I/O-bound: batch writes to the downstream store, or switch to a faster store. If lag is unbounded and autoscaling is maxed, the last resort is backpressure: rate-limit the producer and accept temporary data loss or queueing upstream.
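Check (1) is simple arithmetic, sketched below under the assumption that each consumer instance sustains a roughly constant rate. The function names and the sample rates are hypothetical.

```python
import math
from typing import Optional, Tuple


def consumers_needed(produce_rate: float, per_consumer_rate: float,
                     partitions: int) -> Tuple[int, bool]:
    """Return (instances to run, whether partition count is the limit).

    Instances beyond the partition count sit idle, so the result is capped.
    """
    needed = math.ceil(produce_rate / per_consumer_rate)
    return min(needed, partitions), needed > partitions


def minutes_to_drain(backlog: float, produce_rate: float,
                     consume_rate: float) -> Optional[float]:
    """Minutes until lag reaches zero, or None if lag never recovers."""
    surplus = consume_rate - produce_rate  # offsets/sec gained on the backlog
    if surplus <= 0:
        return None
    return backlog / surplus / 60


# 50K offsets/sec produced, 4K offsets/sec per consumer, 12 partitions.
print(consumers_needed(50_000, 4_000, partitions=12))   # (12, True)
print(minutes_to_drain(600_000, 50_000, 60_000))        # 1.0
```

When the second element of the tuple is true, adding instances no longer helps; the options left are more partitions, faster processing, or backpressure.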
You are sharding by user_id, but one user (a celebrity with 50M followers) generates 100x the writes of a normal user. What do you do?
The shard containing that user's writes is the bottleneck. Three options. (1) Sharded counters: split the celebrity's engagement counters into N sub-keys and distribute increments across them. This is the standard fix for hot counters. (2) Composite shard key: add a time component or random suffix to the shard key for the hot entity, spreading its writes across multiple shards. Reads become scatter-gather for that entity. (3) Dedicated shard: give the celebrity their own shard (or set of shards). This is operational complexity but isolates the hot entity from normal users. In practice, option (1) handles 90% of celebrity hotspot problems; option (3) is reserved for extreme cases like a global live event.
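Option (2) can be sketched as a salted shard key. This is a minimal illustration: `NUM_SHARDS`, `HOT_FANOUT`, the `hot_users` set, and deriving the salt from the event ID are all assumptions, not part of any particular database's API.

```python
import zlib

NUM_SHARDS = 32   # assumed total shard count
HOT_FANOUT = 8    # assumed number of salted sub-keys per hot entity


def stable_hash(s: str) -> int:
    """Process-independent hash (built-in hash() is salted per process)."""
    return zlib.crc32(s.encode())


def write_shard(user_id: str, event_id: str, hot_users: set) -> int:
    """Pick the shard for one write; hot users are spread over salted keys."""
    if user_id in hot_users:
        salt = stable_hash(event_id) % HOT_FANOUT
        return stable_hash(f"{user_id}#{salt}") % NUM_SHARDS
    return stable_hash(user_id) % NUM_SHARDS


def read_shards(user_id: str, hot_users: set) -> set:
    """Shards a read must visit: one normally, scatter-gather for hot users."""
    if user_id in hot_users:
        return {stable_hash(f"{user_id}#{salt}") % NUM_SHARDS
                for salt in range(HOT_FANOUT)}
    return {stable_hash(user_id) % NUM_SHARDS}


hot = {"celebrity"}
print(read_shards("alice", hot))       # a single shard
print(read_shards("celebrity", hot))   # up to HOT_FANOUT shards
```

Deriving the salt from the event ID rather than a random number keeps retries of the same event on the same shard, which helps if the write is deduplicated there.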
Explain when you would choose B-tree storage (Postgres) over LSM storage (Cassandra) for a write-heavy workload.
Two scenarios. (1) Transactional consistency: if writes require multi-row ACID transactions (debits and credits, inventory reservations with rollback), B-tree databases with SERIALIZABLE isolation are the right choice. LSM-based stores such as Cassandra typically give up multi-row transactions in exchange for write throughput. (2) Mixed read-write with heavy point lookups: if reads are as frequent as writes and require fast point lookups on the primary key, B-tree's in-place updates and single-page reads win. LSM's multi-level lookups with bloom filters add latency on reads. The threshold: if write TPS < 10K and you need strong transactions, B-tree. If write TPS > 10K and reads are aggregate or can tolerate eventual consistency, LSM.
A batch flush fails halfway: 500 of 1000 records were written. How do you recover?
Depends on idempotency. If writes are idempotent (INSERT ON CONFLICT DO NOTHING, or UPSERT with a unique event ID), retry the entire batch: the 500 already-written records are no-ops, and the remaining 500 succeed. This is the simplest and most robust approach. If writes are NOT idempotent (plain INSERT that would create duplicates), a blind retry duplicates the 500 records that already landed, so first make each flush all-or-nothing (wrap the batch in a transaction) or use the logged batch boundaries to work out what was written. To isolate a poison record, bisect: split the batch in half, retry each half, recurse on failures. This isolates a single failing record in O(log N) attempts. For systematic failures (downstream outage, schema mismatch), do not retry; dead-letter the entire batch for manual inspection and move on. Always log the batch boundaries (first and last offset) so you can audit what was written.
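The retry-then-bisect strategy can be sketched as a small recursive helper. It assumes the supplied `write_fn` is either idempotent or all-or-nothing per call, so re-sending a sub-batch is safe; `flush_with_bisect`, `upsert_all`, and the in-memory `store` are hypothetical names for illustration.

```python
from typing import Any, Callable, Dict, List


def flush_with_bisect(batch: List[Any],
                      write_fn: Callable[[List[Any]], None]) -> List[Any]:
    """Write a batch; on failure, halve it and recurse to isolate bad records.

    Returns the records that still fail on their own (dead-letter candidates).
    """
    if not batch:
        return []
    try:
        write_fn(batch)
        return []
    except Exception:
        if len(batch) == 1:
            return batch  # isolated a failing record
        mid = len(batch) // 2
        return (flush_with_bisect(batch[:mid], write_fn)
                + flush_with_bisect(batch[mid:], write_fn))


store: Dict[str, dict] = {}


def upsert_all(records: List[dict]) -> None:
    """Stand-in sink: validates first, then upserts by event ID."""
    if any(r["payload"] is None for r in records):
        raise ValueError("poison record in batch")
    for r in records:
        store[r["id"]] = r  # keyed by ID, so a repeated write is a no-op


batch = [{"id": f"e{i}", "payload": i} for i in range(8)]
batch[5]["payload"] = None  # one poison record
dead = flush_with_bisect(batch, upsert_all)
print([r["id"] for r in dead], len(store))
```

For a systematic failure every leaf fails, so a real implementation would cap the recursion or check downstream health before bisecting.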
Your write-heavy system needs to comply with GDPR right-to-deletion. The source of truth is a Kafka log. How?
Two approaches, in order of preference. (1) Crypto-shredding: encrypt each user's events with a per-user encryption key stored in a key management service (KMS). To "delete" a user, delete their key from KMS. The events remain in Kafka and downstream stores but are permanently unreadable. This is clean, auditable, and does not require replaying consumers. (2) Tombstone + log compaction: publish a tombstone event (key = user_id, value = null) to a compacted Kafka topic. Consumers process the tombstone by deleting the user's data from their views. Kafka log compaction eventually removes the original events for that key. This is more complex operationally and requires all consumers to handle tombstones correctly. Crypto-shredding is preferred because it is a single, atomic operation that does not depend on consumer correctness.
Design a sharded counter that supports both increment and decrement (likes and unlikes). What changes?
The increment path stays the same: hash(user_id) % N selects the sub-key, issue INCR or DECR. The read path still SUMs all sub-keys. The new complexity is idempotency: you must prevent a user from unliking something they never liked (which would drive the count below its true value) or from liking twice. The sub-key must store a set of user IDs alongside the counter, or the system must check a separate "has user X liked entity Y?" lookup before allowing the decrement. The simplest approach: pair each sub-key counter with a Redis set of user IDs under a sibling key (a Redis hash cannot hold a set as a field value). A like adds the user to the set and increments the count only if the user was newly added; an unlike removes the user and decrements only if the user was actually a member. Each operation stays O(1) in Redis, but memory per sub-key grows. For very large like counts (millions), the per-sub-key user set becomes large; at that point, move the membership check to a Cassandra table keyed by (entity_id, user_id). A Bloom filter is only an approximate alternative, since it can return false positives and a plain Bloom filter cannot remove entries on unlike.
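The membership-guarded counter can be modelled in memory as below. This is a sketch, not a Redis client: the `ShardedLikeCounter` class is hypothetical, with a Python set and integer standing in for a Redis set and counter per sub-key. In Redis, the check and the update would need to be atomic, for example by acting on the return values of SADD and SREM or by using a script.

```python
import zlib
from collections import defaultdict

NUM_SHARDS = 16


class ShardedLikeCounter:
    """In-memory model: each sub-key holds a user-ID set and a count."""

    def __init__(self) -> None:
        self.members = defaultdict(set)  # (entity_id, shard) -> user IDs
        self.counts = defaultdict(int)   # (entity_id, shard) -> like count

    def _sub_key(self, entity_id: str, user_id: str) -> tuple:
        # Stable hash so a user always maps to the same sub-key.
        return (entity_id, zlib.crc32(user_id.encode()) % NUM_SHARDS)

    def like(self, entity_id: str, user_id: str) -> bool:
        key = self._sub_key(entity_id, user_id)
        if user_id in self.members[key]:
            return False  # duplicate like: no change
        self.members[key].add(user_id)
        self.counts[key] += 1
        return True

    def unlike(self, entity_id: str, user_id: str) -> bool:
        key = self._sub_key(entity_id, user_id)
        if user_id not in self.members[key]:
            return False  # never liked: refuse the decrement
        self.members[key].remove(user_id)
        self.counts[key] -= 1
        return True

    def total(self, entity_id: str) -> int:
        return sum(self.counts.get((entity_id, i), 0) for i in range(NUM_SHARDS))


c = ShardedLikeCounter()
c.like("post:1", "alice")
c.like("post:1", "alice")    # ignored
c.unlike("post:1", "bob")    # ignored
print(c.total("post:1"))     # 1
```

Because the shard is chosen by user, the membership check and the counter for a given user always live on the same sub-key, so no cross-shard coordination is needed.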