Create a Change Data Capture (CDC) connector

CDC connectors are only available for Serverless (Non-Vector) deployments.

You can use the Change Data Capture (CDC) connector to:

  • Process data with client applications.

  • Send data to downstream systems.

  • Capture changes in real time, de-duplicate changes, and stream the clean set of changed data into Astra Streaming.

Astra Streaming processes data changes via a Pulsar topic. By design, the Change Data Capture (CDC) component is simple, with a 1:1 correspondence between the table and a single Pulsar topic.

This guide provides an end-to-end workflow to create a CDC connector for your Serverless (Non-Vector) deployment and send change data to an Elasticsearch sink.

Enabling CDC for Serverless (Non-Vector) databases increases costs based on your Astra Streaming usage. See Astra Streaming pricing and CDC metering rates.

Supported data structures

The following data types and corresponding AVRO or logical types are supported for CDC for Serverless (Non-Vector) databases:

Data type    AVRO type
---------    ---------
ascii        string
bigint       long
blob         bytes
boolean      boolean
counter      long
date         int
decimal      cql_decimal
double       double
duration     cql_duration
float        float
inet         string
int          int
list         array
map          map (only string-type keys are supported)
set          array
smallint     int
text         string
time         long
timestamp    long
timeuuid     string
tinyint      int
uuid         string
varchar      string
varint       cql_varint / bytes
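As a convenience, the mapping above can be expressed as a lookup, for example to check ahead of time which columns of a table would be dropped from CDC events. This is a sketch transcribed from the table, not an official API:

```python
# Lookup of CDC-supported CQL types and their AVRO (or logical) types,
# transcribed from the table above.
CQL_TO_AVRO = {
    "ascii": "string", "bigint": "long", "blob": "bytes",
    "boolean": "boolean", "counter": "long", "date": "int",
    "decimal": "cql_decimal", "double": "double", "duration": "cql_duration",
    "float": "float", "inet": "string", "int": "int",
    "list": "array", "map": "map", "set": "array",
    "smallint": "int", "text": "string", "time": "long",
    "timestamp": "long", "timeuuid": "string", "tinyint": "int",
    "uuid": "string", "varchar": "string", "varint": "cql_varint",
}

def unsupported_columns(columns: dict) -> list:
    """Return the column names whose CQL type is not CDC-supported.

    `columns` maps each column name to its CQL type name.
    """
    return [name for name, cql_type in columns.items()
            if cql_type not in CQL_TO_AVRO]
```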

Cassandra static columns are supported:

  • On row-level updates, static columns are included in the message value.

  • On partition-level updates, the clustering keys are null in the message key. The message value only has static columns on INSERT and UPDATE operations.

Columns with unsupported data types are omitted from the events sent to the data topic. If a row update contains both supported and unsupported data types, the event includes only the columns with supported data types.

AVRO interpretation

Serverless (Non-Vector) database keys are strings, while CDC produces AVRO messages, which are structures. Converting some AVRO structures requires additional tooling, which can result in unexpected output.

The following entries describe the conversion of AVRO logical types. The record type is a schema containing the listed fields.

AVRO complex types:

  • collections (AVRO type array; fields: lists, sets): Sets and lists are treated as the AVRO array type, with the attribute items containing the schema of the array's items.

  • decimal (AVRO type record; fields: BIG_INT, DECIMAL_SCALE): The Cassandra DECIMAL type is converted to a record with the cql_decimal logical type.

  • duration (AVRO type record; fields: CQL_DURATION_MONTHS, CQL_DURATION_DAYS, CQL_DURATION_NANOSECONDS): The Cassandra DURATION type is converted to a record with the cql_duration logical type.

  • maps (AVRO type map; fields: KEYS_CONVERTED_TO_STRINGS, VALUE_SCHEMA): The Cassandra MAP type is converted to the AVRO map type, but the keys are converted to strings. For complex types, the key is represented in JSON.
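For example, a cql_decimal record can be converted back into a Python Decimal. This is a sketch that assumes the decoded AVRO record is available as a dict whose BIG_INT field holds the unscaled value as big-endian two's-complement bytes:

```python
from decimal import Decimal

def decode_cql_decimal(record: dict) -> Decimal:
    """Rebuild a DECIMAL value from a cql_decimal AVRO record.

    Assumes BIG_INT is the unscaled integer as big-endian
    two's-complement bytes and DECIMAL_SCALE is an int.
    """
    unscaled = int.from_bytes(record["BIG_INT"], byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-record["DECIMAL_SCALE"])

# 1234 with a scale of 2 represents 12.34.
example = {"BIG_INT": (1234).to_bytes(2, "big", signed=True), "DECIMAL_SCALE": 2}
```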

Limitations

CDC for Serverless (Non-Vector) databases has the following limitations:

  • Does not manage table truncates.

  • Does not sync data available before starting the CDC agent.

  • Does not replay logged batches.

  • Does not manage time-to-live.

  • Does not support range deletes.

  • CQL column names must not match a Pulsar primitive type name (for example, INT32).

  • Does not support multi-region.

Prerequisites

You need an active Serverless (Non-Vector) database, a streaming tenant, and a table to capture changes from. The following sections show how to create the tenant and the table.

Create a streaming tenant

  1. Log in to the Astra Portal. At the bottom of the Welcome page, select View Streaming.

  2. Select Create Tenant.

  3. Enter a name for your new streaming tenant.

  4. Select a provider and region.

  5. Select Create Tenant.

    Astra Streaming CDC can only be used in a region that supports both Astra Streaming and Serverless (Non-Vector) databases. See Regions for more information.

Create a table

  1. Select Databases from the main navigation.

  2. Select the name of the active database that you would like to use.

  3. Select the CQL Console tab.

  4. Create a table with a primary key column using the following command. Edit the command to add your KEYSPACE_NAME and choose a TABLE_NAME.

    CREATE TABLE IF NOT EXISTS KEYSPACE_NAME.TABLE_NAME (key text PRIMARY KEY, c1 text);
  5. Confirm that your table was created:

    select * from KEYSPACE_NAME.TABLE_NAME;
    Result:
    token@cqlsh> select * from KEYSPACE_NAME.TABLE_NAME;
    
     key | c1
    -----+----
    
    (0 rows)
    token@cqlsh>

You have now created a table and confirmed that the table exists in your Serverless (Non-Vector) database.

Connect to CDC for Serverless (Non-Vector) databases

Complete the following steps after you have created a tenant and a table.

  1. Select Databases from the main navigation.

  2. Select the name of the active database that you would like to use.

  3. In your database dashboard, select the CDC tab.

  4. Select Enable CDC.

  5. Complete the fields to select a tenant, select a keyspace, and enter the name of the table you made.

  6. Select Enable CDC. Once created, your CDC connector appears under the Change Data Capture (CDC) tab in your database dashboard.

Enabling CDC creates a new astracdc namespace with two new topics, data- and log-. The log- topic consumes schema changes, processes them, and then writes clean data to the data- topic. The log- topic is reserved for CDC functionality and should not be consumed directly. The data- topic is used to consume CDC data in Astra Streaming.
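As a sketch, the clean events on the data- topic can be read with the Pulsar Python client (pip install pulsar-client). The service URL, token, and topic name below are placeholders; copy the real values from your streaming tenant's connection details:

```python
# Sketch: read one CDC event from the data- topic with the Pulsar Python client.
NAMESPACE = "astracdc"  # namespace created when CDC is enabled

def data_topic(tenant: str, data_topic_name: str) -> str:
    """Build the fully qualified Pulsar topic name for a CDC data- topic."""
    return f"persistent://{tenant}/{NAMESPACE}/{data_topic_name}"

def consume_one(service_url: str, token: str, topic: str) -> bytes:
    """Receive and acknowledge a single message from the data- topic."""
    import pulsar  # third-party client; imported here so the helper above stays dependency-free
    client = pulsar.Client(service_url, authentication=pulsar.AuthenticationToken(token))
    consumer = client.subscribe(topic, subscription_name="cdc-reader")
    msg = consumer.receive()
    consumer.acknowledge(msg)
    client.close()
    return msg.data()
```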

Connect Elasticsearch sink

Connect an Elasticsearch sink to CDC that consumes messages from the data- topic and sends them to your Elasticsearch deployment.

  1. In your active database dashboard, select the CDC tab.

  2. Under Change Data Capture, select the name of the CDC-enabled table you would like to use. You should still be in the CDC tab after selecting a name, but the header becomes CDC for TABLE_NAME with a green Active icon next to it.

  3. Select Add Elastic Search Sink to select your settings.

  4. Select the astracdc namespace.

  5. Select Elastic Search for the sink type.

  6. Enter a name for your sink.

  7. Under Connect Topics, select a data- topic in the astracdc namespace for the input topic.

  8. Complete Sink-Specific Configuration with the Elasticsearch URL, Index name, and API key found in your Elasticsearch deployment portal. Leave username, password, and token blank.

    Default values auto-populate. These values are recommended:

    • Ignore Record Key as false

    • Null Value Action as DELETE

    • Enable Schema as true

  9. When the fields are completed, select Create.

If creation is successful, SINK_NAME created successfully appears at the top of the screen. You can confirm that your new sink was created in the Sinks tab.
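The sink settings above correspond to the sink's configuration fields. The sketch below mirrors them as a plain dictionary; the key names are assumptions modeled on the Pulsar Elasticsearch sink configuration, so verify them against the generated sink config in the Sinks tab:

```python
# Hypothetical sink-specific configuration mirroring the form fields above.
# Key names are assumptions based on the Pulsar Elasticsearch sink config;
# the URL, index, and API key values are placeholders.
sink_config = {
    "elasticSearchUrl": "https://EXAMPLE.es.cloud:9243",  # Elasticsearch URL
    "indexName": "INDEX_NAME",                            # index name
    "apiKey": "API_KEY",                                  # Elasticsearch API key
    "username": "",                                       # leave blank
    "password": "",                                       # leave blank
    "token": "",                                          # leave blank
    "ignoreRecordKey": False,    # recommended value
    "nullValueAction": "DELETE", # recommended value
    "schemaEnable": True,        # recommended value
}
```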

Send messages

Let’s process some changes with CDC.

  1. In your active database dashboard, select the CQL Console tab.

  2. Insert rows into the table you created:

    INSERT INTO KEYSPACE_NAME.TABLE_NAME (key,c1) VALUES ('32a','bob3123');
    INSERT INTO KEYSPACE_NAME.TABLE_NAME (key,c1) VALUES ('32b','bob3123b');
  3. Confirm the changes you’ve made:

    select * from KEYSPACE_NAME.TABLE_NAME;
    Result:
     key | c1
    -----+----------
     32a |  bob3123
     32b | bob3123b
    
    (2 rows)
    token@cqlsh>

The rows returned in the result confirm that your changes were written and are being processed by CDC.

Confirm Elasticsearch receives change data

Ensure that your new Elasticsearch sink receives data once it is connected.

  1. Send a search request to your Elasticsearch deployment to confirm that Elasticsearch is receiving changes from your database via CDC:

    curl -X POST "ELASTIC_URL/INDEX_NAME/_search?pretty" \
      -H "Authorization: ApiKey API_KEY"
  2. A JSON response with your changes to the index is returned, confirming that Astra Streaming is sending your CDC changes to your Elasticsearch sink.

    {
      "took" : 1,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 3,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "INDEX_NAME",
            "_id" : "khl_hI0Bh25AUvCHghQo",
            "_score" : 1.0,
            "_source" : {
              "name" : "foo",
              "title" : "bar"
            }
          },
          {
            "_index" : "INDEX_NAME",
            "_id" : "32a",
            "_score" : 1.0,
            "_source" : {
              "c1" : "bob3123"
            }
          },
          {
            "_index" : "INDEX_NAME",
            "_id" : "32b",
            "_score" : 1.0,
            "_source" : {
              "c1" : "bob3123b"
            }
          }
        ]
      }
    }
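To check the response programmatically instead of reading the raw JSON, a small helper can map each hit's _id to its _source document (assuming the response above has been parsed into a Python dict):

```python
# Map each search hit's _id to its _source document so the inserted rows
# ('32a', '32b') can be checked by key.
def hit_sources(search_response: dict) -> dict:
    return {hit["_id"]: hit["_source"] for hit in search_response["hits"]["hits"]}

# Example with a trimmed version of the response shown above:
sample = {"hits": {"hits": [
    {"_id": "32a", "_source": {"c1": "bob3123"}},
    {"_id": "32b", "_source": {"c1": "bob3123b"}},
]}}
```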

Outcomes

At this point you have successfully:

  • Created a tenant, topic, and table.

  • Connected your Serverless (Non-Vector) database to CDC.

  • Connected an Elasticsearch sink to your CDC and verified that messages are sent and received successfully.


© 2024 DataStax | Privacy policy | Terms of use
