Integrate Airbyte with Astra DB Serverless

Estimated time: 15 minutes

Airbyte builds Extract, Load, Transform (ELT) pipelines from data sources to destinations. In this tutorial, you’ll use an Airbyte connector to extract data from a GNews source and load it into Astra DB Serverless as the destination.

Airbyte is available as a self-hosted or cloud-hosted service. This example uses the cloud-hosted service.

Prerequisites

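The code samples on this page assume the following:

  • An Airbyte Cloud workspace.

  • A GNews API key for the GNews source.

  • An active Astra DB Serverless database, with its application token and API endpoint.

  • An OpenAI API key, which the Astra DB destination uses to generate vector embeddings.

  • Python 3 and pip.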

Create the Airbyte pipeline

Navigate to your Airbyte workspace at cloud.airbyte.com. The following steps show how to create a Source, Destination, and Connection for your Airbyte pipeline.

Add a GNews source in Airbyte

A Source is an application that extracts data from an underlying data store. This can be an API, a file, a database, or a data warehouse.

  1. Add a source to pull general articles from the GNews API.

  2. Test and verify the source. If the source is valid, Airbyte displays the message "All connection tests passed".

See the Airbyte documentation for more details about how to create a source.

Add an Astra DB destination in Airbyte

A Destination is a target that receives and loads data into an underlying data store. This can be a data warehouse, a data lake, another database, or an analytics tool.

  1. Add a destination to send data to Astra DB.

  2. Provide the following configuration:

    • Destination name: Astra DB

    • Chunk size: 512

    • OpenAI API key: OPENAI_API_KEY

    • Astra DB Application Token: ASTRA_DB_APPLICATION_TOKEN

    • Astra DB Endpoint: ASTRA_DB_API_ENDPOINT

    • Astra DB Keyspace: default_keyspace

    • Astra DB collection: airbyte

  3. Test and verify the destination. If the destination is valid, Airbyte displays the message "All connection tests passed".

See the Airbyte documentation for more details about how to create a destination.
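Chunk size determines how the destination splits each record's text before embedding it. As we understand Airbyte's vector database destinations, each chunk is embedded with OpenAI and stored as its own document with its own $vector, and chunk size is measured in tokens. The sketch below is a simplified illustration, not Airbyte's splitter: it treats whitespace-separated words as a stand-in for tokens, and the function name chunk_words is hypothetical.

```python
from typing import List


def chunk_words(text: str, chunk_size: int = 512) -> List[str]:
    """Split text into chunks of at most chunk_size whitespace-separated words.

    Simplification: Airbyte measures chunk size in tokens, not words.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    words = text.split()
    # Step through the word list in fixed-size windows; the last chunk may be shorter.
    return [
        " ".join(words[start:start + chunk_size])
        for start in range(0, len(words), chunk_size)
    ]
```

With the configured value of 512, a 1,200-word article would produce three chunks of 512, 512, and 176 words under this simplification.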

Set up a connection between GNews and Astra DB

A Connection is an Airbyte component that pulls data from a source and pushes data to a destination.

  1. Set up a connection to wire together the GNews source and the Astra DB destination.

  2. Provide the following configuration:

    • Leave the Configuration values at their defaults. With the default schedule, Airbyte syncs data to the Astra DB destination every 24 hours.

    • Enable the top_headlines and search streams.

    • Select the Incremental | Append + Deduped sync mode. Each sync appends only new or modified records to the existing data and keeps only the most recent version of each record.

  3. Wait for the first sync job to complete. You may need to manually run the sync job if it doesn’t start automatically.

See the Airbyte documentation for more details about how to create a connection.
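The Incremental | Append + Deduped mode can be pictured as a merge keyed on each record's primary key. In the sample output later on this page, _ab_record_id values look like the stream name, an underscore, and the article URL, so this sketch uses that pattern as the key. The cursor field publishedAt and the helper names record_id and append_deduped are assumptions for illustration; they do not describe Airbyte's internals.

```python
from typing import Any, Dict, List

Record = Dict[str, Any]


def record_id(stream: str, url: str) -> str:
    """Build a key shaped like the _ab_record_id values in the sample output."""
    return f"{stream}_{url}"


def append_deduped(
    existing: Dict[str, Record],
    batch: List[Record],
    cursor: str = "publishedAt",  # hypothetical cursor field
) -> Dict[str, Record]:
    """Merge one sync batch into the stored records, keeping the newest version per key."""
    merged = dict(existing)  # leave the caller's mapping untouched
    for record in batch:
        key = record_id(record["_ab_stream"], record["url"])
        current = merged.get(key)
        # ISO 8601 UTC timestamps in one consistent format sort correctly as strings.
        if current is None or record[cursor] >= current[cursor]:
            merged[key] = record
    return merged
```

Because the key includes the stream name, the same article arriving through both top_headlines and search is kept as two records in this model.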

Verify the integration between Airbyte and Astra DB

Connect to your Astra DB Serverless database to verify that the airbyte collection is receiving the GNews articles.

  1. Create a .env file in the root directory of your project. Populate the file with the application token and API endpoint from the Database Details section of your database’s Overview tab. Use the same keyspace and collection names that you set for the Astra DB destination.

    .env
    ASTRA_DB_APPLICATION_TOKEN=TOKEN
    ASTRA_DB_API_ENDPOINT=API_ENDPOINT
    ASTRA_DB_KEYSPACE_NAME=default_keyspace
    ASTRA_DB_COLLECTION_NAME=airbyte
  2. Install dependencies.

    pip install astrapy python-dotenv
  3. Create a Python program to connect to your Astra DB Serverless database and print one article from the airbyte collection.

    airbyte-integration.py
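    import os
    from dotenv import load_dotenv
    from astrapy.db import AstraDB
    
    # Load the Astra DB settings from the .env file into environment variables.
    load_dotenv()
    
    # Connect to the database; this astrapy API calls the keyspace a namespace.
    db = AstraDB(
        token=os.environ["ASTRA_DB_APPLICATION_TOKEN"],
        api_endpoint=os.environ["ASTRA_DB_API_ENDPOINT"],
        namespace=os.environ["ASTRA_DB_KEYSPACE_NAME"],
    )
    
    # Fetch one document from the airbyte collection and print the raw response.
    print(db.collection(os.environ["ASTRA_DB_COLLECTION_NAME"]).find_one())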
  4. Run the integration program.

    python3 airbyte-integration.py
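  5. The output is the raw API response. The returned document contains the article's source metadata, its vector embedding ($vector), Airbyte metadata fields such as _ab_stream and _ab_record_id, and the article text.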

    json
    {
      "data": {
        "document": {
          "_id": "5ac002a7-c051-41e0-9628-b35b4549acee",
          "$vector": [
            -0.022076305689849227,
            ...,
            -0.03238973221677352
          ],
          "source": {
            "name": "ABC News",
            "url": "https://www.abc.net.au"
          },
          "_ab_stream": "top_headlines",
          "_ab_record_id": "top_headlines_https://www.abc.net.au/news/2024-02-22/albanese-gifted-dunkley-by-election-birthday-political-fortunes/103494916",
          "text": "content: On a warm late summer's day in Dunkley yesterday, there was no obvious sign of the \"irritable voter syndrome\" that can so often give incumbent parties sleepless nights ahead of a by-election. Voters in this outer-metropolitan pocket of Melbourne might..."
        }
      }
    }
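  6. Confirm that Airbyte is also loading data from the search stream. In airbyte-integration.py, replace the print statement with a query that filters on the _ab_stream field, and then run the program again.

    airbyte-integration.py
    print(db.collection(os.environ["ASTRA_DB_COLLECTION_NAME"]).find_one({"_ab_stream": "search"}))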
    json
    {
      "data": {
        "document": {
          "_id": "5ac002a7-c051-41e0-9628-b35b4549acee",
          "$vector": [
            -0.022076305689849227,
            ...,
            0.005421517128333851
          ],
          "source": {
            "name": "SooToday",
            "url": "https://www.sootoday.com"
          },
          "_ab_stream": "search",
          "_ab_record_id": "search_https://www.sootoday.com/classifieds/careers/health-care/1862684",
          "text": "content: Description • Develop recreation therapy programs for clients in order to provide activation and respite\n• Provide education for families, staff, and the general public regarding appropriate recreational activities\n• Market program to referring agencies and the community at large\n• Assess the needs of individual clients\n• Plan and implement group and individual recreation therapy sessions\n• Evaluate the effectiveness of recreation therapy programs\n• Collaborate with other healthcare professionals to provide holistic care\n• Maintain documentation and records of client progress\n• Participate in professional development opportunities to stay current with trends in recreation therapy"
        }
      }
    }
  7. You have confirmed that your Airbyte pipeline is loading data from the GNews API into your Astra DB Serverless database.
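If you want to check these results in code rather than by eye, the following is a minimal, standard-library-only sketch. It assumes the raw response shape shown in the sample output ({"data": {"document": ...}}), and the helper names missing_settings and summarize_find_one are hypothetical; they are not part of astrapy.

```python
import os
from typing import Any, Dict, List, Mapping, Optional

# Settings that airbyte-integration.py reads from the .env file.
REQUIRED_SETTINGS = (
    "ASTRA_DB_APPLICATION_TOKEN",
    "ASTRA_DB_API_ENDPOINT",
    "ASTRA_DB_KEYSPACE_NAME",
    "ASTRA_DB_COLLECTION_NAME",
)


def missing_settings(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required settings that are unset or empty."""
    return [name for name in REQUIRED_SETTINGS if not env.get(name)]


def summarize_find_one(response: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Summarize the document in a raw find_one() response.

    Assumes the {"data": {"document": ...}} shape shown in the sample output
    and returns None when the response holds no document.
    """
    document = (response.get("data") or {}).get("document")
    if document is None:
        return None
    return {
        "stream": document.get("_ab_stream"),
        "source": (document.get("source") or {}).get("name"),
        "vector_length": len(document.get("$vector") or []),
        "text_preview": document.get("text", "")[:40],
    }
```

For example, you could pass the value returned by find_one({"_ab_stream": "search"}) to summarize_find_one and check that its stream field is "search".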


© 2024 DataStax
