Create a Change Data Capture (CDC) connector
CDC connectors are only available for Serverless (Non-Vector) deployments.
You can use the Change Data Capture (CDC) connector to:
- Process data by client applications.
- Send data to downstream systems.
- Capture changes in real time, de-duplicate changes, and stream the clean set of changed data into Astra Streaming.
Astra Streaming processes data changes via a Pulsar topic. By design, the Change Data Capture (CDC) component is simple, with a 1:1 correspondence between the table and a single Pulsar topic.
This guide provides an end-to-end workflow to create a CDC connector for your Serverless (Non-Vector) deployment and send change data to an Elasticsearch sink.
Enabling CDC for Serverless (Non-Vector) databases increases costs based on your Astra Streaming usage. See Astra Streaming pricing and CDC metering rates.
Supported data structures
The following data types and corresponding AVRO or logical types are supported for CDC for Serverless (Non-Vector) databases:
Data type | AVRO type |
---|---|
ascii | string |
bigint | long |
blob | bytes |
boolean | boolean |
counter | long |
date | int |
decimal | cql_decimal |
double | double |
duration | cql_duration |
float | float |
inet | string |
int | int |
list | array |
map | map (only string-type keys are supported) |
set | array |
smallint | int |
text | string |
time | long |
timestamp | long |
timeuuid | string |
tinyint | int |
uuid | string |
varchar | string |
varint | cql_varint / bytes |
Cassandra static columns are supported:
- On row-level updates, static columns are included in the message value.
- On partition-level updates, the clustering keys are null in the message key. The message value only has static columns on INSERT and UPDATE operations.
Columns that use unsupported data types are omitted from the events sent to the data topic. If a row update contains both supported and unsupported data types, the event includes only the columns with supported data types.
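As a rough illustration of this rule, the following Python sketch reduces a row's columns to those whose outer CQL type appears in the table of supported data types. The helper names `base_type` and `columns_in_event` and the sample row are hypothetical, and the sketch does not check the string-key restriction on maps. It models the documented behavior only; it is not how the CDC agent is implemented.

```python
# CQL type -> AVRO type, copied from the table of supported data types.
SUPPORTED_TYPES = {
    "ascii": "string", "bigint": "long", "blob": "bytes",
    "boolean": "boolean", "counter": "long", "date": "int",
    "decimal": "cql_decimal", "double": "double",
    "duration": "cql_duration", "float": "float", "inet": "string",
    "int": "int", "list": "array", "map": "map", "set": "array",
    "smallint": "int", "text": "string", "time": "long",
    "timestamp": "long", "timeuuid": "string", "tinyint": "int",
    "uuid": "string", "varchar": "string",
    "varint": "cql_varint / bytes",
}


def base_type(cql_type: str) -> str:
    """Reduce a CQL type such as 'list<text>' to its outer name, 'list'."""
    return cql_type.split("<", 1)[0].strip().lower()


def columns_in_event(row_schema: dict) -> dict:
    """Map each supported column to its AVRO type and drop the rest."""
    return {
        column: SUPPORTED_TYPES[base_type(cql_type)]
        for column, cql_type in row_schema.items()
        if base_type(cql_type) in SUPPORTED_TYPES
    }


# Hypothetical row: 'location' uses tuple, which is not in the table.
row = {"key": "text", "tags": "list<text>", "location": "tuple<int, int>"}
print(columns_in_event(row))
```

In this sample the `location` column is left out of the result, while `key` and `tags` are kept.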
AVRO interpretation
Serverless (Non-Vector) database keys are strings, while CDC produces AVRO messages which are structures. The conversion for some AVRO structures requires additional tooling that can result in unexpected output.
The table below describes the conversion of AVRO logical types. The record type is a schema containing the listed fields.
Name | AVRO type | Fields | Explanation |
---|---|---|---|
collections | array | lists, sets | Sets and Lists are treated as the AVRO type array. |
decimal | record | BIG_INT, DECIMAL_SCALE | The Cassandra DECIMAL type is converted to a record. |
duration | record | CQL_DURATION_MONTHS, CQL_DURATION_DAYS, CQL_DURATION_NANOSECONDS | The Cassandra DURATION type is converted to a record. |
maps | map | KEYS_CONVERTED_TO_STRINGS, VALUE_SCHEMA | The Cassandra MAP type is converted to the AVRO map type, but the keys are converted to strings. For complex types, the key is represented in JSON. |
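Because decimal and duration values arrive as records rather than native values, a consumer usually has to reassemble them. The Python sketch below shows one way to do that. The field names (`bigint`, `scale`, `months`, `days`, `nanoseconds`) and the big-endian two's-complement encoding of the unscaled value are assumptions that are not stated on this page, so check them against the schema of your own data topic before relying on them.

```python
from dataclasses import dataclass
from decimal import Decimal

# Assumed field names inside the AVRO records; verify against your schema.
BIG_INT = "bigint"
DECIMAL_SCALE = "scale"
CQL_DURATION_MONTHS = "months"
CQL_DURATION_DAYS = "days"
CQL_DURATION_NANOSECONDS = "nanoseconds"


def decode_cql_decimal(record: dict) -> Decimal:
    """Rebuild a Decimal from an unscaled integer (bytes) and a scale."""
    # Assumption: the bytes hold a big-endian two's-complement integer.
    unscaled = int.from_bytes(record[BIG_INT], byteorder="big", signed=True)
    sign = 1 if unscaled < 0 else 0
    digits = tuple(int(ch) for ch in str(abs(unscaled)))
    # Building from a tuple keeps every digit, whatever the context precision.
    return Decimal((sign, digits, -record[DECIMAL_SCALE]))


@dataclass(frozen=True)
class CqlDuration:
    months: int
    days: int
    nanoseconds: int


def decode_cql_duration(record: dict) -> CqlDuration:
    """Copy the three duration components into a small value object."""
    return CqlDuration(
        months=record[CQL_DURATION_MONTHS],
        days=record[CQL_DURATION_DAYS],
        nanoseconds=record[CQL_DURATION_NANOSECONDS],
    )


# Hypothetical decoded AVRO records.
price = {"bigint": (12345).to_bytes(2, "big", signed=True), "scale": 2}
print(decode_cql_decimal(price))
print(decode_cql_duration({"months": 1, "days": 2, "nanoseconds": 3}))
```

The duration is kept as three separate components because a month has no fixed length, so it cannot be collapsed into a single number of seconds without losing meaning.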
Limitations
CDC for Serverless (Non-Vector) databases has the following limitations:
- Does not manage table truncates.
- Does not sync data available before starting the CDC agent.
- Does not replay logged batches.
- Does not manage time-to-live.
- Does not support range deletes.
- CQL column names must not match a Pulsar primitive type name (for example, INT32).
- Does not support multi-region.
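The column-name limitation is easy to check before you enable CDC on a table. The following Python sketch flags column names that collide with a Pulsar primitive type name. The set of names is an illustrative subset of Pulsar schema type names and may be incomplete, and the case-insensitive comparison is an assumption made to err on the side of caution; `conflicting_columns` is a hypothetical helper, not part of any Astra or Pulsar tool.

```python
# Illustrative subset of Pulsar primitive schema type names (may be incomplete).
PULSAR_PRIMITIVE_NAMES = {
    "BOOLEAN", "INT8", "INT16", "INT32", "INT64", "FLOAT", "DOUBLE",
    "STRING", "BYTES", "DATE", "TIME", "TIMESTAMP",
}


def conflicting_columns(column_names):
    """Return the column names that match a primitive type name."""
    # Assumption: compare case-insensitively to be conservative.
    return [name for name in column_names
            if name.upper() in PULSAR_PRIMITIVE_NAMES]


# Hypothetical table with one problematic column.
print(conflicting_columns(["key", "c1", "int32"]))
```

Renaming any flagged column before enabling CDC avoids the conflict.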
Prerequisites
You need the following items to complete this procedure:
- An active Astra account.
- A Serverless (Non-Vector) database created in the Astra Portal.
- A secure connect bundle downloaded from the Astra Portal.
- A keyspace created in the Astra Portal.
- An active Elasticsearch account.
- An Elasticsearch endpoint, index name, and API key retrieved from your Elasticsearch Deployment.
Create a streaming tenant
- Log in to the Astra Portal. At the bottom of the Welcome page, select View Streaming.
- Select Create Tenant.
- Enter a name for your new streaming tenant.
- Select a provider and region.
- Select Create Tenant.
Astra Streaming CDC can only be used in a region that supports both Astra Streaming and Serverless (Non-Vector) databases. See Regions for more information.
Create a table
- Select Databases from the main navigation.
- Select the name of the active database that you would like to use.
- Select the CQL Console tab.
- Create a table with a primary key column using the following command. Edit the command to add your KEYSPACE_NAME and choose a TABLE_NAME.
    CREATE TABLE IF NOT EXISTS KEYSPACE_NAME.TABLE_NAME (key text PRIMARY KEY, c1 text);
- Confirm that your table was created:
    select * from KEYSPACE_NAME.TABLE_NAME;
  Result:
    token@cqlsh> select * from KEYSPACE_NAME.TABLE_NAME;

     key | c1
    -----+----

    (0 rows)
    token@cqlsh>
You have now created a table and confirmed that the table exists in your Serverless (Non-Vector) database.
Connect to CDC for Serverless (Non-Vector) databases
- Select Databases from the main navigation.
- Select the name of the active database that you would like to use.
- In your database dashboard, select the CDC tab.
- Select Enable CDC.
- Complete the fields to select a tenant, select a keyspace, and select the name of the table you created.
- Select Enable CDC. Once created, your CDC connector appears under the Change Data Capture (CDC) tab in your database dashboard.
Enabling CDC creates a new astracdc namespace with two new topics, data- and log-.
The log- topic consumes schema changes, processes them, and then writes clean data to the data- topic.
The log- topic is for CDC functionality and should not be used.
The data- topic is used to consume CDC data in Astra Streaming.
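If you list the topics in the astracdc namespace from a script, you can apply the same distinction programmatically. The Python sketch below keeps only topics whose local name starts with `data-`. The function name `consumable_topics` and the sample topic names are hypothetical; the actual names in your tenant will differ.

```python
def consumable_topics(topic_names):
    """Keep only topics whose local name starts with 'data-'."""
    selected = []
    for name in topic_names:
        # The local name is the part after the last '/' in a full topic name.
        local_name = name.rsplit("/", 1)[-1]
        if local_name.startswith("data-"):
            selected.append(name)
    return selected


# Hypothetical topic names for illustration only.
topics = [
    "persistent://my-tenant/astracdc/data-example-ks.tbl",
    "persistent://my-tenant/astracdc/log-example-ks.tbl",
]
print(consumable_topics(topics))
```

Only the `data-` topic is returned, which is the one a sink or consumer should read from.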
Connect Elasticsearch sink
Connect an Elasticsearch sink to CDC that consumes messages from the data- topic and sends them to your Elasticsearch deployment.
- In your active database dashboard, select the CDC tab.
- Under Change Data Capture, select the name of the CDC-enabled table you would like to use. You should still be in the CDC tab after selecting a name, but the header becomes CDC for TABLE_NAME with a green Active icon next to it.
- Select Add Elastic Search Sink to select your settings.
- Select the astracdc namespace.
- Select Elastic Search for the sink type.
- Enter a name for your sink.
- Under Connect Topics, select a data- topic in the astracdc namespace for the input topic.
- Complete Sink-Specific Configuration with the Elasticsearch URL, Index name, and API key found in your Elasticsearch deployment portal. Leave username, password, and token blank.
  Default values auto-populate. These values are recommended:
  - Ignore Record Key as false
  - Null Value Action as DELETE
  - Enable Schema as true
- When the fields are completed, select Create.
If creation is successful, SINK_NAME created successfully appears at the top of the screen.
You can confirm that your new sink was created in the Sinks tab.
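To see why Ignore Record Key as false and Null Value Action as DELETE are recommended, it can help to model what they do to the index. The Python sketch below is a toy in-memory model, not the sink's implementation. It assumes that the record key becomes the document `_id`, that the record value becomes the document body, and that a row delete reaches the topic as a record with a null value. The function name `apply_record` is hypothetical.

```python
from typing import Optional


def apply_record(index: dict, key: str, value: Optional[dict]) -> None:
    """Apply one CDC record to a dict that stands in for an index.

    Models the recommended settings only:
    - Ignore Record Key = false: the record key is used as the document id.
    - Null Value Action = DELETE: a null value removes the document.
    """
    if value is None:
        index.pop(key, None)  # deleting a missing document is a no-op
    else:
        index[key] = dict(value)  # insert, or overwrite on update


index = {}
apply_record(index, "32a", {"c1": "bob3123"})
apply_record(index, "32b", {"c1": "bob3123b"})
apply_record(index, "32a", {"c1": "updated"})  # same key, same document
apply_record(index, "32b", None)               # assumed delete event
print(index)
```

Keying documents by the record key keeps one document per table row, so updates overwrite instead of accumulating and deletes remove the matching document.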
Send messages
Let’s process some changes with CDC.
- In your active database dashboard, select the CQL Console tab.
- Modify the table you created.
    INSERT INTO KEYSPACE_NAME.TABLE_NAME (key,c1) VALUES ('32a','bob3123');
    INSERT INTO KEYSPACE_NAME.TABLE_NAME (key,c1) VALUES ('32b','bob3123b');
- Confirm the changes you’ve made:
    select * from KEYSPACE_NAME.TABLE_NAME;
  Result:
     key | c1
    -----+----------
     32a | bob3123
     32b | bob3123b

    (2 rows)
    token@cqlsh>
The processed changes in the resulting table verify that the messages were sent successfully.
Confirm Elasticsearch receives change data
Ensure that your new Elasticsearch sink receives data once it is connected.
- Issue a POST request to the _search endpoint of your Elasticsearch deployment to confirm Elasticsearch is receiving changes from your database via CDC.
    curl -X POST "ELASTIC_URL/INDEX_NAME/_search?pretty" -H "Authorization: ApiKey API_KEY"
- A JSON response with your changes to the index is returned, confirming that Astra Streaming is sending your CDC changes to your Elasticsearch sink.
    {
      "took" : 1,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 3,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "INDEX_NAME",
            "_id" : "khl_hI0Bh25AUvCHghQo",
            "_score" : 1.0,
            "_source" : {
              "name" : "foo",
              "title" : "bar"
            }
          },
          {
            "_index" : "INDEX_NAME",
            "_id" : "32a",
            "_score" : 1.0,
            "_source" : {
              "c1" : "bob3123"
            }
          },
          {
            "_index" : "INDEX_NAME",
            "_id" : "32b",
            "_score" : 1.0,
            "_source" : {
              "c1" : "bob3123b"
            }
          }
        ]
      }
    }
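If you prefer to run this check from a script, the following Python sketch builds the same search request with the standard library and extracts the document IDs from a response body. The function names `build_search_request` and `document_ids` are hypothetical, and the URL, index name, and API key shown are placeholders that you replace with your own values.

```python
import json
import urllib.request


def build_search_request(elastic_url: str, index_name: str, api_key: str):
    """Build (but do not send) a search request for the given index."""
    url = f"{elastic_url.rstrip('/')}/{index_name}/_search?pretty"
    headers = {"Authorization": f"ApiKey {api_key}"}
    return urllib.request.Request(url, headers=headers, method="POST")


def document_ids(response_body: str) -> list:
    """Return the _id of every hit in a search response."""
    hits = json.loads(response_body)["hits"]["hits"]
    return [hit["_id"] for hit in hits]


# Placeholder values; nothing is sent until urlopen is called.
request = build_search_request("https://ELASTIC_HOST:9243", "INDEX_NAME", "API_KEY")
print(request.get_method(), request.full_url)

# To send the request against a real deployment:
# with urllib.request.urlopen(request) as response:
#     print(document_ids(response.read().decode("utf-8")))
```

If CDC is flowing, the returned IDs should include the keys of the rows you inserted, such as `32a` and `32b`.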
Outcomes
At this point you have successfully:
- Created a tenant, topic, and table.
- Connected your Serverless (Non-Vector) database to CDC.
- Connected an Elasticsearch sink to your CDC connector and verified that messages are sent and received successfully.
Resources
For more on Astra Streaming, see: