Dagster Integration
This connector supports extracting the following from Dagster:
- Pipeline and Task metadata
- Pipeline run status
- Table lineage
Using DataHub's Dagster Sensor
Dagster Sensors let you perform actions when important events occur in Dagster. DataHub's Dagster Sensor emits metadata after every Dagster pipeline run, including Pipelines, Tasks, and run results. For more details about Dagster Sensors, please refer to the Dagster documentation.
Prerequisites
- Create a Dagster project. See Create New Project.
- Create a Definitions class or a Repository.
A new Dagster project created via the UI uses the Definitions class to define Dagster pipelines.
Setup
Install the DataHub Dagster plugin package:
```shell
pip install acryl_datahub_dagster_plugin
```
Import the DataHub Dagster Sensor provided by the plugin package into your Dagster Definitions or Repository before starting the Dagster UI:
Example Definitions Class:
{{ inline /metadata-ingestion-modules/dagster-plugin/examples/basic_setup.py }}
The DataHub Dagster plugin-provided sensor internally uses the following configurations. You can override the default config values using environment variables.
Configuration options:
| Configuration Option | Default Value | Description |
|---|---|---|
| `datahub_client_config` |  | The DataHub client config |
| `dagster_url` |  | The URL to your Dagster Webserver. |
| `capture_asset_materialization` | True | Whether to capture asset keys as DataHub Datasets on AssetMaterialization events. |
| `capture_input_output` | True | Whether to capture and try to parse inputs and outputs from HANDLED_OUTPUT and LOADED_INPUT events. (Currently only PathMetadataValue metadata is supported.) (EXPERIMENTAL) |
| `platform_instance` |  | The instance of the platform that all assets produced by this recipe belong to. Optional. |
| `asset_lineage_extractor` |  | You can implement your own logic to capture asset lineage information. See the example below for details. |
| `enable_asset_query_metadata_parsing` |  | Whether to enable parsing queries from asset metadata. See below for details. |

Once the Dagster UI is running, turn on the provided Sensor: open the Overview tab, go to the Sensors tab, and toggle the DataHub sensor on.
Woohoo! Now, the DataHub Sensor is ready to emit metadata after every pipeline run.
How to Validate Installation
Navigate to the Dagster UI.
Go to Overview > Sensors and look for `datahub_sensor`. Start a Dagster Job. In the daemon logs, you should see DataHub-related log messages:
datahub_sensor - Emitting metadata...
This means that DataHub's sensor is correctly configured to emit metadata to DataHub.
Capturing Table Lineage
There are a few ways to extract lineage, or relationships between tables, from Dagster. We recommend one or more of the following approaches to extract lineage automatically.
But First: Extracting Asset Identifiers
When naming Dagster Assets, we recommend the following structure:
```python
key_prefix=["env", "platform", "db_name", "schema_name"]
```
This ensures that we correctly resolve the Asset name to a Dataset URN in DataHub.
For example:
```python
@asset(
    # The fully qualified asset key lets DataHub identify the platform and uniquely resolve the asset
    key_prefix=["prod", "snowflake", "db_name", "schema_name"],
    deps=[iris_dataset],
)
def iris_cleaned_dataset() -> None:
    ...  # illustrative asset body
```
If you properly name your Dagster Asset, you can establish a connection between the Asset and the dataset it is referring to, which is likely already stored in DataHub. This allows for accurate tracking and lineage information in the next steps.
If you follow a different naming convention, you can implement your own `asset_keys_to_dataset_urn_converter` logic and set it as a custom callback function. This callback can generate a DataHub Dataset URN in any way you please, from metadata or otherwise.
Here is an example that creates a DataHub URN from the Asset key naming convention specified above:
```python
from typing import Optional, Sequence

from datahub.metadata.urns import DatasetUrn


def asset_keys_to_dataset_urn_converter(
    self, asset_key: Sequence[str]
) -> Optional[DatasetUrn]:
    """
    Convert an asset key to a dataset URN.

    By default, we assume the following asset key structure:
    key_prefix=["prod", "snowflake", "db_name", "schema_name"]
    """
    if len(asset_key) >= 3:
        return DatasetUrn(
            platform=asset_key[1],
            env=asset_key[0],
            name=".".join(asset_key[2:]),
        )
    else:
        return None
```
Extracting Lineage from SQL Queries
DataHub's Dagster integration can automatically detect dataset inputs and outputs for Software-Defined Assets by analyzing the SQL queries they execute. To enable this feature, simply add the executed query to the Asset metadata under the `Query` key.
Here's an example of a Software Defined Asset with an annotated Query:
```python
from dagster import MaterializeResult, MetadataValue, asset
from dagster_snowflake import SnowflakeResource


@asset(key_prefix=["prod", "snowflake", "db_name", "schema_name"])
def my_asset_table_a(snowflake: SnowflakeResource) -> MaterializeResult:
    query = """
        create or replace table db_name.schema_name.my_asset_table_a as (
            SELECT *
            FROM db_name.schema_name.my_asset_table_b
        );
    """
    with snowflake.get_connection() as connection:
        with connection.cursor() as cursor:
            cursor.execute(query)
    return MaterializeResult(
        metadata={
            # The executed query is attached under the "Query" key so the plugin can parse lineage
            "Query": MetadataValue.text(query),
        }
    )
```
In this example, the plugin will automatically identify and set the upstream lineage as `db_name.schema_name.my_asset_table_b`.
Note: Proper asset naming is crucial, as the query parser determines the query language from the generated URN. In the example above, it will be `snowflake`.
For a complete example job, refer to the iris.py file in the DataHub repository.
Enhanced SnowflakePandasIOManager
The plugin offers an enhanced version of `SnowflakePandasIOManager` called `DataHubSnowflakePandasIOManager`. This version automatically captures Snowflake assets created by the IO manager and adds DataHub URNs and links to the assets in Dagster.
To use it, simply replace `SnowflakePandasIOManager` with `DataHubSnowflakePandasIOManager`. The enhanced version accepts two additional parameters:
- `datahub_base_url`: The base URL of the DataHub UI, used to generate direct links to Snowflake Datasets in DataHub. If not set, no URL will be generated.
- `datahub_env`: The DataHub environment to use when generating URNs. Defaults to `PROD` if not specified.
Example usage:
```python
from dagster import Definitions
from datahub_dagster_plugin.modules.snowflake_pandas.datahub_snowflake_pandas_io_manager import (
    DataHubSnowflakePandasIOManager,
)

# ...

defs = Definitions(
    # ...
    resources={
        "snowflake_io_manager": DataHubSnowflakePandasIOManager(
            database="MY_DB",
            account="my_snowflake_account",
            warehouse="MY_WAREHOUSE",
            user="my_user",
            password="my_password",
            role="my_role",
            # Base URL of the DataHub UI, used to link the Snowflake assets to DataHub
            datahub_base_url="http://localhost:9002",
        ),
    },
)
```
Using Dagster Ins and Out
We can provide inputs and outputs to both Assets and Ops explicitly, using dictionaries of `In`s and `Out`s that correspond to the decorated function arguments. While providing inputs and outputs explicitly, we can provide metadata as well.
To create upstream and downstream dataset dependencies for Assets and Ops, supply the `ins` and `out` dictionaries with that metadata. For reference, look at the sample jobs created using assets (assets_job.py) or ops (ops_job.py), and at the sketch below.
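As a rough sketch of what this can look like (the op name, dataset names, and the `datahub.inputs` / `datahub.outputs` metadata keys below are assumptions; confirm the exact keys against the referenced sample jobs):

```python
from dagster import In, Out, op
from datahub.emitter.mce_builder import make_dataset_urn


@op(
    ins={
        "raw_orders": In(
            metadata={
                # Assumed metadata key listing the upstream dataset(s) this op reads from
                "datahub.inputs": [make_dataset_urn("snowflake", "db_name.schema_name.raw_orders")]
            }
        )
    },
    out={
        "clean_orders": Out(
            metadata={
                # Assumed metadata key listing the downstream dataset(s) this op writes to
                "datahub.outputs": [make_dataset_urn("snowflake", "db_name.schema_name.clean_orders")]
            }
        )
    },
)
def transform_orders(raw_orders):
    # Hypothetical transformation; only the In/Out metadata matters for lineage capture
    return raw_orders
```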
Using Custom Logic for Extracting Lineage
You can define your own logic to capture asset lineage information.
The extractor returns a dictionary keyed by op name, where each `DatasetLineage` value holds the sets of dataset URNs that are upstream (inputs) and downstream (outputs) of that op.
```python
from typing import Dict

from dagster import RunStatusSensorContext
from datahub.ingestion.graph.client import DataHubGraph
from datahub_dagster_plugin.client.dagster_generator import DagsterGenerator, DatasetLineage


def asset_lineage_extractor(
    context: RunStatusSensorContext,
    dagster_generator: DagsterGenerator,
    graph: DataHubGraph,
) -> Dict[str, DatasetLineage]:
    dataset_lineage: Dict[str, DatasetLineage] = {}

    # Extract input and output assets from the run context and populate
    # dataset_lineage[op_name] with the corresponding DatasetLineage entries.
    return dataset_lineage
```
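To plug the custom extractor in, pass it through the `asset_lineage_extractor` configuration option when creating the sensor. A minimal sketch, assuming the sensor is created with `make_datahub_sensor` as in the basic setup example above:

```python
from datahub.ingestion.graph.client import DatahubClientConfig
from datahub_dagster_plugin.sensors.datahub_sensors import (
    DatahubDagsterSourceConfig,
    make_datahub_sensor,
)

# Register the custom lineage extractor via the sensor configuration
datahub_sensor = make_datahub_sensor(
    config=DatahubDagsterSourceConfig(
        datahub_client_config=DatahubClientConfig(server="http://localhost:8080"),
        asset_lineage_extractor=asset_lineage_extractor,
    )
)
```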
Debugging
Connection Error for DataHub Rest URL
If you get `ConnectionError: HTTPConnectionPool(host='localhost', port=8080)`, your DataHub GMS service is not up.