## A Unified Logger Solution for Databricks
* wip
* this is a "start" of a composable architecture framework for databricks.
* this code would be part of an `includes` notebook
* search for `DataEngineeringFramework`, this will include the variables that would need to change
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType, TimestampType, BooleanType, ShortType
from pyspark.sql import functions as F
from pyspark.sql.utils import AnalysisException
import logging
import pandas as pd
import sys
import re
import json
import datetime
import random
logger = logging.getLogger('DataEngineeringFramework')
class DataEngineeringFramework:
def __init__(self, storage_account='', instrumentation_key='', salt='', logging_level=logging.DEBUG):
if storage_account:
# this would be our datalake
self.storage_account = storage_account
self.serverless_sql_endpoint = mssparkutils.env.getWorkspaceName() + ''
self._initialize_logger(instrumentation_key, logging_level)
self.salt = salt
# in this example we keep our stages (dl zones) in separate containers in the lake.
# it might be better if these were folders under a single lake
# p = pseudonymized data: data where we don't have PII
# np = nonpseudonymized data: data where we have PII directly in the lake
# we have a link that allows us to lookup the np from p data and then we can do ACLing accordingly
self.stage1np = 'abfss://stage1np@' + self.storage_account + ''
self.stage2np = 'abfss://stage2np@' + self.storage_account + ''
self.stage2p = 'abfss://stage2p@' + self.storage_account + ''
self.stage3np = 'abfss://stage3np@' + self.storage_account + ''
self.stage3p = 'abfss://stage3p@' + self.storage_account + ''
self.framework_path = 'abfss://framework@' + self.storage_account + ''
logger.debug("DataEngineeringFramework initialized.")
def _initialize_logger(self, instrumentation_key, logging_level):
logging.lastResort = None
# the logger will print an error like "ValueError: I/O operation on closed file" because we're trying to have log messages also print to stdout
# and apparently this causes issues on some of the spark executor nodes. The bottom line is that we don't want these logging errors to get printed in the notebook output.
logging.raiseExceptions = False
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
def load(self, folder, table, stage=None, data_format='delta'):
""" Loads a dataframe based on the path specified in the given args """
if stage is None: stage = self.stage2p
path = f"{stage}/{folder}/{table}"
df ="{stage}/{folder}/{table}", format=data_format)
return df
except AnalysisException as e:
raise ValueError("Failed to load. Are you sure you have the right path?\nMore info below:\n" + str(e))
def print_stage(self, path):
""" Prints out the highlevel contents of the specified stage."""
msg = path + "\n"
folders = self.get_folders(path)
for folder_name in folders:
entities = self.get_folders(path + '/' + folder_name)
msg += f"{folder_name}: {entities}\n"
def fix_column_names(self, df):
""" Fix column names to satisfy the Parquet naming requirements by substituting invalid characters with an underscore. """
df_with_valid_column_names =[F.col(col).alias(re.sub("[ ,;{}()\n\t=]+", "_", col)) for col in df.columns])
return df_with_valid_column_names
def to_spark_schema(self, schema):#: list[list[str]]):
""" Creates a spark schema from a schema specified in the DataEngineeringFramework schema format.
schemas['Person'] = [['Id','string','hash'],
fields = []
for col_name, dtype, op in schema:
fields.append(StructField(col_name, globals()[dtype.lower().capitalize() + "Type"](), True))
spark_schema = StructType(fields)
return spark_schema
def pseudonymize(self, df, schema): #: list[list[str]]):
""" Performs pseudonymization of the given dataframe based on the provided schema.
For example, if the given df is for an entity called person,
2 dataframes will be returned, one called person that has hashed ids and masked fields,
and one called person_lookup that contains the original person_id, person_id_pseudo,
and the non-masked values for columns marked to be masked."""
df_pseudo = df_lookup = df
for col_name, dtype, op in schema:
if op == "hash-no-lookup" or op == "hnl":
# This means that the lookup can be performed against a different table so no lookup is needed.
df_pseudo = df_pseudo.withColumn(col_name, F.sha2(F.concat(F.col(col_name), F.lit(self.salt)), 256)).withColumnRenamed(col_name, col_name + "_pseudonym")
df_lookup = df_lookup.drop(col_name)
elif op == "hash" or op == 'h':
df_pseudo = df_pseudo.withColumn(col_name, F.sha2(F.concat(F.col(col_name), F.lit(self.salt)), 256)).withColumnRenamed(col_name, col_name + "_pseudonym")
df_lookup = df_lookup.withColumn(col_name + "_pseudonym", F.sha2(F.concat(F.col(col_name), F.lit(self.salt)), 256))
elif op == "mask" or op == 'm':
df_pseudo = df_pseudo.withColumn(col_name, F.lit('*'))
elif op == "no-op" or op == 'x':
df_lookup = df_lookup.drop(col_name)
df_pseudo = self.fix_column_names(df_pseudo)
df_lookup = self.fix_column_names(df_lookup)
return (df_pseudo, df_lookup)
# Returns true if the path exists
def path_exists(self, path):
tableExists = False
items =
tableExists = True
except Exception as e:
# This Exception comes as a generic Py4JJavaError that occurs when the path specified is not found.
return tableExists
def ls(self, path):
folders = []
files = []
items =
for item in items:
if item.isFile:
elif item.isDir:
except Exception as e:
logger.warning("[DataEngineeringFramework] Could not peform ls on specified path: " + path + "\nThis may be because the path does not exist.")
return (folders, files)
def print_stage(self, path):
folders = self.get_folders(path)
for folder_name in folders:
entities = self.get_folders(path + '/' + folder_name)
print(f"{folder_name}: {entities}")
# Return the list of folders found in the given path.
def get_folders(self, path):
dirs = []
items =
for item in items:
#print(, item.isDir, item.isFile, item.path, item.size)
if item.isDir:
except Exception as e:
logger.warning("[OEA] Could not get list of folders in specified path: " + path + "\nThis may be because the path does not exist.")
return dirs
# Remove a folder if it exists (defaults to use of recursive removal).
def rm_if_exists(self, path, recursive_remove=True):
mssparkutils.fs.rm(path, recursive_remove)
except Exception as e:
def pop_from_path(self, path):
""" Pops the last arg in a path and returns the path and the last arg as a tuple.
pop_from_path('abfss://') # returns ('abfss://', 'test.csv')
m = re.match(r"(.*)\/([^/]+)", path)
return (,
def parse_source_path(self, path):
""" Parses a path that looks like this: abfss://
and returns a dictionary like this: {'stage_num': '2', 'ss': 'ms_insights'}
Note that it will also return a 'stage_num' of 2 if the path is stage2p - this is by design because the spark db with the s2 prefix will be used for data in stage2 and stage2p.
m = re.match(r".*:\/\/stage(?P<stage_num>\d+)[n]?[p]?@[^/]+\/(?P<ss>[^/]+)", path)
return m.groupdict()
def create_db(self, source_path, source_format='DELTA'):
""" Creates a spark db based on the given path (assumes that every folder in the given path is a table).
Note that a spark db that points to source data in the delta format can't be queried via SQL serverless pool. More info here:
source_info = self.parse_source_path(source_path)
db_name = f"s{source_info['stage_num']}_{source_info['ss']}"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")
dirs = self.get_folders(source_path)
for table_name in dirs:
spark.sql(f"create table if not exists {db_name}.{table_name} using {source_format} location '{source_path}/{table_name}'")
result = "Database created: " + db_name
return result
def drop_db(self, db_name):
""" Drop all tables in a db, then drop the db. """
df = spark.sql('SHOW TABLES FROM ' + db_name)
for row in df.rdd.collect():
spark.sql(f"DROP TABLE IF EXISTS {db_name}.{row['tableName']}")
spark.sql(f"DROP DATABASE {db_name}")
result = "Database dropped: " + db_name
return result
# List installed packages
def list_packages(self):
import pkg_resources
for d in pkg_resources.working_set:
def print_schema_starter(self, entity_name, df):
""" Prints a starter schema that can be modified as needed when developing the DataEngineeringFramework schema for a new module. """
st = f"self.schemas['{entity_name}'] = ["
for col in df.schema:
st += f"['{}', '{str(col.dataType)[:-4].lower()}', 'no-op'],\n\t\t\t\t\t\t\t\t\t"
return st[:-11] + ']'
def write_dict_as_csv(data, folder, filename, container=None):
""" Writes a dictionary as a csv to the specified location. This is helpful when creating test data sets and landing them in stage1np. """
if container == None: container = self.stage1np
pdf = pd.DataFrame(data)
mssparkutils.fs.put(f"{container}/{folder}/{filename}", pdf.to_csv(index=False), True) # True indicates overwrite mode
class BaseDataEngineeringFrameworkModule:
""" Provides data processing methods for Contoso SIS data (the student information system for the fictional Contoso school district). """
def __init__(self, oea, source_folder, pseudonymize = True):
self.pseudonymize = pseudonymize
self.oea = oea
self.stage1np = f"{DataEngineeringFramework.stage1np}/{source_folder}"
self.stage2np = f"{DataEngineeringFramework.stage2np}/{source_folder}"
self.stage2p = f"{DataEngineeringFramework.stage2p}/{source_folder}"
self.stage3np = f"{DataEngineeringFramework.stage3np}/{source_folder}"
self.stage3p = f"{DataEngineeringFramework.stage3p}/{source_folder}"
self.module_path = f"{DataEngineeringFramework.framework_path}/modules/{source_folder}"
self.schemas = {}
def _process_entity_from_stage1(self, entity_name, format='csv', write_mode='overwrite', header='true'):
spark_schema = self.oea.to_spark_schema(self.schemas[entity_name])
df ="{self.stage1np}/{entity_name}", header=header, schema=spark_schema)
if self.pseudonymize:
df_pseudo, df_lookup = self.oea.pseudonymize(df, self.schemas[entity_name])
if len(df_lookup.columns) > 0:
df = self.oea.fix_column_names(df)
def delete_stage1(self):
def delete_stage2(self):
def delete_stage3(self):
def delete_all_stages(self):
def create_stage2_db(self, format='DELTA'):
self.oea.create_db(self.stage2p, format)
self.oea.create_db(self.stage2np, format)
def create_stage3_db(self, format='DELTA'):
self.oea.create_db(self.stage3p, format)
self.oea.create_db(self.stage3np, format)
def copy_test_data_to_stage1(self):
mssparkutils.fs.cp(self.module_path + '/test_data', self.stage1np, True)
......@@ -179,15 +179,6 @@ wip
* this is also a good example of how notebooks can be used as a presentation tool for executives.
* we use collaborative filtering to predict product ratings
### Azure Data Factory ELT Pipelines
I moved these labs to their own repo.
### Dockerized Data Engineering Pipelines
**This section is not ready. WIP. TODO**
......@@ -233,3 +224,6 @@ Deploy AKS, ACI, and other supporting infrastructure for containers
* [Lab 408: Files Formats and Effects of Compression](./Lab400/408-FileFormats-and-Compression.dbc)
* which file formats are compressible and a demonstration on the impact on performance
### Other Topics
* [Lab 500: Unified Logger](./Lab500/
\ No newline at end of file
