This document contains an example system design that uses change-data-capture (CDC) to distribute and replicate structured data across system boundaries. Implementations of this design can support advanced event-driven and event-sourcing use cases such as database migrations, client-side caching, real-time ETL, and maintaining read-replicas. If you are introducing CDC into a system for the first time, or looking for ways to improve your current event sourcing or data replication practices, then this example design should provide a decent foundation that you can adapt to your specific needs.

I assume prior knowledge of CDC in general. If you are not familiar with change-data-capture then see my introductory article on the subject[1].

Data Mobility

Data mobility describes the ease or cost of moving data between systems. High data mobility means it is inexpensive and commonplace to replicate or transfer data between systems. Low data mobility means it is prohibitively expensive to do the same. In my experience, most organizations and projects have low data mobility. Often, the cost of moving data is so high that using data replication to solve a problem is discarded as a non-starter, if it is considered at all.

Low data mobility acts as an architectural anchor. Extracting smaller, independent systems from a monolith, consolidating sprawling microservices, moving between database technologies to support unexpected read or write scale, converting batch processing to online processing, creating client-side caches of data for performance improvements, and building content aggregation features such as unified search are all operations that are difficult to support with low data mobility. Often, in my experience, these types of operations can even be pragmatically impossible due to cost and complexity in projects or organizations with low data mobility.

The system design included in this document can be implemented for nearly any existing system in order to raise its data mobility.

Overview And Prerequisites

This system design implements what I call a full state transfer model[2]. This means that the end product of the system is an ordered stream of structured events that each represent the full and complete state of some logical resource. Each change to a resource results in a new state being published as an event.

A major prerequisite of this design is an ordered, partitioned stream. The major contenders for this at the time of writing are Kafka[3], Pulsar[4], AWS Kinesis[5], Azure Event Hubs[6], and GCP Pub/Sub[7]. Your specific choice of stream technology is not particularly important to the overall design so long as it supports the following features:

  • Partitioned or sharded streams (usually referred to as topics and partitions)

  • In-order delivery guarantees

  • At-least-once delivery guarantees

  • Multiple consumers may receive the same events
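These features usually work together: ordering is guaranteed within a partition, so every event for a given resource should be routed to the same partition. The following is a minimal sketch of key-based partitioning. The function name `partition_for` and the `widget:42` key format are hypothetical, and real stream clients ship their own partitioners that you would normally use instead.

```python
import hashlib


def partition_for(key: str, partition_count: int) -> int:
    """Map a resource key to a partition using a stable hash."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % partition_count


# Every event keyed by "widget:42" maps to the same partition, so a
# consumer of that partition sees that widget's changes in write order.
first = partition_for("widget:42", 12)
second = partition_for("widget:42", 12)
assert first == second
```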

The other major prerequisite is access to a CDC engine[8], or some tool that can extract changes from your database and publish them as events on the stream. The most prominent open source option for this, at the time of writing, is Debezium[9]. Additionally, cloud providers may offer managed engines such as AWS DMS[10] and GCP Datastream[11].

The framework is then designed around these two tools to provide an event processing pipeline that results in a stream of state updates that can be used to synchronize and replicate data across systems:

A pipeline of database changes, to CDC engine, to a raw event stream, to an event normalizer component, to a processed event stream, to a resource state producer, to a stream of full state updates, to consumers.

I will cover each component of the design as though it is a discrete and separate system from the others but there is no requirement for it to be implemented that way. All of the components of the design can be built as libraries, microservices, a centralized and monolithic system, or anywhere in between.

State Representation Data Model

The first step is to identify the resource you want to replicate and formalize its structure. The idea here is to create a contract around a shared schema that represents the resource or object. Most systems likely already have some canonical schema for a resource but it may be implicit. For example, if a system presents an API call to /v1/api/widgets/42 and always responds with a payload like:

{
  "id": 42,
  "name": "Example Widget",
  "description": "This is a sample widget for demonstration purposes.",
  "quantity": 100,
  "dimensions": {
    "x": 10,
    "y": 5,
    "z": 3
  },
  "colors": ["red", "blue", "green"]
}

then this structure is likely the canonical form of a "widget" resource. An optional, but highly recommended, step is to formalize this structure using some kind of schema language. Common options for schema languages, at the time of writing, are OpenAPI[12], JSON Schema[13], and Protobuf[14]. Each language comes with its own ecosystem of tools and your choice of language doesn’t materially affect this framework design. The key concept behind formalizing a resource schema contract is that it creates a single, shared, and distributable document that represents the resource at all points of presence.
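As an illustration of what a formal contract captures, here is a minimal sketch that expresses the widget structure as Python dataclasses. This is a stand-in for a real schema language, not a replacement for one: the class names and the `from_dict` helper are assumptions made for this example, and a schema document in OpenAPI, JSON Schema, or Protobuf would be the distributable form.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Dimensions:
    x: int
    y: int
    z: int


@dataclass(frozen=True)
class Widget:
    id: int
    name: str
    description: str
    quantity: int
    dimensions: Dimensions
    colors: tuple[str, ...]

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Widget":
        """Build a Widget from a decoded JSON payload.

        A missing field raises KeyError, which makes contract
        violations visible at the boundary.
        """
        dims = data["dimensions"]
        return cls(
            id=data["id"],
            name=data["name"],
            description=data["description"],
            quantity=data["quantity"],
            dimensions=Dimensions(x=dims["x"], y=dims["y"], z=dims["z"]),
            colors=tuple(data["colors"]),
        )
```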

Resource Table Mapping

Once you’ve identified a resource to replicate, the next step is to map that resource back to the underlying database schema. The goal of this process is to identify all tables or collections that contain data used to populate the resource. To illustrate, I’ll continue with the widget example:

{
  "id": 42,
  "name": "Example Widget",
  "description": "This is a sample widget for demonstration purposes.",
  "quantity": 100,
  "dimensions": {
    "x": 10,
    "y": 5,
    "z": 3
  },
  "colors": ["red", "blue", "green"]
}

If you use a document store then this structure may be stored as-is and within a single collection which makes this step easy. SQL databases, on the other hand, are usually structured in a more normalized way. For example, the following would likely be a common way to structure a widget in a SQL database:

A collection of SQL tables that represent a normalized widget.

In this arrangement, the scalar values of name, description, and quantity are contained in the main widgets table. The "dimensions" and "colors" fields are extracted to separate tables that join back to the widget table by foreign key because they are more complex types such as maps and lists.

Keep a record of which tables or collections contain the resource data. This list will be used later to filter CDC events for only those relevant to the resource.
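One way to keep that record is as a small lookup structure that later components can share. The sketch below is an assumption about shape, not a prescribed format. The table names follow the widget example used later in this document, and `RESOURCE_TABLES` and `resources_for` are hypothetical names.

```python
# Maps each resource to the (database, table) pairs that feed it.
RESOURCE_TABLES: dict[str, set[tuple[str, str]]] = {
    "widget": {
        ("widget", "widget"),
        ("widget", "widget_colors"),
        ("widget", "widget_dimensions"),
    },
}


def resources_for(database: str, table: str) -> list[str]:
    """Return the resources affected by a change to the given table."""
    return sorted(
        name
        for name, tables in RESOURCE_TABLES.items()
        if (database, table) in tables
    )
```

A change to `widget.widget_colors` resolves to the widget resource, while a change to an unmapped table resolves to an empty list and can be ignored.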

Raw CDC Data Stream

A CDC engine extracts events from the source database and writes them to an event stream topic labeled as raw.

Once you’ve identified the resource to replicate and the relevant database contents then the next step is to set up your CDC engine. The exact setup steps will be different for each engine and you will need to reference the engine’s documentation for details.

One of the things to look for in your engine is how it extracts existing data from a database. This process might be referred to as a bootstrap, full load, or initial load in your engine’s documentation. Engines commonly offer data extraction in three forms: bootstrap only, bootstrap and CDC, and CDC only. A CDC only extraction model means that only changes made to the database after the engine is connected are extracted. A bootstrap only extraction model effectively generates a snapshot of the current state of the database. The bootstrap and CDC model combines the two approaches in a way that guarantees both the initial snapshot and delivery of all changes that occurred during the snapshot process. Bootstrap and CDC is usually the preferred configuration for starting CDC and for recovering from engine related failures.

A second aspect of your engine to consider is whether it can recover from failure without needing to bootstrap again. A common feature of CDC engines is that they keep a record of their position within the database’s change stream, such as a binlog or transaction log, so they can resume from that point. Engines that resume rely on that position still being available in the database’s log, which sometimes requires configuring the database to retain logs for longer than the default period of time. If an engine cannot resume, or if the engine’s resume position is no longer available in the database, then a bootstrap operation is required to recover from failures.

Most of the work for this step in the design is learning the details of your chosen CDC engine. The primary output is a topic on your ordered stream that contains CDC engine generated change events. Additionally, I recommend building automation around engine management and recovery that includes, at least, the following aspects:

  • Detection of a failed or crashed engine

  • On failure, a configurable number of attempts to resume with backoff between attempts

  • On failure to resume, a hard restart using bootstrap and CDC

  • Alerts if both resume and restart fail
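The recovery sequence in the list above can be sketched as a small supervisor routine. This is a minimal illustration under stated assumptions: failure detection has already happened elsewhere, and `resume`, `bootstrap_and_cdc`, and `alert` are hypothetical callables that wrap whatever management interface your engine actually provides.

```python
import time
from typing import Callable


def recover_engine(
    resume: Callable[[], bool],
    bootstrap_and_cdc: Callable[[], bool],
    alert: Callable[[str], None],
    max_resume_attempts: int = 3,
    base_delay_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Try to resume with exponential backoff, then fall back to a restart."""
    for attempt in range(max_resume_attempts):
        if resume():
            return "resumed"
        # Back off between attempts, but not after the final one.
        if attempt < max_resume_attempts - 1:
            sleep(base_delay_seconds * (2 ** attempt))

    # Resuming failed: hard restart using bootstrap and CDC.
    if bootstrap_and_cdc():
        return "restarted"

    alert("CDC engine failed to resume and failed to restart")
    return "failed"
```

Passing `sleep` as a parameter keeps the routine testable without real delays.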

CDC Event Normalizer

A stream processor converts raw engine events into a standardized structure.

The event normalizer consumes one or more streams of "raw" CDC engine output and converts each event into a normal, or standardized, form. Most CDC engines emit events as structured JSON objects but each engine’s output has a different overall schema and events from the same engine may differ in schema based on the source database being monitored. The purpose of the normalizer is to convert different engine outputs to a common structure so that systems built past this point can operate on any CDC event regardless of its origin.

To illustrate the problem that the normalizer solves, let’s look at some example events representing the update of a widget’s quantity value.

Debezium - MySQL

{
  "schema": {}, // structured description of the payload
  "payload": {
    "before": {
      "id": 42,
      "name": "Example Widget",
      "description": "This is a sample widget for demonstration purposes.",
      "quantity": 100
    },
    "after": {
      "id": 42,
      "name": "Example Widget",
      "description": "This is a sample widget for demonstration purposes.",
      "quantity": 99
    },
    "source": {
      "connector": "mysql",
      // ...
      "db": "widgets",
      "table": "widgets",
      // ...
    },
    "op": "u",
    "ts_ms": 1465581029523,
    // ...
  }
}

Debezium - MongoDB

{
  "schema": {}, // structured description of the payload
  "payload": {
    "op": "u",
    "ts_ms": 1465491461815,
    "before":"{\"id\":42,\"name\":\"Example Widget\",\"description\":\"This is a sample widget for demonstration purposes.\",\"quantity\":100,\"dimensions\":{\"x\":10,\"y\":5,\"z\":3},\"colors\":[\"red\",\"blue\",\"green\"]}",
    "after":"{\"id\":42,\"name\":\"Example Widget\",\"description\":\"This is a sample widget for demonstration purposes.\",\"quantity\":99,\"dimensions\":{\"x\":10,\"y\":5,\"z\":3},\"colors\":[\"red\",\"blue\",\"green\"]}",
    "updateDescription": {
      "removedFields": null,
      "updatedFields": "{\"quantity\": 99}",
      "truncatedArrays": null
    },
    "source": {
      "connector": "mongodb",
      "ts_ms": 1558965508000,
      // ...
      "db": "widgets",
      "collection": "widgets",
      // ...
    }
  }
}

DynamoDB Streams

{
  "eventID": "f07f8ca4b0b26cb9c4e5e77e69f274ee",
  "eventName": "MODIFY",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-1",
  "userIdentity":{
    "type":"Service",
    "principalId":"dynamodb.amazonaws.com"
  },
  "dynamodb": {
    "ApproximateCreationDateTime": 1480642020,
    "Keys": {
      "key": {
        "S": "42"
      }
    },
    "OldImage": {
        // previous record stored, same format as NewImage
    },
    "NewImage": {
      "name": {
        "S": "Example Widget"
      },
      "description": {
        "S": "This is a sample widget for demonstration purposes."
      },
      "quantity": {
        "N": "99"
      },
      "dimensions": {
        "M": {
          "x": {"N": "10"},
          "y": {"N": "5"},
          "z": {"N": "3"}
        }
      },
      "colors": {
        "SS": ["red", "blue", "green"]
      },
      "key": {
        "S": "42"
      }
    },
    "SequenceNumber": "1405400000000002063282832",
    "SizeBytes": 54,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/Widgets/stream/2024-01-01T00:00:00.000"
}

The three examples above demonstrate the variety of structures that engine events can present. Despite the variations in structure, though, each event contains roughly the same kind and amount of information. Engine event contents can be categorized as:

  • Type of change (CREATE/UPDATE/DELETE)

  • Time of change

  • Content before change (optional)

  • Content after change

  • Source database metadata

  • Engine specific metadata

From this list, everything except for engine specific metadata can be reformatted into a standardized structure. Reformatting can nearly always be done without special knowledge of the table or collection schema and only with a knowledge of the engine and database type. The event normalizer, then, is a collection of these static transformation rules that convert the raw CDC engine events into a standard form. A standard form, for example, might look like this:

{
  "timestamp": "<ISO 8601 String>",
  "change_type": "<CREATE | UPDATE | DELETE>",
  "before": {}, // optional before image
  "after": { // simplified JSON representation of the record
    "id":42,
    "name":"Example Widget",
    "description":"This is a sample widget for demonstration purposes.",
    "quantity":100
    // ... other fields if present
  },
  "source": {
    "kind": "<MongoDB | MySQL | DynamoDB | etc.>",
    "container": "<database name | AWS account | etc.>",
    "table": "<table name | collection name | etc.>"
  },
  "engine": {} // optional, arbitrary engine information
}

The output of the normalizer is a stream of these standardized events that all subsequent components build on. Alternatively, a normalizer library can be used to convert the raw engine events into standard events on-demand. In either case, having a documentable standard form for CDC events provides a stable foundation for building other standardized tooling and support libraries that improve the developer experience of onboarding to CDC and reduce the overall adoption costs.
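To make the idea of static transformation rules concrete, here is a minimal sketch of one rule that converts the two Debezium examples above into the standard form. The function and helper names are hypothetical. Mapping Debezium's snapshot read operation (`r`) to CREATE is an assumption of this sketch, and any operation code not listed raises a KeyError rather than being guessed at.

```python
import json
from datetime import datetime, timezone
from typing import Any

# Debezium operation codes mapped to the standard change types.
# Assumption: snapshot reads ("r") are treated as creates.
DEBEZIUM_OPS = {"c": "CREATE", "r": "CREATE", "u": "UPDATE", "d": "DELETE"}
SOURCE_KINDS = {"mysql": "MySQL", "mongodb": "MongoDB"}


def _iso(epoch_ms: int) -> str:
    """Convert epoch milliseconds to an ISO 8601 string in UTC."""
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc).isoformat()


def _image(value: Any) -> Any:
    """Decode images that arrive as JSON strings, as in the MongoDB example."""
    return json.loads(value) if isinstance(value, str) else value


def normalize_debezium(event: dict[str, Any]) -> dict[str, Any]:
    """Convert a raw Debezium event into the standard form."""
    payload = event["payload"]
    source = payload["source"]
    connector = source["connector"]
    return {
        "timestamp": _iso(payload["ts_ms"]),
        "change_type": DEBEZIUM_OPS[payload["op"]],
        "before": _image(payload.get("before")),
        "after": _image(payload.get("after")),
        "source": {
            "kind": SOURCE_KINDS.get(connector, connector),
            "container": source["db"],
            # MySQL events name a table; MongoDB events name a collection.
            "table": source.get("table") or source.get("collection"),
        },
        "engine": {"name": "debezium"},
    }
```

Note that the rule needs no knowledge of the widget schema. It relies only on the engine's envelope, which is what allows one normalizer to serve every table.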

State Producer

A stream processor converts normalized engine events into a full state representation of the change, optionally leveraging a connection to the source database.

The next step of the pipeline is to create a component that will consume normalized CDC events and generate a stream of full state transfer objects. This step will use both the State Representation Data Model and the Resource Table Mapping from earlier in this process. I outline this process in another article that surveys the different CDC usage patterns[15] but I’ll detail the practice in more depth here.

A state producer may be responsible for any number of resources but I’ll continue the description as though each producer handles only one model. A state producer consumes from the stream of normalized CDC events, selects events that match a table or collection from the model’s mapping, optionally queries the source database in order to produce the current model, and then writes the resulting model to a new stream. Notably, state producers for a model must be aware of the underlying database schema because they will need to interact with the before and after images of the CDC events. Let’s walk through the process using the running widget example and assume a MySQL source database.

The SQL schema for a widget is normalized such that there are three tables: widget.widget, widget.widget_colors, and widget.widget_dimensions. These three tables, together, contain the information required to populate the canonical widget data model which is:

{
  "id": 42,
  "name": "Example Widget",
  "description": "This is a sample widget for demonstration purposes.",
  "quantity": 100,
  "dimensions": {
    "x": 10,
    "y": 5,
    "z": 3
  },
  "colors": ["red", "blue", "green"]
}

A widget state producer consumes from the normalized CDC stream and inspects the source database metadata to find events relevant to widget changes. For example, the following event would identify that a dimension value has changed:

{
  "timestamp": "<ISO 8601 String>",
  "change_type": "UPDATE",
  "before": {
    "widget_id":42,
    "x": 10,
    "y": 5,
    "z": 3
  },
  "after": {
    "widget_id":42,
    "x": 10,
    "y": 10,
    "z": 10
  },
  "source": {
    "kind": "MySQL",
    "container": "widget",
    "table": "widget_dimensions"
  }
}

This event does not contain enough information to produce a full widget model so the producer must extract the relevant widget identifier and query the source database in order to fetch the complete set of information. Document databases and fully denormalized SQL tables that contain the entirety of a model’s information can skip the database lookup. In either case, the updated model would be:

{
  "id": 42,
  "name": "Example Widget",
  "description": "This is a sample widget for demonstration purposes.",
  "quantity": 100,
  "dimensions": {
    "x": 10,
    "y": 10,
    "z": 10
  },
  "colors": ["red", "blue", "green"]
}

This object represents the full and complete state of the resource at the current point in time. This state is then written to the output stream, ideally with some standard envelope that contains relevant change metadata. For example, the resulting event might be:

{
  "type": "widget",
  "operation": "update",
  "id": 42,
  "resource": {
    "id": 42,
    "name": "Example Widget",
    "description": "This is a sample widget for demonstration purposes.",
    "quantity": 100,
    "dimensions": {
      "x": 10,
      "y": 10,
      "z": 10
    },
    "colors": ["red", "blue", "green"]
  }
}

These state update events are written to an ordered stream where consumers can then use them for a variety of purposes. Generally, this is the final step that directly interacts with CDC events and all subsequent components are consumers of only these state updates.
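The walkthrough above can be sketched as a single function that filters, looks up, and wraps. This is a minimal illustration under several assumptions: Python's built-in `sqlite3` stands in for the MySQL source so the example is self-contained, the `color` column and the key columns in `WIDGET_TABLES` are guesses at the schema, and the `create` and `delete` operation values are hypothetical extensions of the `update` value shown in the envelope.

```python
import sqlite3
from typing import Any, Optional

# Mapped tables in the "widget" database and the column holding the widget id.
WIDGET_TABLES = {
    "widget": "id",
    "widget_colors": "widget_id",
    "widget_dimensions": "widget_id",
}


def widget_id_from(event: dict[str, Any]) -> Optional[int]:
    """Return the widget id for events on mapped tables, else None."""
    source = event["source"]
    if source["container"] != "widget" or source["table"] not in WIDGET_TABLES:
        return None
    # Deletes carry only a before image, so fall back to it.
    image = event.get("after") or event.get("before")
    return image[WIDGET_TABLES[source["table"]]]


def load_widget(db: sqlite3.Connection, widget_id: int) -> Optional[dict[str, Any]]:
    """Query the source database for the current full widget model."""
    row = db.execute(
        "SELECT id, name, description, quantity FROM widget WHERE id = ?",
        (widget_id,),
    ).fetchone()
    if row is None:
        return None
    dims = db.execute(
        "SELECT x, y, z FROM widget_dimensions WHERE widget_id = ?", (widget_id,)
    ).fetchone()
    colors = [
        color
        for (color,) in db.execute(
            "SELECT color FROM widget_colors WHERE widget_id = ? ORDER BY rowid",
            (widget_id,),
        )
    ]
    return {
        "id": row[0],
        "name": row[1],
        "description": row[2],
        "quantity": row[3],
        "dimensions": dict(zip("xyz", dims)) if dims else None,
        "colors": colors,
    }


def state_update(db: sqlite3.Connection, event: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Turn a normalized CDC event into a full state update, or None if irrelevant."""
    widget_id = widget_id_from(event)
    if widget_id is None:
        return None
    resource = load_widget(db, widget_id)
    if resource is None:
        operation = "delete"
    elif event["source"]["table"] == "widget" and event["change_type"] == "CREATE":
        operation = "create"
    else:
        # A change to any child table is an update to the widget as a whole.
        operation = "update"
    return {"type": "widget", "operation": operation, "id": widget_id, "resource": resource}
```

Because the lookup reads the current row rather than replaying the event, the emitted state may already include later changes. That is consistent with the full state transfer model, where each event carries the state at the time it is produced.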

Bootstrap Provider

The bootstrap provider consumes from the stream of widget updates and other systems consult it for bootstrap data.

Bootstrapping is the process of initializing a consumer with existing data as opposed to consuming updates. Consumers must be bootstrapped either when they are first onboarding to the framework or when they are recovering from a major fault involving data loss.

The streams that result from state producers contain full state representations and are useful for keeping consumers in sync with the current state of a resource. However, the state update streams are often ineffective or inefficient tools for initializing a new consumer with the current state of all resources. For example, most streams can’t practically support infinite retention of events and have a time-based expiration for content, which means that a consumer could read the entire stream without receiving the state for every existing resource. Alternatively, infinite stream retention would result in progressively longer times to read the entire stream from the beginning, with that read time growing without bound as the stream grows.

The CDC engines have a concept of bootstrapping where they read all existing records and produce them as new inserts. This is the tool used both at the start of an engine’s usage and to recover from major engine failures. However, the engine bootstrap process operates at the beginning of the processing pipeline and results in the raw CDC event stream receiving a new copy of the entire source database. This, then, causes any state producers to reproduce the full state of every resource. That reproduction of content then has a multiplicative effect on existing state update consumers by pushing mostly redundant content through every system that was previously caught up. As a result, using a CDC engine’s bootstrap process to onboard a new state update consumer is needlessly expensive in terms of system impact and resource consumption.

To account for these challenges and complications, I recommend building a dedicated bootstrap provider component. This component consumes from the state update stream both during the initial engine bootstrap and ongoing for all subsequent updates. It then populates some kind of storage that contains the most current model for any given resource. Consumers, then, consult the bootstrap provider to get initial content and use the state update streams only for updates. This isolates the impact of bootstrapping a consumer from the rest of the system.

A bootstrap provider can come in a variety of forms. For example, the provider could be a stream processor that populates an S3 bucket and consumers then read from the bucket to bootstrap. Alternatively, the provider could be an API that encapsulates its own database that contains the same content. The important qualities of the provider are that it:

  • Keeps a record of its position within any state update stream

  • Operates as close to the head of every stream as possible

  • Removes deleted resources from storage
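The qualities listed above can be sketched with an in-memory store. This is a minimal illustration only: it assumes a single partition with integer offsets, it assumes state updates use the envelope shown earlier with a hypothetical `delete` operation value, and a real provider would persist both the records and the position durably.

```python
from typing import Any


class BootstrapProvider:
    """Keeps the latest state per resource and the stream position it reflects."""

    def __init__(self) -> None:
        self.position = -1  # offset of the last state update applied
        self._latest: dict[tuple[str, Any], dict[str, Any]] = {}

    def apply(self, offset: int, update: dict[str, Any]) -> None:
        """Apply one state update read from the stream."""
        if offset <= self.position:
            return  # at-least-once delivery: ignore replays
        key = (update["type"], update["id"])
        if update["operation"] == "delete":
            self._latest.pop(key, None)  # remove deleted resources
        else:
            self._latest[key] = update["resource"]
        self.position = offset

    def records(self, resource_type: str) -> list[dict[str, Any]]:
        """Return the current model of every stored resource of one type."""
        return [
            resource
            for (kind, _), resource in self._latest.items()
            if kind == resource_type
        ]
```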

A consumer would then bootstrap using logic like the following:

The consumer first fetches the bootstrap provider’s current position, starts at or scans to that position in the stream but does not start reading, reads all relevant bootstrap records from the provider, and finally starts reading the stream and applying updates.

Using the provider’s stream position as the starting point for reading updates after reading the provider’s full set of objects has three main benefits. The first is that it accounts for the fact that updates may be written during the bootstrap process. The second is that it allows for a successful bootstrap even if the provider, itself, is lagging behind in the state update stream. The third is that it minimizes the number of redundant updates that must be read and applied from the stream after the bootstrap is complete.
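The bootstrap sequence described above can be sketched as follows. The three callables are hypothetical stand-ins for the provider's API and the stream client, and the position is assumed to be the offset of the last update the provider applied. The important detail is the order of operations: the position is fetched before any records are read.

```python
from typing import Any, Callable, Iterable

Update = dict[str, Any]


def bootstrap_consumer(
    get_position: Callable[[], int],
    get_records: Callable[[], Iterable[Update]],
    read_after: Callable[[int], Iterable[Update]],
) -> dict[Any, Update]:
    """Build a local copy from the provider, then catch up from the stream."""
    local: dict[Any, Update] = {}

    # 1. Capture the provider's position first. Anything the provider
    #    applies after this point will be replayed from the stream.
    position = get_position()

    # 2. Load the provider's full set of records.
    for resource in get_records():
        local[resource["id"]] = resource

    # 3. Read the stream from the captured position. Some updates may
    #    repeat state already loaded, which is safe because each event
    #    carries full state and events are applied in order.
    for update in read_after(position):
        if update["operation"] == "delete":
            local.pop(update["id"], None)
        else:
            local[update["id"]] = update["resource"]
    return local
```

In a real consumer the final loop would not end. It becomes the steady-state update loop once the backlog is drained.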

Systems onboarding to CDC for the first time are particularly prone to errors and will likely need to bootstrap multiple times before getting into a steady state. Having a bootstrap provider makes this process less resource intensive and less impactful to other systems.

State Update Consumers

Consumers receive state update events from the stream and consult the bootstrap provider as needed.

The final component of the framework is a consumer of the state update events. These consumers implement the final expressions of data replication, such as generating local read-replicas or caches and triggering real-time side effects on resource changes. There is no specific design for what a consumer should do with state updates or how it should implement that behavior other than how it operates on the stream. The high level requirements for any consumer are that it:

  • Never skips an event

  • Processes all events in order

  • Retries all failed event processing until successful

  • Bootstraps on any critical failure that results in an unprocessed event

The inherent asynchronicity in the framework results in consumers being eventually consistent with the source databases from which the state updates are derived. Implementing the above behaviors ensures that any local copy of the data correctly approaches the source even under failure conditions but at the cost of added latency.
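The consumer requirements above can be sketched as a processing loop. This is a minimal illustration with assumed names: `CriticalFailure` is a hypothetical exception a handler raises when an event can never be processed, `rebootstrap` is a hypothetical hook that restores state from the bootstrap provider, and the fixed retry delay would likely be replaced by backoff in practice.

```python
import time
from typing import Any, Callable, Iterable


class CriticalFailure(Exception):
    """Raised by a handler when an event cannot be processed by retrying."""


def consume(
    events: Iterable[dict[str, Any]],
    handle: Callable[[dict[str, Any]], None],
    rebootstrap: Callable[[], None],
    retry_delay_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Process events strictly in order; return False if a bootstrap was needed."""
    for event in events:
        # Stay on this event until it succeeds, so nothing is skipped
        # and nothing is processed out of order.
        while True:
            try:
                handle(event)
                break
            except CriticalFailure:
                # The event would be left unprocessed, so repair from
                # the bootstrap provider instead of moving on.
                rebootstrap()
                return False
            except Exception:
                sleep(retry_delay_seconds)
    return True
```

The retry loop is where the added latency comes from: one slow or failing event holds back every event behind it in the same partition.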

Framework Conclusion

The framework I’ve proposed predominantly relies on an external CDC engine and event stream to actually drive the system. The custom development effort is focused on data modelling, constructing resource models from CDC events, and building a data repository that can be used to bootstrap new consumers. The use cases for the streams of full state updates are left open ended and based on your specific needs.

My experience building frameworks like this is that the larger challenges are operational rather than being related to coding for complex situations. In particular, the success of the framework hinges on strong operational and reliability practices around engine management, stream management, and stream consumption. I strongly recommend that you invest in monitoring and observability tools that help you verify that the various constraints and requirements I’ve mentioned for each component are satisfied or that you are quickly notified when they are not.

Especially if you are introducing event streaming for the first time into a system, I suggest you always know key metrics like landing rate, distance from stream head, and processing time for each consumer. These metrics are critical for understanding the degree of eventual consistency, general health, and predictive performance of the framework overall. Tracking failures to handle events and being able to detect non-retryable failures are also critical features for knowing when, for example, a consumer needs to bootstrap in order to repair itself.

If you adequately manage the operational concerns, though, a framework like this can be transformative.