Version: 1.0.0 (latest)

🧪 Schema and data contracts

dlt will evolve the schema at the destination by following the structure and data types of the extracted data. There are several modes that you can use to control this automatic schema evolution, from the default mode, where all changes to the schema are accepted, to a frozen schema that does not change at all.

Consider this example:

import dlt

@dlt.resource(schema_contract={"tables": "evolve", "columns": "freeze"})
def items():
    ...

This resource will allow new tables (both nested tables and tables with dynamic names) to be created, but will raise an exception if data extracted for an existing table contains a new column.

Setting up the contract

You can control the following schema entities:

  • tables - contract is applied when a new table is created
  • columns - contract is applied when a new column is created on an existing table
  • data_type - contract is applied when data cannot be coerced into the data type associated with an existing column

You can use contract modes to tell dlt how to apply the contract to a particular entity:

  • evolve: No constraints on schema changes.
  • freeze: This will raise an exception if data is encountered that does not fit the existing schema, so no data will be loaded to the destination.
  • discard_row: This will discard any extracted row if it does not adhere to the existing schema, and this row will not be loaded to the destination.
  • discard_value: This will discard data in an extracted row that does not adhere to the existing schema and the row will be loaded without this data.
note

The default mode (evolve) works as follows:

  1. New tables may always be created.
  2. New columns may always be appended to an existing table.
  3. Data that does not coerce to the existing data type of a particular column will be sent to a variant column created for this particular type.
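To make the four contract modes concrete, here is a minimal toy model of how they could treat a row that carries a column the schema does not know yet. It is a sketch of the semantics described above, not dlt's implementation: `apply_columns_contract` and `ContractViolation` are hypothetical names, and rows are plain Python dicts.

```python
from typing import Any, Dict, Optional, Set


class ContractViolation(Exception):
    """Hypothetical stand-in for the error raised in freeze mode."""


def apply_columns_contract(
    row: Dict[str, Any], known_columns: Set[str], mode: str
) -> Optional[Dict[str, Any]]:
    """Return the row to load, or None when the row is discarded."""
    new_columns = set(row) - known_columns
    if not new_columns or mode == "evolve":
        return row  # nothing violates the contract, or the schema may grow
    if mode == "freeze":
        raise ContractViolation(f"new columns not allowed: {sorted(new_columns)}")
    if mode == "discard_row":
        return None  # the whole row is dropped
    if mode == "discard_value":
        # keep the row, drop only the values that do not fit the schema
        return {k: v for k, v in row.items() if k in known_columns}
    raise ValueError(f"unknown contract mode: {mode!r}")


known = {"id", "name"}
row = {"id": 1, "name": "alice", "email": "alice@example.com"}

for mode in ("evolve", "discard_value", "discard_row"):
    print(mode, "->", apply_columns_contract(row, known, mode))
```

In this toy, only freeze stops processing; the two discard modes silently shrink what gets loaded, which is why they are worth pairing with some form of monitoring.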

Passing schema_contract argument

The schema_contract argument exists on the dlt.source decorator as a default for all resources in that source, and on the dlt.resource decorator as a directive for the individual resource and, as a consequence, for all tables created by this resource. Additionally, it exists on the pipeline.run() method, where it overrides all existing settings.

The schema_contract argument accepts two forms:

  1. full: a mapping of schema entities to contract modes
  2. shorthand: a contract mode (string) that will be applied to all schema entities

For example, setting schema_contract to freeze will expand to the full form:

{"tables": "freeze", "columns": "freeze", "data_type": "freeze"}
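The expansion from shorthand to full form can be sketched in a few lines of plain Python. `expand_schema_contract` is a hypothetical helper written for illustration, not a dlt function. Filling entities that are missing from a partial mapping with evolve is an assumption of this sketch, based on evolve being the default mode.

```python
from typing import Dict, Union

SCHEMA_ENTITIES = ("tables", "columns", "data_type")
CONTRACT_MODES = ("evolve", "freeze", "discard_row", "discard_value")


def expand_schema_contract(contract: Union[str, Dict[str, str]]) -> Dict[str, str]:
    """Expand a shorthand or partial contract into the full three-entity form."""
    if isinstance(contract, str):
        # shorthand: the same mode applies to every schema entity
        contract = {entity: contract for entity in SCHEMA_ENTITIES}
    unknown = set(contract) - set(SCHEMA_ENTITIES)
    if unknown:
        raise ValueError(f"unknown schema entities: {sorted(unknown)}")
    # assumption: entities that are not mentioned fall back to the default mode
    expanded = {entity: contract.get(entity, "evolve") for entity in SCHEMA_ENTITIES}
    for entity, mode in expanded.items():
        if mode not in CONTRACT_MODES:
            raise ValueError(f"unknown contract mode for {entity}: {mode!r}")
    return expanded


print(expand_schema_contract("freeze"))
print(expand_schema_contract({"columns": "freeze"}))
```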

You can change the contract on the source instance via the schema_contract property. For a resource, you can use apply_hints.

Nuances of contract modes

  1. Contracts are applied after the names of tables and columns are normalized.
  2. A contract defined on a resource is applied to all root tables and nested tables created by that resource.
  3. discard_row works at the table level. For example, if you have two tables in a nested relationship, i.e. users and users__addresses, and the contract is violated in the users__addresses table, the row of that table is discarded while the parent row in the users table will be loaded.

Use Pydantic models for data validation

Pydantic models can be used to define table schemas and validate incoming data. You can use any model you already have. dlt will internally synthesize (if necessary) new models that conform with the schema contract on the resource.

Just passing a model in the columns argument of dlt.resource sets a schema contract that conforms to the default Pydantic behavior:

{
    "tables": "evolve",
    "columns": "discard_value",
    "data_type": "freeze"
}

New tables are allowed, extra fields are ignored and invalid data raises an exception.

If you pass a schema contract explicitly, the following happens to schema entities:

  1. tables do not impact the Pydantic models
  2. columns modes are mapped into the extra modes of Pydantic (see below). dlt will apply this setting recursively if models contain other models.
  3. data_type supports the following modes for Pydantic:
       • evolve will synthesize a lenient model that allows for any data type. This may result in variant columns upstream.
       • freeze will re-raise ValidationError.
       • discard_row will remove the data items that fail validation.
       • discard_value is not currently supported. We may eventually add it on Pydantic v2.

dlt maps column contract modes into the extra fields settings as follows.

Note that this works in two directions. If you use a model with such setting explicitly configured, dlt sets the column contract mode accordingly. This also avoids synthesizing modified models.

column mode      pydantic extra
evolve           allow
freeze           forbid
discard_value    ignore
discard_row      forbid

discard_row requires additional handling when ValidationError is raised.
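Since freeze and discard_row both map to forbid, the difference lies in what happens after validation fails. The sketch below illustrates that extra handling with a wrapper that drops failing items instead of raising. It is a toy under stated assumptions: `FakeValidationError` stands in for Pydantic's ValidationError, `forbid_extra` mimics a model with fields id and name and extra set to forbid, and `discard_on_error` is a hypothetical helper, not part of dlt or Pydantic.

```python
from typing import Any, Callable, Dict, Optional

# Mapping from the table above: column contract mode -> Pydantic `extra` setting.
COLUMN_MODE_TO_EXTRA = {
    "evolve": "allow",
    "freeze": "forbid",
    "discard_value": "ignore",
    "discard_row": "forbid",
}

Item = Dict[str, Any]


class FakeValidationError(Exception):
    """Hypothetical stand-in for pydantic.ValidationError."""


def discard_on_error(validate: Callable[[Item], Item]) -> Callable[[Item], Optional[Item]]:
    """Turn a validator that raises into one that drops the failing item."""
    def wrapper(item: Item) -> Optional[Item]:
        try:
            return validate(item)
        except FakeValidationError:
            return None  # discard_row: skip the item instead of failing the run
    return wrapper


def forbid_extra(item: Item) -> Item:
    """Toy validator that rejects any field other than id and name."""
    unexpected = set(item) - {"id", "name"}
    if unexpected:
        raise FakeValidationError(f"extra fields not permitted: {sorted(unexpected)}")
    return item


validate_or_discard = discard_on_error(forbid_extra)
items = [{"id": 1, "name": "a"}, {"id": 2, "name": "b", "email": "b@example.com"}]
loaded = [item for item in map(validate_or_discard, items) if item is not None]
print(loaded)
```

In freeze mode the unwrapped validator would be used directly, so the first failing item stops the run.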

tip

Model validation is added as a transform step to the resource. This step will convert the incoming data items into instances of validating models. You could easily convert them back to dictionaries by using add_map(lambda item: item.dict()) on a resource.

note

Pydantic models work on the extracted data before names are normalized or nested tables are created. Make sure to name model fields as in your input data and handle nested data with nested models.

As a consequence, discard_row will drop the whole data item, even if only a nested model was affected.

Set contracts on Arrow Tables and Pandas

All contract settings apply to Arrow tables and pandas data frames as well.

  1. tables: the mode works the same way, regardless of the data item type.
  2. columns: dlt will allow new columns, raise an exception, or modify tables/frames while still in the extract step to avoid rewriting Parquet files.
  3. data_type: changes to data types in tables/frames are not allowed and will result in a data type schema clash. We could allow for more modes (evolving data types in Arrow tables sounds weird, but ping us on Slack if you need it).

Here's how dlt deals with column modes:

  1. evolve: new columns are allowed (the table may be reordered to put them at the end)
  2. discard_value: the column will be deleted
  3. discard_row: rows with the column present will be deleted, and then the column will be deleted
  4. freeze: an exception is raised on a new column
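The column modes listed above can be sketched on a small column-oriented table, here a plain dict of equally long lists standing in for an Arrow table or a data frame. This is a toy, not dlt's code: `apply_columns_mode` and `ContractViolation` are hypothetical names, and treating "column present" as "the row has a non-null value in the new column" is an assumption of this sketch.

```python
from typing import Any, Dict, List, Set

Table = Dict[str, List[Any]]


class ContractViolation(Exception):
    """Hypothetical stand-in for the error raised in freeze mode."""


def apply_columns_mode(table: Table, known: Set[str], mode: str) -> Table:
    """Apply a columns contract mode to a column-oriented table."""
    new_cols = [c for c in table if c not in known]
    if not new_cols:
        return table
    if mode == "evolve":
        # keep everything, but move the new columns to the end
        ordered = [c for c in table if c in known] + new_cols
        return {c: table[c] for c in ordered}
    if mode == "freeze":
        raise ContractViolation(f"new columns not allowed: {new_cols}")
    if mode == "discard_value":
        return {c: values for c, values in table.items() if c in known}
    if mode == "discard_row":
        n_rows = len(table[new_cols[0]])
        # assumption: a row "has" the column when its value there is not None
        keep = [i for i in range(n_rows) if all(table[c][i] is None for c in new_cols)]
        return {c: [table[c][i] for i in keep] for c in table if c in known}
    raise ValueError(f"unknown contract mode: {mode!r}")


table = {"id": [1, 2, 3], "extra": [None, "x", None], "name": ["a", "b", "c"]}
known = {"id", "name"}
for mode in ("evolve", "discard_value", "discard_row"):
    print(mode, "->", apply_columns_mode(table, known, mode))
```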

Get context from DataValidationError in freeze mode

When a contract is violated in freeze mode, dlt raises a DataValidationError exception. This exception gives access to the full context and passes the evidence to the caller. As with any other exception coming from a pipeline run, it will be re-raised via a PipelineStepFailed exception, which you should catch in an except block:

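# import paths as of dlt 1.x; adjust them if your version differs
from dlt.common.schema.exceptions import DataValidationError
from dlt.pipeline.exceptions import PipelineStepFailed

try:
    pipeline.run()
except PipelineStepFailed as pip_ex:
    # in the normalize step the validation error is nested one level deeper
    if pip_ex.step == "normalize":
        if isinstance(pip_ex.__context__.__context__, DataValidationError):
            ...
    if pip_ex.step == "extract":
        if isinstance(pip_ex.__context__, DataValidationError):
            ...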


DataValidationError provides the following context:

  1. schema_name, table_name, and column_name provide the logical "location" at which the contract was violated.
  2. schema_entity and contract_mode tell which contract was violated.
  3. table_schema contains the schema against which the contract was validated. It may be a Pydantic model or a dlt TTableSchema instance.
  4. schema_contract is the full, expanded schema contract.
  5. data_item is the data item that caused the violation (a Python dict, Arrow table, Pydantic model, or a list thereof).
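Because the nesting depth of the original error differs between steps, it can be convenient to walk the exception chain instead of hard-coding `__context__` lookups. The helper below is a minimal sketch using only standard Python exception chaining. `find_in_context` is a hypothetical name, and `FakeValidationError` and `FakeStepFailed` are stand-ins so the example runs without dlt. With dlt you would pass the caught PipelineStepFailed and DataValidationError instead.

```python
from typing import Optional, Type, TypeVar

E = TypeVar("E", bound=BaseException)


def find_in_context(exc: BaseException, exc_type: Type[E]) -> Optional[E]:
    """Walk the __cause__/__context__ chain and return the first exc_type instance."""
    seen = set()
    current: Optional[BaseException] = exc
    while current is not None and id(current) not in seen:
        if isinstance(current, exc_type):
            return current
        seen.add(id(current))  # guard against cyclic chains
        current = current.__cause__ or current.__context__
    return None


class FakeValidationError(Exception):
    """Hypothetical stand-in for DataValidationError."""

    def __init__(self, table_name: str, column_name: str) -> None:
        super().__init__(f"contract violated in {table_name}.{column_name}")
        self.table_name = table_name
        self.column_name = column_name


class FakeStepFailed(Exception):
    """Hypothetical stand-in for PipelineStepFailed."""


def failing_run() -> None:
    try:
        try:
            raise FakeValidationError("users", "email")
        except FakeValidationError:
            # intermediate wrapper, mimicking the two-level nesting shown for normalize
            raise RuntimeError("worker failed")
    except RuntimeError:
        raise FakeStepFailed("normalize")


try:
    failing_run()
except FakeStepFailed as step_ex:
    found = find_in_context(step_ex, FakeValidationError)
    if found is not None:
        print(found.table_name, found.column_name)
```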

Contracts on new tables

If a table is new and has not been created on the destination yet, dlt will allow the creation of new columns. For a single pipeline run, the column mode is changed (internally) to evolve and then reverted to the original mode. This allows the initial schema inference to happen; on subsequent runs, the inferred contract will be applied to new data.

The following tables are considered new:

  1. Child tables inferred from the nested data
  2. Dynamic tables created from the data during extraction
  3. Tables containing incomplete columns - columns without data type bound to them.

For example, the following table is considered new because the column number is incomplete (it defines a primary key and NOT NULL, but no data type):

blocks:
  description: Ethereum blocks
  write_disposition: append
  columns:
    number:
      nullable: false
      primary_key: true
      name: number
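The "incomplete column" rule for the blocks table above can be sketched as a check for columns that carry hints but no data type. This is an illustration only: `incomplete_columns` and `effective_columns_mode` are hypothetical helpers, the table is a plain dict mirroring the YAML, and the sketch covers only the incomplete-column criterion, not nested or dynamic tables.

```python
from typing import Any, Dict, List


def incomplete_columns(table: Dict[str, Any]) -> List[str]:
    """Names of columns that have hints but no data type bound to them."""
    return [
        name
        for name, column in table.get("columns", {}).items()
        if "data_type" not in column
    ]


def effective_columns_mode(table: Dict[str, Any], configured_mode: str) -> str:
    """Columns mode for this run: evolve while the table still counts as new."""
    return "evolve" if incomplete_columns(table) else configured_mode


blocks = {
    "description": "Ethereum blocks",
    "write_disposition": "append",
    "columns": {
        "number": {"nullable": False, "primary_key": True, "name": "number"},
    },
}

print(incomplete_columns(blocks))
print(effective_columns_mode(blocks, "freeze"))

# once a data type is bound, the configured mode applies again
blocks["columns"]["number"]["data_type"] = "bigint"
print(effective_columns_mode(blocks, "freeze"))
```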

The following tables are not considered new:

  1. Those with columns defined by Pydantic models

Working with datasets that have manually added tables and columns on the first load

In some cases, you might be working with datasets that have tables or columns created outside of dlt. If you are loading to a table not created by dlt for the first time, dlt will not know about this table while enforcing schema contracts. This means that if you do a load where tables are set to evolve, everything will work as planned. If you have tables set to freeze, dlt will raise an exception because it thinks you are creating a new table (which you are, from dlt's perspective). You can allow evolve for one load and then switch back to freeze.

The same thing will happen if dlt knows your table, but you have manually added a column to your destination and you have columns set to freeze.

Code Examples

The code below will silently ignore new subtables, allow new columns to be added to existing tables, and raise an error if a variant of a column is discovered.

@dlt.resource(schema_contract={"tables": "discard_row", "columns": "evolve", "data_type": "freeze"})
def items():
    ...

The code below will raise on any encountered schema change. Note: you can always set a string, which will be interpreted as though all keys are set to that value.

pipeline.run(my_source(), schema_contract="freeze")

The code below defines some settings on the source, which can be overridden on the resource, which in turn can be overridden by the global override on the run method. Here, variant columns are frozen for all resources and raise an error if encountered. On items, new columns are allowed, but other_items inherits the freeze setting from the source, so new columns are frozen there. New tables are allowed.

import dlt

@dlt.resource(schema_contract={"columns": "evolve"})
def items():
    ...

@dlt.resource()
def other_items():
    ...

@dlt.source(schema_contract={"columns": "freeze", "data_type": "freeze"})
def source():
    return [items(), other_items()]


# this will use the settings defined by the decorators
pipeline.run(source())

# this will freeze the whole schema, regardless of the decorator settings
pipeline.run(source(), schema_contract="freeze")
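The precedence described in this example can be modeled as layers applied in order: defaults, then the source, then the resource, then the run() override. The sketch below is a plain-Python illustration, not dlt's resolution code. `resolve_contract` is a hypothetical helper, and how a partial mapping passed to run() would merge with the other layers is an assumption of this sketch.

```python
from typing import Dict, Optional, Union

Contract = Union[str, Dict[str, str]]
SCHEMA_ENTITIES = ("tables", "columns", "data_type")


def resolve_contract(
    source: Optional[Contract] = None,
    resource: Optional[Contract] = None,
    run: Optional[Contract] = None,
) -> Dict[str, str]:
    """Layer the contracts: defaults < source < resource < run()."""
    resolved = {entity: "evolve" for entity in SCHEMA_ENTITIES}
    for contract in (source, resource, run):
        if contract is None:
            continue
        if isinstance(contract, str):
            # shorthand: one mode for every schema entity
            contract = {entity: contract for entity in SCHEMA_ENTITIES}
        resolved.update(contract)
    return resolved


source_contract = {"columns": "freeze", "data_type": "freeze"}
items_contract = resolve_contract(source_contract, {"columns": "evolve"})
other_items_contract = resolve_contract(source_contract)
frozen = resolve_contract(source_contract, {"columns": "evolve"}, run="freeze")

print("items:", items_contract)
print("other_items:", other_items_contract)
print("run override:", frozen)
```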
