Published on 00/00/0000
Last updated on 00/00/0000
Published on 00/00/0000
Last updated on 00/00/0000
Share
Share
PRODUCT
9 min read
Share
#!/bin/bash
# ge.sh (part 1): defines the MySQL connection settings, prints them, removes any
# previous great_expectations directory, creates a Python virtual environment and
# installs the packages the rest of the script relies on.

## Database Credentials
# NOTE(review): these are hard-coded demo values; presumably they should be
# replaced or supplied from the environment outside of a demo - confirm.
world_db_host="localhost"
port="3306"
username="demo_user"
password="demo_pass"
world_db="world_x"
printf "Setting up Great Expectations for UMP...\n\n"
# Echo the connection settings; the password is not printed here.
printf "Using following Database config:\n"
printf "HOST: %s\n" "$world_db_host"
printf "PORT: %s\n" "$port"
printf "USERNAME: %s\n" "$username"
printf "DATABASE: %s\n" "$world_db"
printf "Remove existing GE dirs, if exists...\n\n"
# Deletes the great_expectations directory relative to the current working directory.
rm -rf great_expectations
printf "Setting up Python venv...\n\n"
python3 -m venv venv
# Activate the venv so the python3/pip3 calls below use it.
source ./venv/bin/activate
printf "Installing required GE pip packages...\n\n"
python3 -m pip install --upgrade pip
pip3 install great_expectations
pip3 install pymysql
pip3 install sqlalchemy
pip3 install jupyterlab
pip3 install notebook
printf "\nGE is up with version:\n"
great_expectations --version
printf "Initializing GE and datasource...\n\n"
# Drive the interactive GE CLI with expect: spawn "great_expectations --v3-api init"
# and answer "Y" when the "OK to proceed?" prompt appears.
# NOTE(review): the later "send" lines write to the process spawned above instead of
# starting a new command; presumably the intent was to run
# "great_expectations --v3-api datasource new" and answer its prompts with "2" and
# "1" - confirm this behaves as intended.
/usr/bin/expect -c '
spawn great_expectations --v3-api init
expect -re {OK to proceed?} {send "Y\r"}
send "great_expectations --v3-api datasource new\n"
expect -re ":"
send "2\r"
expect -re ":"
send "1\r"
'
# List the working directory contents after initialization.
ls -l
# Register the MySQL datasource, then build one expectation suite and one
# checkpoint per table of the world_x database.
printf "Setting up MySQL Data Sources...\n\n"
# Quote every expansion so values containing spaces or glob characters
# (for example a password) reach the Python script as single arguments.
python3 setup_datasources.py world_db_datasource "$world_db_host" "$port" "$username" "$password" "$world_db"
printf "Setting up Expectations Suites for world_x DB...\n\n"
# Arguments: suite name, datasource name, table name, comma-separated column list.
# setup_expectations.py excludes the listed columns from profiling and adds a
# not-null expectation for each of them.
python3 setup_expectations.py world_db_city_expectation world_db_datasource city ID,Name,CountryCode,District,Info
python3 setup_expectations.py world_db_country_expectation world_db_datasource country Code,Name,Capital,Code2,Covid_Hotspot
python3 setup_expectations.py world_db_countryinfo_expectation world_db_datasource countryinfo doc,_id,_json_schema
python3 setup_expectations.py world_db_countrylanguage_expectation world_db_datasource countrylanguage CountryCode,Language,IsOfficial,Percentage
printf "Setting up checkpoints for world_x DB...\n\n"
# Arguments: checkpoint name, suite name, datasource name, table name.
python3 setup_checkpoints.py world_db_city_checkpoint world_db_city_expectation world_db_datasource city
python3 setup_checkpoints.py world_db_country_checkpoint world_db_country_expectation world_db_datasource country
python3 setup_checkpoints.py world_db_countryinfo_checkpoint world_db_countryinfo_expectation world_db_datasource countryinfo
python3 setup_checkpoints.py world_db_countrylanguage_checkpoint world_db_countrylanguage_expectation world_db_datasource countrylanguage
printf "Setting up s3_site locations for GE...\n\n"
# Patch the generated configuration in place. "sed -i.bak" (suffix attached to -i)
# is accepted by both GNU sed and BSD/macOS sed, whereas 'sed -i ""' only works
# with BSD sed. The backup files are removed right after each edit.
# 1) Append the contents of s3_site.txt after the DefaultSiteIndexBuilder line.
sed -i.bak "/class_name: DefaultSiteIndexBuilder/r s3_site.txt" great_expectations/great_expectations.yml
rm -f great_expectations/great_expectations.yml.bak
# 2) Make every checkpoint publish to both the local and the S3 data docs site.
sed -i.bak "s/site_names: \[\]/site_names: \[\"local_site\"\, \"s3_site\"\]/g" great_expectations/checkpoints/*.yml
rm -f great_expectations/checkpoints/*.yml.bak
printf "Listing all checkpoints...\n\n"
great_expectations checkpoint list
Specifically, this script does the following:
# Data docs site definition that stores the rendered site in an S3 bucket.
# NOTE(review): presumably this is the content of s3_site.txt; when it is merged
# under data_docs_sites, every line likely needs one more indentation level so
# that s3_site sits beside local_site - confirm.
# NOTE(review): the bucket name here differs from "ump-greatexpectations-s3" in
# the expected great_expectations.yml shown further down - confirm which is intended.
s3_site:
  class_name: SiteBuilder
  store_backend:
    class_name: TupleS3StoreBackend
    bucket: greatexpectations-s3
  site_index_builder:
    class_name: DefaultSiteIndexBuilder
Once setup is complete, you should see the following data_docs sites (local and s3) added in your great_expectations.yml file:
# Expected data docs configuration after setup: one site on the local
# filesystem and one site stored in an S3 bucket.
data_docs_sites:
  local_site:
    class_name: SiteBuilder
    show_how_to_buttons: true
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted/data_docs/local_site/
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
  s3_site:
    class_name: SiteBuilder
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: ump-greatexpectations-s3
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
Note that this ge.sh script calls additional Python scripts to complete installation. One is setup_datasources.py, which looks like this:
import sys
import great_expectations as ge
from great_expectations.cli.datasource import sanitize_yaml_and_save_datasource, check_if_datasource_name_exists
# setup_datasources.py: build the YAML configuration of a MySQL datasource from
# the command-line arguments, test it against the data context and save it.
#
# Usage:
#   setup_datasources.py <datasource_name> <host> <port> <username> <password> <database>
context = ge.get_context()

# NOTE(review): sys.argv contains the database password, so this line (and the
# print of the rendered YAML below) writes it to stdout - confirm that is acceptable.
print(sys.argv)

# Fail with a usage message instead of an IndexError when arguments are missing.
if len(sys.argv) < 7:
    sys.exit(
        "usage: setup_datasources.py <datasource_name> <host> <port> "
        "<username> <password> <database>"
    )

datasource_name = sys.argv[1]
host = sys.argv[2]
port = sys.argv[3]
username = sys.argv[4]
password = sys.argv[5]
database = sys.argv[6]

# The nesting below is significant: credentials belong to the execution engine
# and each data connector carries its own settings.
example_yaml = f"""
name: {datasource_name}
class_name: Datasource
execution_engine:
  class_name: SqlAlchemyExecutionEngine
  credentials:
    host: {host}
    port: '{port}'
    username: {username}
    password: {password}
    database: {database}
    drivername: mysql+pymysql
data_connectors:
  default_runtime_data_connector_name:
    class_name: RuntimeDataConnector
    batch_identifiers:
      - default_identifier_name
  default_inferred_data_connector_name:
    class_name: InferredAssetSqlDataConnector
    name: whole_table"""
print(example_yaml)

# Test the configuration first, then save it without overwriting an existing
# datasource of the same name.
context.test_yaml_config(yaml_config=example_yaml)
sanitize_yaml_and_save_datasource(context, example_yaml, overwrite_existing=False)
# The return value is discarded here.
context.list_datasources()
Another is setup_expectations.py:
import sys
from great_expectations.checkpoint import SimpleCheckpoint
from great_expectations.profile.user_configurable_profiler import UserConfigurableProfiler
import great_expectations as ge
from great_expectations.core.batch import BatchRequest
from great_expectations.data_context.types.resource_identifiers import ExpectationSuiteIdentifier
from great_expectations.exceptions import DataContextError
# setup_expectations.py: load or create an expectation suite for one table,
# profile the table with UserConfigurableProfiler, add not-null expectations for
# the listed columns, then validate through a temporary SimpleCheckpoint.
#
# Usage:
#   setup_expectations.py <expectation_suite_name> <datasource_name>
#                         <data_asset_name> <col1,col2,...> [open_data_docs]
print(sys.argv)

# Fail with a usage message instead of an IndexError when arguments are missing.
if len(sys.argv) < 5:
    sys.exit(
        "usage: setup_expectations.py <expectation_suite_name> <datasource_name> "
        "<data_asset_name> <col1,col2,...> [open_data_docs]"
    )

expectation_suite_name = sys.argv[1]
datasource_name = sys.argv[2]
data_asset_name = sys.argv[3]
# Columns the profiler skips; each one gets an explicit not-null expectation below.
ignored_columns = sys.argv[4].split(',')
# Optional fifth argument: the literal string 'true' opens the data docs at the end.
open_data_docs = sys.argv[5] if len(sys.argv) >= 6 else 'false'

context = ge.data_context.DataContext()

# Reuse the suite when it already exists, otherwise create it.
try:
    suite = context.get_expectation_suite(expectation_suite_name=expectation_suite_name)
    print(
        f'Loaded ExpectationSuite "{suite.expectation_suite_name}" containing {len(suite.expectations)} expectations.')
except DataContextError:
    suite = context.create_expectation_suite(expectation_suite_name=expectation_suite_name)
    print(f'Created ExpectationSuite "{suite.expectation_suite_name}".')

print(context.get_expectation_suite(expectation_suite_name=expectation_suite_name))
context.save_expectation_suite(expectation_suite=suite, expectation_suite_name=expectation_suite_name)
suite_identifier = ExpectationSuiteIdentifier(expectation_suite_name=expectation_suite_name)
context.build_data_docs(resource_identifiers=[suite_identifier])

# Batch over the table through the inferred connector, with 'limit' set to 1000.
batch_request = {'datasource_name': datasource_name, 'data_connector_name': 'default_inferred_data_connector_name',
                 'data_asset_name': data_asset_name, 'limit': 1000}
validator = context.get_validator(
    batch_request=BatchRequest(**batch_request),
    expectation_suite_name=expectation_suite_name
)
column_names = [f'"{column_name}"' for column_name in validator.columns()]
print(f"Columns: {', '.join(column_names)}.")
validator.head(n_rows=5, fetch_all=False)

profiler = UserConfigurableProfiler(
    profile_dataset=validator,
    excluded_expectations=None,
    ignored_columns=ignored_columns,
    not_null_only=False,
    primary_or_compound_key=False,
    semantic_types_dict=None,
    table_expectations_only=False,
    value_set_threshold="MANY",
)
suite = profiler.build_suite()

# Additional Expectations
for column in ignored_columns:
    validator.expect_column_values_to_not_be_null(column=column)

print(validator.get_expectation_suite(discard_failed_expectations=False))
validator.save_expectation_suite(discard_failed_expectations=False)

# Validate the freshly saved suite once through a temporary checkpoint.
checkpoint_config = {
    "class_name": "SimpleCheckpoint",
    "run_name_template": "%Y%m%d-%H%M%S-" + expectation_suite_name,
    "validations": [
        {
            "batch_request": batch_request,
            "expectation_suite_name": expectation_suite_name
        }
    ]
}
checkpoint = SimpleCheckpoint(
    f"_tmp_checkpoint_{expectation_suite_name}",
    context,
    **checkpoint_config
)
checkpoint_result = checkpoint.run()
context.build_data_docs()
validation_result_identifier = checkpoint_result.list_validation_result_identifiers()  # [0]

if open_data_docs == 'true':
    context.open_data_docs()
Finally, we have setup_checkpoints.py:
from ruamel.yaml import YAML
import great_expectations as ge
from pprint import pprint
import sys
# setup_checkpoints.py: build a SimpleCheckpoint configuration for one table,
# test it, register it in the data context and run it once.
#
# Usage:
#   setup_checkpoints.py <checkpoint_name> <expectation_suite_name>
#                        <datasource_name> <data_asset_name> [open_data_docs]
yaml = YAML()
context = ge.get_context()
print(sys.argv)

# Fail with a usage message instead of an IndexError when arguments are missing.
if len(sys.argv) < 5:
    sys.exit(
        "usage: setup_checkpoints.py <checkpoint_name> <expectation_suite_name> "
        "<datasource_name> <data_asset_name> [open_data_docs]"
    )

my_checkpoint_name = sys.argv[1]
expectation_suite_name = sys.argv[2]
datasource_name = sys.argv[3]
data_asset_name = sys.argv[4]
# Optional fifth argument: the literal string 'true' opens the data docs at the end.
open_data_docs = sys.argv[5] if len(sys.argv) >= 6 else 'false'

# The nesting below is significant: batch_request and expectation_suite_name
# are siblings inside the single validations entry.
yaml_config = f"""
name: {my_checkpoint_name}
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S-{my_checkpoint_name}"
validations:
  - batch_request:
      datasource_name: {datasource_name}
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: {data_asset_name}
      data_connector_query:
        index: -1
    expectation_suite_name: {expectation_suite_name}
"""
print(yaml_config)

# Print the names of the available Data Sources, Data Connectors and Data Assets.
pprint(context.get_available_data_asset_names())
# The return value is discarded here.
context.list_expectation_suite_names()

# Test the configuration, then register and run the checkpoint.
my_checkpoint = context.test_yaml_config(yaml_config=yaml_config)
print(my_checkpoint.get_substituted_config().to_yaml_str())
context.add_checkpoint(**yaml.load(yaml_config))
context.run_checkpoint(checkpoint_name=my_checkpoint_name)

if open_data_docs == 'true':
    context.open_data_docs()
How to Execute
great_expectations
|-- great_expectations.yml
|-- expectations
|-- checkpoints
|-- plugins
|-- .gitignore
|-- uncommitted
    |-- config_variables.yml
    |-- data_docs
    |-- validations
Note: Before proceeding with the Airflow setup, ensure that the environment variable “GE_ROOT_DIR” is set to the path of the great_expectations root directory shown above.
#! /bin/bash
# airflowinstall.sh: installs Airflow and the Great Expectations provider,
# copies the DAG file into the Airflow home, initializes the metadata database,
# creates an admin user and starts the web server and the scheduler.

# set airflow home as
export AIRFLOW_HOME="$(pwd)/air"
export AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True
# Print the value through a fixed format string so a "%" or "\" in the path is
# not interpreted by printf.
printf "%s\n" "$AIRFLOW_HOME"
printf "Installing required pip packages...\n\n"
pip3 install apache-airflow
# The version specifier must be quoted: an unquoted ">=0.1.0" is parsed by the
# shell as a redirection of stdout to a file named "=0.1.0".
pip3 install great_expectations "airflow-provider-great-expectations>=0.1.0"
pip3 install pymysql
printf "Moving DAG file to airflow dir...\n\n"
pwd
mkdir -p air/dags
cp airflow_greatexpectations.py air/dags/
ls -l air/dags/
printf "Initialize and Start Airflow...\n\n"
# NOTE(review): this lists DAGs before the database is initialized below;
# presumably the order is intentional - confirm.
airflow dags list
# initialize the database
airflow db init
# provide your user login credentials and save
# Each continuation backslash is preceded by a space so that an option value is
# never glued to the following option.
airflow users create \
    --username demo_user \
    --firstname airflowr \
    --password demo_pass \
    --lastname airflowr \
    --role Admin \
    --email demo123@gmail.com
# start the web server and scheduler
airflow webserver --port 8080 -D
airflow scheduler
This is a script (airflow_greatexpectations.py) to execute great_expectations checkpoints with Airflow DAGs using GreatExpectationsOperator:
import datetime
import logging
import os
from airflow import DAG
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator
# The Great Expectations project root is read from the GE_ROOT_DIR environment
# variable (None when it is unset). Export it as the absolute path of the
# great_expectations directory on your machine before this file is loaded.
ge_root_dir = os.getenv('GE_ROOT_DIR')
print("GE_ROOT_DIR:", ge_root_dir)
def on_failure_func(context):
    """Report the failed task instances of the DAG run that triggered the callback.

    Args:
        context: Callback context mapping; its ``dag_run`` entry is read.

    Returns:
        None. The report is printed and logged at error level.
    """
    dag_run = context.get('dag_run')
    if dag_run is None:
        # Without a dag_run there is nothing to inspect; log instead of raising
        # AttributeError inside the callback.
        logging.error('Failure callback invoked without a dag_run in the context')
        return
    # Ask only for failed instances so the report matches what the message claims.
    task_instances = dag_run.get_task_instances(state='failed')
    message = 'WARNING: These task instances failed which can cause data inconsistencies'
    print(message, task_instances)
    logging.error('%s: %s', message, task_instances)
# DAG that runs the four world_x checkpoints one after another. It has no
# schedule (schedule_interval=None) and calls on_failure_func on failure.
with DAG(
    dag_id="great_expectations_dag",
    start_date=datetime.datetime(2022, 1, 19),
    on_failure_callback=on_failure_func,
    catchup=False,
    schedule_interval=None
) as dag:
    ge_world_db_city_checkpoint_pass = GreatExpectationsOperator(
        task_id="task_world_db_city_checkpoint",
        data_context_root_dir=ge_root_dir,
        checkpoint_name="world_db_city_checkpoint"
    )
    # trigger_rule="all_done" lets each later checkpoint run once its upstream
    # task has finished, whether or not that task succeeded.
    ge_world_db_country_checkpoint_pass = GreatExpectationsOperator(
        task_id="task_world_db_country_checkpoint",
        data_context_root_dir=ge_root_dir,
        checkpoint_name="world_db_country_checkpoint",
        trigger_rule="all_done"
    )
    ge_world_db_countryinfo_checkpoint_pass = GreatExpectationsOperator(
        task_id="task_world_db_countryinfo_checkpoint",
        data_context_root_dir=ge_root_dir,
        checkpoint_name="world_db_countryinfo_checkpoint",
        trigger_rule="all_done"
    )
    ge_world_db_countrylanguage_checkpoint_pass = GreatExpectationsOperator(
        task_id="task_world_db_countrylanguage_checkpoint",
        data_context_root_dir=ge_root_dir,
        checkpoint_name="world_db_countrylanguage_checkpoint",
        trigger_rule="all_done"
    )
    # Execution order: city -> country -> countryinfo -> countrylanguage.
    ge_world_db_city_checkpoint_pass >> ge_world_db_country_checkpoint_pass >> ge_world_db_countryinfo_checkpoint_pass \
        >> ge_world_db_countrylanguage_checkpoint_pass
Now, run the airflowinstall.sh script to complete Airflow setup.
Get emerging insights on emerging technology straight to your inbox.
Discover why security teams rely on Panoptica's graph-based technology to navigate and prioritize risks across multi-cloud landscapes, enhancing accuracy and resilience in safeguarding diverse ecosystems.
The Shift is Outshift’s exclusive newsletter.
Get the latest news and updates on cloud native modern applications, application security, generative AI, quantum computing, and other groundbreaking innovations shaping the future of technology.