Testing Behavior#

Testing Configuration#

By default, Cosmos will add a test after each model. This can be overridden using the test_behavior field in the RenderConfig object. Note that this behavior is different from dbt’s default behavior, which runs all tests after all models have been run. Cosmos defaults to running tests after each model to take a “fail-fast” approach to testing. This means that if a model runs with failing tests, the rest of the project is stopped and the failure is reported. This is in contrast to dbt’s default behavior, which runs all models and tests, and then reports all failures at the end.

Cosmos supports the following test behaviors:

  • after_each (default): turns each model into a task group with two steps: run the model, and run the tests

  • after_all: each model becomes a single task, and the tests only run if all models are run successfully

  • none: don’t include tests

Example:

from cosmos import DbtTaskGroup, RenderConfig
from cosmos.constants import TestBehavior

jaffle_shop = DbtTaskGroup(
    render_config=RenderConfig(
        test_behavior=TestBehavior.AFTER_ALL,
    )
)

Warning Behavior#

Note

As of now, this feature is only available for the default execution mode local and for virtualenv

Cosmos enables you to receive warning notifications from tests and process them using a callback function. The on_warning_callback parameter adds two extra context variables to the callback function: test_names and test_results. test_names contains the names of the tests that generated a warning, while test_results holds the corresponding test results at the same index. Both the test_names and test_results variables are lists of strings.

For example, the following code snippet shows how to send a Slack message when a warning occurs:

from cosmos import DbtDag
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.utils.context import Context


def warning_callback_func(context: Context):
    tests = context.get("test_names")
    results = context.get("test_results")

    warning_msgs = ""
    for test, result in zip(tests, results):
        warning_msg = f"""
        *Test*: {test}
        *Result*: {result}
        """
        warning_msgs += warning_msg

    if warning_msgs:
        slack_msg = f"""
        :large_yellow_circle: Airflow-DBT task with WARN.
        *Task*: {context.get('task_instance').task_id}
        *Dag*: {context.get('task_instance').dag_id}
        *Execution Time*: {context.get('execution_date')}
        *Log Url*: {context.get('task_instance').log_url}
        {warning_msgs}
        """

        slack_hook = SlackWebhookHook(slack_webhook_conn_id="slack_conn_id")
        slack_hook.send(text=slack_msg)


mrr_playbook = DbtDag(
    # ...
    on_warning_callback=warning_callback_func,
)

When at least one WARN message is present, the function passed to on_warning_callback will be triggered. In the example above, the following message will be sent to Slack:

https://github.com/astronomer/astronomer-cosmos/raw/main/docs/_static/callback_slack.png

Note

If warnings that are not associated with tests occur (e.g. freshness warnings), they will still trigger the on_warning_callback method above. However, these warnings will not be included in the test_names and test_results context variables, which are specific to test-related warnings.