• Airbyte is a tool with a FOSS core for managing ETL operations between data sources.
  • Airbyte uses dbt under the covers to do some of the heavy lifting during transformation.

Airbyte API

A REST API is provided by both the cloud and self-hosted platforms, and documentation is available here. The API routes are found at /api/public/v1/ on the same port as the UI.
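As a rough sketch of how the base URL fits together, the snippet below builds the public API URL for a self-hosted instance and prepares (without sending) a basic-auth request against the connections route. It uses only the standard library. The host, port, credentials and both helper names are placeholders invented for illustration, and the `connections` path is assumed from the SDK call used further down this page.

```python
import base64
import urllib.request
from urllib.parse import urljoin


def api_base_url(host: str, port: int) -> str:
    """Build the public API base URL, served on the same port as the UI."""
    return f"http://{host}:{port}/api/public/v1/"


def build_list_connections_request(base_url: str, user: str, password: str) -> urllib.request.Request:
    """Prepare (but do not send) a GET request for the connections route."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    # the trailing slash on base_url matters: urljoin would otherwise drop "v1"
    req = urllib.request.Request(urljoin(base_url, "connections"))
    req.add_header("Authorization", f"Basic {token}")
    req.add_header("Accept", "application/json")
    return req


# placeholder host, port and credentials
base = api_base_url("localhost", 8000)
req = build_list_connections_request(base, "airbyte", "password")
print(req.get_method(), req.full_url)
```

Passing `req` to `urllib.request.urlopen` would actually send it. The same base URL string is what goes into AIRBYTE_URL for the SDK.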

There is also a Python SDK.

To use the Python SDK against our own server, we need to override the base URL like so:

 
import os
from dotenv import load_dotenv
 
import airbyte_api
from airbyte_api import models, api
 
load_dotenv()
 
s = airbyte_api.AirbyteAPI(
    security=models.Security(
        basic_auth=models.SchemeBasicAuth(
            password=os.getenv('AIRBYTE_PASSWORD'),
            username=os.getenv('AIRBYTE_USER'),
        ),
    ),
    # AIRBYTE_URL=http://<HOSTNAME>:<PORT>/api/public/v1/
    server_url=os.getenv("AIRBYTE_URL"),
)
 
# do something with the api like list existing connections
 
resp = s.connections.list_connections(api.ListConnectionsRequest())
 
for connection in resp.connections_response.data:
    print(f"{connection.connection_id} - {connection.name}")

Selected Fields

As of v0.63.4 you can get a list of the fields available within a given stream (resolving issue 35798 in their git project):

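# `config` is assumed to be a dict loaded elsewhere that holds the connection ID
conn = s.connections.get_connection(api.GetConnectionRequest(connection_id=config['connection_id']))

# the raw JSON response lists the streams currently configured on the connection
resp = conn.raw_response.json()

for stream in resp['configurations']['streams']:
    print(stream)

# look up the streams (and their fields) exposed by the connection's source
s_id = conn.connection_response.source_id

sresp = s.streams.get_stream_properties(api.GetStreamPropertiesRequest(source_id=s_id))

for stream in sresp.stream_properties_response:
    print(stream.stream_name)
    print(stream)
    for field in stream.property_fields:
        # each entry is a field path; print its first element
        print(field[0])

    print("---")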

We can then set which fields are enabled for a given connection:

stream_configs = [
    models.StreamConfiguration("articles", selected_fields=[
        models.SelectedFieldInfo(field_path=['id']),
        models.SelectedFieldInfo(field_path=['title']),
        models.SelectedFieldInfo(field_path=['type']),
        ])
]
 
req_body = api.PatchConnectionRequest(
    connection_patch_request=models.ConnectionPatchRequest(
        configurations=models.StreamConfigurations(streams=stream_configs),
    ),
    connection_id=config['connection_id'],
)
 
resp = s.connections.patch_connection(req_body)
 
print(resp.connection_response)

Octavia

Octavia has been deprecated and is no longer supported.

Airbyte Transformation

Authentication

Record Processing

  • Airbyte connectors make HTTP requests and carry out transformations on the data.
  • The documentation seems to imply that records can be arbitrary JSON objects: you just specify a JSON selector that points to where those records are in the response.
  • We use JSON Schema to define the record object schema.
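To make the record selector idea concrete, here is a minimal sketch in plain Python. It is not Airbyte's own implementation: `select_records` and `matches_schema` are hypothetical helpers, the sample payload and schema are made up, and the schema check covers only a tiny subset of JSON Schema (required keys and primitive types).

```python
from __future__ import annotations

from typing import Any

# Maps a few JSON Schema type names to Python types (subset only).
JSON_TYPES = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "object": dict,
    "array": list,
}


def select_records(response: Any, field_path: list[str]) -> list[dict]:
    """Walk field_path into a decoded JSON response and return the records found there."""
    node = response
    for key in field_path:
        node = node[key]
    # a selector may point at a single object rather than a list
    return node if isinstance(node, list) else [node]


def matches_schema(record: dict, schema: dict) -> bool:
    """Check required keys and primitive property types only."""
    for key in schema.get("required", []):
        if key not in record:
            return False
    for key, spec in schema.get("properties", {}).items():
        if key in record and not isinstance(record[key], JSON_TYPES[spec["type"]]):
            return False
    return True


# made-up API response: the records live under data.articles
response = {
    "data": {
        "articles": [
            {"id": 1, "title": "Hello", "type": "post"},
            {"id": "2", "title": "Bad id"},  # id is a string, so it fails the schema
        ]
    }
}
schema = {
    "type": "object",
    "required": ["id", "title"],
    "properties": {
        "id": {"type": "integer"},
        "title": {"type": "string"},
        "type": {"type": "string"},
    },
}

records = select_records(response, ["data", "articles"])
valid = [r for r in records if matches_schema(r, schema)]
print(len(records), len(valid))
```

A real connector would lean on a full JSON Schema validator instead of the hand-rolled check; the point here is only the split between "where the records are" (the selector) and "what a record looks like" (the schema).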

Upgrading Airbyte

If the deployment is done via docker-compose then we can use the provided script to upgrade the deployment as per this guide:

# upgrade management script
wget https://raw.githubusercontent.com/airbytehq/airbyte/master/run-ab-platform.sh
# remove old docker env artifacts
./run-ab-platform.sh -r
# bring the system back online
./run-ab-platform.sh -b

Airbyte Docker Troubleshooting

  • Make sure we are running the latest versions of Docker and docker-compose, otherwise the provided docker-compose.yaml produces some strange error messages. A comprehensive walkthrough can be found here.

Permissions Error with temp files/workspaces

If you encounter issues like this one:

ERROR StatusConsoleListener FileManager (/tmp/workspace/4de9764c-5e55-4c00-9cf6-f23c54d6db2d/0/logs.log) java.io.IOException: Could not create directory /tmp/workspace/4de9764c-5e55-4c00-9cf6-f23c54d6db2d/0

Then you may need to set permissions properly in the airbyte_workspace volume using a command like this:

docker exec -u root -it airbyte-worker /bin/bash
chown -R airbyte:airbyte /tmp

Cannot Find image: airbyte/source-mysql:3.6.0

No idea why this is happening. Running the upgrade script as detailed here seems to fix it. Not exactly confidence boosting.

RESOURCE_EXHAUSTED: grpc: received message larger than max

According to this bug the state of the MySQL sync history can grow too big and cause Airbyte to fail every time. The only workaround is to recreate the connection.