Commit 3e36abf3 authored by Dave Wentzel

edits

parent 6aeea11a
# Databricks notebook source
from pyspark.sql.functions import *
import datetime
# COMMAND ----------
dbutils.widgets.text("container", "")
dbutils.widgets.text("file_type", "csv")
dbutils.widgets.text("source_name", "")
container = dbutils.widgets.get("container")
file_type = dbutils.widgets.get("file_type")
source_name = dbutils.widgets.get("source_name")
account_name = dbutils.secrets.get("UserScope", source_name + "_storagename")
account_key = dbutils.secrets.get("UserScope", source_name + "_storagekey")
# COMMAND ----------
try:
    dbutils.fs.ls("/mnt/user/blob/" + account_name + "/" + container)
    print("Mount already exists! No need to mount. We only mount on the first run or if it gets deleted.")
except:
    dbutils.fs.mount(
        source = "wasbs://" + container + "@" + account_name + ".blob.core.windows.net",
        mount_point = "/mnt/user/blob/" + account_name + "/" + container,
        extra_configs = {"fs.azure.account.key." + account_name + ".blob.core.windows.net": account_key})
# COMMAND ----------
container_path = "/mnt/user/blob/" + account_name + "/" + container
# COMMAND ----------
timeNow = datetime.datetime.now()
current_datetime = timeNow.strftime("%Y%m%d%H%M%S")
if file_type == "csv":
    data = spark.read.format("csv").options(header="true", inferSchema="true").load(container_path + "/*.csv")
    output_file_name = account_name + "_" + container + "_" + current_datetime
    path_to_write = "/mnt/system/datalake/raw/" + source_name + "/" + timeNow.strftime("%Y/%m/%d/%H/%M") + "/" + output_file_name
    # no spaces in col names
    for col in data.columns:
        data = data.withColumnRenamed(col, col.replace(" ", ""))
    data.write.mode("overwrite").parquet(path_to_write)
    print("DELETING processed files...")
    # tag each row with the blob it was read from so we know which files were processed
    data = data.withColumn("filename", input_file_name())
    # collect file names
    processed_files = data.select("filename").distinct().collect()
    # dbutils.fs.rm deletes one path at a time, so loop over the processed files
    for f in processed_files:
        dbutils.fs.rm(f.filename.replace("dbfs:", ""), True)
    print("processed all csv files in blob.")
# COMMAND ----------
timeNow = datetime.datetime.now()
current_datetime = timeNow.strftime("%Y%m%d%H%M%S")
if file_type == "parquet":
    data = spark.read.parquet(container_path + "/*.parquet")
    output_file_name = account_name + "_" + container + "_" + current_datetime
    path_to_write = "/mnt/system/datalake/raw/" + source_name + "/" + timeNow.strftime("%Y/%m/%d/%H/%M") + "/" + output_file_name
    # no spaces in col names
    for col in data.columns:
        data = data.withColumnRenamed(col, col.replace(" ", ""))
    data.write.mode("overwrite").parquet(path_to_write)
    print("DELETING processed files...")
    # tag each row with the blob it was read from so we know which files were processed
    data = data.withColumn("filename", input_file_name())
    # collect file names
    processed_files = data.select("filename").distinct().collect()
    # dbutils.fs.rm deletes one path at a time, so loop over the processed files
    for f in processed_files:
        dbutils.fs.rm(f.filename.replace("dbfs:", ""), True)
    print("processed all parquet files in blob.")
# Databricks notebook source
import datetime
accountName = dbutils.secrets.get(scope = "SystemScope", key = "storagename")
accountKey = dbutils.secrets.get(scope = "SystemScope", key = "storagekey")
# COMMAND ----------
# MAGIC %run /Shared/Connectors/Functions
# COMMAND ----------
dbutils.widgets.text("server", "")
dbutils.widgets.text("database", "")
dbutils.widgets.text("port", "1433")
dbutils.widgets.text("source_name", "")
dbutils.widgets.text("get_views", "1")
server = dbutils.widgets.get("server")
database = dbutils.widgets.get("database")
port = dbutils.widgets.get("port")
source_name = dbutils.widgets.get("source_name")  # this is the output source name
get_views = dbutils.widgets.get("get_views")
username = dbutils.secrets.get("UserScope", source_name + "_username")
password = dbutils.secrets.get("UserScope", source_name + "_password")
# COMMAND ----------
jdbcUrl = "jdbc:sqlserver://" + server + ":" + port + ";database=" + database
connectionProperties = {
    "user": username,
    "password": password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
# COMMAND ----------
# Note: the parentheses and alias are required so the JDBC reader treats the query as a derived table (subquery).
table_query = "(select *, CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TableName from INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE' AND TABLE_CATALOG='" + database + "' ) table_alias"
tables = spark.read.jdbc(url=jdbcUrl, table=table_query, properties=connectionProperties)
view_query = "(select *, CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as ViewName from INFORMATION_SCHEMA.VIEWS WHERE TABLE_CATALOG='" + database + "' and TABLE_SCHEMA != 'sys') view_alias"
views = spark.read.jdbc(url=jdbcUrl, table=view_query, properties=connectionProperties)
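# The same derived-table pattern works for any pushdown query, e.g. (table name is hypothetical):
# counts = spark.read.jdbc(url=jdbcUrl,
#                          table="(select count(*) as cnt from dbo.SomeTable) cnt_alias",
#                          properties=connectionProperties)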
# COMMAND ----------
table_arr = tables.select("TableName").distinct().collect()
view_arr = views.select("ViewName").distinct().collect()
# COMMAND ----------
table_list = []
for t in table_arr:
    table_list.append(t.TableName)
if get_views == "1":
    for v in view_arr:
        table_list.append(v.ViewName)
# COMMAND ----------
# same datetime for all tables in a single job
timeNow = datetime.datetime.now()
current_datetime = timeNow.strftime("%Y%m%d%H%M%S")
# COMMAND ----------
# loop through the table list
for tab in table_list:
    table_to_load = tab
    print(str(datetime.datetime.now()) + ": Starting to load object: " + table_to_load)
    # format file name
    output_file_name = database + "_" + table_to_load + "_" + current_datetime
    # format output path
    path_to_write = "/mnt/system/datalake/raw/" + source_name + "/" + timeNow.strftime("%Y/%m/%d/%H/%M") + "/" + output_file_name
    # look up the watermark entity for this object
    entity = getWatermark(accountName, accountKey, database, table_to_load)
    # if there is a watermark present, load incrementally
    if entity != False:
        watermark = entity.Watermark
        if entity.Watermark is None:
            watermark = 0
        # read only rows newer than the stored watermark
        query = "(select * from " + table_to_load + " where " + entity.Column + " > '" + str(watermark) + "') table_alias"
        data = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
        # no spaces in col names
        for col in data.columns:
            data = data.withColumnRenamed(col, col.replace(" ", ""))
        # write data only if there are rows, then advance the watermark to the new max value
        if data.count() > 0:
            data.write.parquet(path_to_write, mode="overwrite")
            newWatermark = data.agg({entity.Column: "max"}).collect()[0]["max(" + entity.Column + ")"]
            updateWatermark(accountName, accountKey, entity.PartitionKey, entity.RowKey, entity.Column, entity.Server, newWatermark)
    else:
        # no watermark: read the full table
        data = spark.read.jdbc(url=jdbcUrl, table=table_to_load, properties=connectionProperties)
        # no spaces in col names
        for col in data.columns:
            data = data.withColumnRenamed(col, col.replace(" ", ""))
        # write data only if there are rows
        if data.count() > 0:
            data.write.parquet(path_to_write, mode="overwrite")
    print(str(datetime.datetime.now()) + ": Wrote file to: '" + path_to_write + "'")
# COMMAND ----------
# Databricks notebook source
dbutils.widgets.text("Datalake", "")
datalake = "adl://" + dbutils.widgets.get("Datalake") + ".azuredatalakestore.net/"
MountPoint = "/mnt/system/datalake/"
# COMMAND ----------
ClientId = dbutils.secrets.get(scope = "SystemScope", key = "clientid")
ClientSecret = dbutils.secrets.get(scope = "SystemScope", key = "clientsecret")
TenantId = dbutils.secrets.get(scope = "SystemScope", key = "tenantid")
account_name = dbutils.secrets.get(scope = "SystemScope", key = "storagename")
key = dbutils.secrets.get(scope = "SystemScope", key = "storagekey")
# COMMAND ----------
configs = {"dfs.adls.oauth2.access.token.provider.type": "ClientCredential",
           "dfs.adls.oauth2.client.id": ClientId,
           "dfs.adls.oauth2.credential": ClientSecret,
           "dfs.adls.oauth2.refresh.url": "https://login.microsoftonline.com/" + TenantId + "/oauth2/token"}
dbutils.fs.mount(
    source = datalake,
    mount_point = MountPoint,
    extra_configs = configs)
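# Optional sanity check that the lake is reachable through the new mount:
# display(dbutils.fs.ls(MountPoint))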
# COMMAND ----------
print("Installing dependecies below....")
# COMMAND ----------
# MAGIC %sh /home/ubuntu/databricks/python/bin/pip install azure
# COMMAND ----------
## Configuring the blob storage mounts
## import container for file uploads
try:
    dbutils.fs.ls("/mnt/system/import")
    print("Mount already exists!")
except:
    dbutils.fs.mount(
        source = "wasbs://import@" + account_name + ".blob.core.windows.net",
        mount_point = "/mnt/system/import",
        extra_configs = {"fs.azure.account.key." + account_name + ".blob.core.windows.net": key})
# COMMAND ----------
## Configuring the blob storage mounts
## export container for file downloads
try:
    dbutils.fs.ls("/mnt/system/export")
    print("Mount already exists!")
except:
    dbutils.fs.mount(
        source = "wasbs://export@" + account_name + ".blob.core.windows.net",
        mount_point = "/mnt/system/export",
        extra_configs = {"fs.azure.account.key." + account_name + ".blob.core.windows.net": key})
# COMMAND ----------
# Databricks notebook source
# MAGIC %sh /home/ubuntu/databricks/python/bin/pip install azure
# COMMAND ----------
from azure.cosmosdb.table.tableservice import TableService
from azure.cosmosdb.table.models import Entity
# COMMAND ----------
def getWatermark(accountName, accountKey, database, table):
    try:
        table_service = TableService(account_name=accountName, account_key=accountKey)
        task = table_service.get_entity('watermarkTable', database, table)
        return task
    except:
        return False
# COMMAND ----------
def updateWatermark(accountName, accountKey, database, table, column, server, newWatermark):
    try:
        table_service = TableService(account_name=accountName, account_key=accountKey)
        updateTask = {'PartitionKey': database, 'RowKey': table, 'Column': column, 'Server': server, 'Watermark': newWatermark}
        table_service.update_entity('watermarkTable', updateTask)
        return True
    except:
        return False
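# Example usage (all values hypothetical):
# entity = getWatermark(accountName, accountKey, "AdventureWorks", "Sales.SalesOrderHeader")
# if entity != False:
#     updateWatermark(accountName, accountKey, entity.PartitionKey, entity.RowKey,
#                     entity.Column, entity.Server, "2019-01-01 00:00:00")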
# Databricks notebook source
# MAGIC %sh /home/ubuntu/databricks/python/bin/pip install azure
# COMMAND ----------
from azure.cosmosdb.table.tableservice import TableService
from azure.cosmosdb.table.tablebatch import TableBatch
from azure.cosmosdb.table.models import Entity
from pyspark.sql.functions import *
from pyspark.sql.types import *
# COMMAND ----------
dbutils.widgets.text("BlobJsonFileName", "SourceKeysTest.json")
file = "/mnt/system/import/" + dbutils.widgets.get("BlobJsonFileName")
accountName = dbutils.secrets.get(scope = "SystemScope", key = "storagename")
accountKey = dbutils.secrets.get(scope = "SystemScope", key = "storagekey")
# COMMAND ----------
# read file
df = spark.read.option("multiline", "true").json(file)
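# The seed file is expected to look roughly like this (field names inferred from the
# transformations below; values are made up):
# {"ServerLevelName": "myserver", "DatabaseLevelName": "mydatabase",
#  "Items": [{"TableName": "dbo.Orders", "ColumnName": "ModifiedDate"}]}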
# COMMAND ----------
# explode to dataframe
df = df.withColumn("Elements", explode(col("Items"))).withColumn("ColumnName", col("Elements.ColumnName")).withColumn("TableName", col("Elements.TableName")).drop("Items").drop("Elements")
# COMMAND ----------
# add watermark column
df = df.withColumn("Watermark", lit(""))
# COMMAND ----------
# create client and table if it does not exist
table_service = TableService(account_name=accountName, account_key=accountKey)
table_service.create_table("watermarkTable")
# COMMAND ----------
# collect the data frame rows
dictionary = df.collect()
# COMMAND ----------
# convert the rows to a list of dictionary objects
df_dict = [{"PartitionKey": r['DatabaseLevelName'], "RowKey": r['TableName'], "Server": r['ServerLevelName'], "Column": r['ColumnName'], "Watermark": r['Watermark']} for r in dictionary]
# COMMAND ----------
# for each element we will loop through and add to the table
for d in df_dict:
    table_service.insert_entity("watermarkTable", d)
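# Note: insert_entity fails if a row already exists; if the seed file is ever reloaded,
# table_service.insert_or_replace_entity("watermarkTable", d) is the idempotent alternative.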
# COMMAND ----------
delta: versioned parquet files
handles upserts, but you must use the MERGE INTO syntax
WORM (write once, read many)
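A minimal upsert sketch as it could be run from a notebook; the events target table, the updates staging table, and the join key are all made up for illustration:
spark.sql("""
  MERGE INTO events AS target
  USING updates AS source
  ON target.eventId = source.eventId
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")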
compaction: run OPTIMIZE
quiesce the system and run VACUUM
VACUUM events RETAIN 24 HOURS
OPTIMIZE events ZORDER BY (eventType, city)
multi-dimensional clustering (Z-ordering):
OPTIMIZE events
WHERE date >= current_timestamp() - INTERVAL 1 day
ZORDER BY (eventType)
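The same maintenance can be issued from Python, assuming events is a Delta table; note that retaining only 24 hours (as in the note above) is below the default 7-day threshold and requires turning off spark.databricks.delta.retentionDurationCheck.enabled first:
spark.sql("OPTIMIZE events ZORDER BY (eventType, city)")
spark.sql("VACUUM events RETAIN 168 HOURS")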
to convert existing code, change "CREATE TABLE ... USING parquet" to
"CREATE TABLE ... USING delta"
and change dataframe.write.format("parquet").save("/data/events") to
dataframe.write.format("delta").save("/data/events")
display(spark.sql("DESCRIBE HISTORY quickstart_delta"))
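A quick sketch of that second change in PySpark, reusing the /data/events path from the note (df stands in for any DataFrame):
df.write.format("delta").mode("overwrite").save("/data/events")
display(spark.sql("DESCRIBE HISTORY delta.`/data/events`"))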
Only when you are working with a HiveContext can DataFrames be saved as persistent tables; DataFrames created from a plain SQLContext cannot be saved as Hive tables. Example of a DataFrame created from an existing RDD (note that it was created using the HiveContext):
val flightsDF = hiveContext.createDataFrame(resultRDD)
flightsDF.write.saveAsTable("FlightDelaysSummaryRDD")
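The Spark 2.x equivalent goes through a SparkSession with Hive support enabled; a PySpark sketch carrying over the names from the Scala example above (resultRDD and the table name are placeholders):
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
flightsDF = spark.createDataFrame(resultRDD)
flightsDF.write.saveAsTable("FlightDelaysSummaryRDD")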