Commit 8dca0255 authored by Dave Wentzel's avatar Dave Wentzel

init

parents
Pipeline #43 failed with stages
/*
enable change tracking on the source database
this script is idempotent
script is written for PRD db. Will need to do a search/replace to fix that below
change retention is set to 10 days, feel free to change it
Search/Replace items:
PRD
@retention_period
*/
USE PRD;
GO
DECLARE @retention_period INT = 10;
DECLARE @exec_str varchar(1000);
IF NOT EXISTS (
SELECT db_name(database_id) , *
FROM sys.change_tracking_databases
)
BEGIN
SELECT @exec_str = 'ALTER DATABASE PRD SET CHANGE_TRACKING = ON (CHANGE_RETENTION = ' + convert(varchar(100),@retention_period) + ' DAYS, AUTO_CLEANUP = ON)'
EXEC (@exec_str);
END;
IF NOT EXISTS (
SELECT db_name(database_id) , *
FROM sys.change_tracking_databases
WHERE retention_period = @retention_period
)
BEGIN
SELECT @exec_str = 'ALTER DATABASE PRD SET CHANGE_TRACKING (CHANGE_RETENTION = ' + convert(varchar(100),@retention_period) + ' DAYS)'
EXEC (@exec_str);
END;
GO
IF NOT EXISTS (select * from sys.schemas where name = 'STREAM')
BEGIN
EXEC ('CREATE SCHEMA STREAM');
END;
GO
IF NOT EXISTS (select * from sys.objects where object_id = object_id('STREAM.CTTABLES'))
BEGIN
CREATE TABLE STREAM.CTTABLES (
schemaname varchar(100),
tblname varchar(100),
is_enabled bit NOT NULL,
LastSyncVersion BIGINT,
NextSyncVersion BIGINT,
FullPullQuery varchar(max) NULL,
IncrementalQuery varchar(max) NULL
);
END;
GO
/*
enable change tracking on the tables
this script is idempotent
run it in the context of the correct db
add any new tables you need to STREAM.CTTABLES in code below and rerun the script
"removes" are not handled, yet
*/
--USE PRD;
GO
SET NOCOUNT ON
GO
--reset table
DELETE FROM STREAM.CTTABLES;
--insert new/removed tables here
--setting LastSyncVersion to -1 means "start over with a full table pull"
INSERT INTO STREAM.CTTABLES (schemaname, tblname, is_enabled, LastSyncVersion, NextSyncVersion)VALUES ('dbo','table', 1, -1, -1);
DECLARE @exec_str varchar(1000);
--add new tables to CT process
DECLARE curAdds CURSOR FOR
SELECT 'ALTER TABLE ' + ctt.schemaname + '.' + ctt.tblname + ' ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = OFF);'
FROM STREAM.CTTABLES ctt
LEFT JOIN sys.change_tracking_tables sctt
ON object_id(ctt.schemaname + '.' + ctt.tblname) = sctt.object_id
WHERE ctt.is_enabled = 1
AND sctt.object_id IS NULL;
OPEN curAdds
FETCH NEXT FROM curAdds INTO @exec_str;
WHILE (@@FETCH_STATUS = 0)
BEGIN
EXEC (@exec_str);
FETCH NEXT FROM curAdds INTO @exec_str;
END;
CLOSE curAdds;
DEALLOCATE curAdds;
/*
SELECT OBJECT_NAME (object_id), *
FROM sys.change_tracking_tables
SELECT * FROM STREAM.CTTABLES;
*/
/*
adds the sync queries to STREAM.CTTABLES
these are called by STREAM.GetLatestTableData
*/
--USE PRD;
GO
SET NOCOUNT ON
GO
DECLARE @FullPullQuery varchar(max), @IncrementalQuery varchar(max), @FilterQuery varchar(max);
SELECT @FullPullQuery = 'SELECT * FROM %TABLE% ';
--reset table
UPDATE STREAM.CTTABLES SET
FullPullQuery = NULL,
IncrementalQuery = NULL
;
--KNA1
SELECT @IncrementalQuery = '
SELECT base.*
FROM %TABLE% base
JOIN (
SELECT DISTINCT
MANDT, KUNNR
FROM CHANGETABLE (CHANGES %TABLE%, %LastSyncVersion%) chgs
WHERE chgs.SYS_CHANGE_OPERATION IN (''I'',''U'')
) chgs
ON base.MANDT = chgs.MANDT
AND base.KUNNR = chgs.KUNNR;
';
UPDATE STREAM.CTTABLES SET
FullPullQuery = @FullPullQuery
,IncrementalQuery = @IncrementalQuery
WHERE schemaname = 'dbo' AND tblname = 'KNA1';
[FunctionName("DatabaseCleanup")]
public static async Task Run([TimerTrigger("* */15 * * * *")]TimerInfo myTimer, ILogger log)
{
// Get the connection string from app settings and use it to create a connection.
var str = Environment.GetEnvironmentVariable("sqldb_connection");
using (SqlConnection conn = new SqlConnection(str))
{
conn.Open();
var text = "EXEC stream.GetLatestTableData;";
using (SqlCommand cmd = new SqlCommand(text, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows were updated");
}
}
}
\ No newline at end of file
/*
This procedure is called from the orchestrator (ADF). Gets the "latest" data for the table.
Steps:
--It checks that change tracking is "working" and we haven't lost any data.
--determines if a full pull is necessary
--gets the data required
--saves CHANGE_TRACKING_CURRENT_VERSION() to NextSyncVersion so this can be called transactionally AFTER the copy activity succeeds in ADF
-- see STREAM.SetLastSyncVersion
*/
IF NOT EXISTS (select * from sys.objects where object_id = object_id('STREAM.GetLatestTableData'))
BEGIN
EXEC('CREATE PROCEDURE STREAM.GetLatestTableData AS BEGIN SELECT NULL; END;');
END;
GO
ALTER PROCEDURE STREAM.GetLatestTableData
@schemaname varchar(256),
@tblname varchar(256)
AS
BEGIN
SET NOCOUNT ON;
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
BEGIN TRAN
DECLARE @LastSyncVersion BIGINT = -1, @EarliestVersion BIGINT = -1, @exec_str VARCHAR(8000), @token VARCHAR(8000), @CHANGE_TRACKING_CURRENT_VERSION BIGINT;
SELECT @token = @schemaname + '.' + @tblname;
--get "current"
SELECT @CHANGE_TRACKING_CURRENT_VERSION = CHANGE_TRACKING_CURRENT_VERSION();
--get LastSyncVersion
SELECT @LastSyncVersion = LastSyncVersion
FROM STREAM.CTTABLES
WHERE schemaname = @schemaname
AND tblname = @tblname
AND is_enabled = 1;
--SELECT @LastSyncVersion;
--is it "safe" to pull transactions from the LastSyncVersion
--the min_valid_version for the table must be > what we last pulled
--don't be concerned if this is a full pull (-1)
SELECT @EarliestVersion = ctt.min_valid_version
FROM sys.objects so
JOIN sys.change_tracking_tables ctt ON so.object_id = ctt.object_id
WHERE so.schema_id = schema_id(@schemaname)
AND so.object_id = object_id(@tblname)
IF ((@LastSyncVersion <> - 1) AND @EarliestVersion > @LastSyncVersion)
BEGIN
RAISERROR ('Last Sync is older than change tracking, need to do a full pull of table stream.',16,1);
RETURN 1;
END;
--full or incremental?
--Full will have a LastSyncVersion of -1 initially
IF @LastSyncVersion = -1
BEGIN
SELECT @exec_str = REPLACE(FullPullQuery,'%TABLE%',@token) FROM STREAM.CTTABLES WHERE schemaname = @schemaname AND tblname = @tblname;
PRINT @exec_str;
EXEC (@exec_str)
END;
ELSE
BEGIN
--incremental via change tracking
SELECT @exec_str = REPLACE(REPLACE(IncrementalQuery,'%TABLE%',@token),'%LastSyncVersion%',convert(varchar(100),LastSyncVersion)) FROM STREAM.CTTABLES WHERE schemaname = @schemaname AND tblname = @tblname;
--SELECT @exec_str = REPLACE(@exec_str,'base.*','TOP 100 base.*')
PRINT @exec_str;
EXEC (@exec_str)
END;
--update new HWM
UPDATE STREAM.CTTABLES
SET NextSyncVersion = @CHANGE_TRACKING_CURRENT_VERSION
WHERE schemaname = @schemaname
AND tblname = @tblname
AND is_enabled = 1;
COMMIT;
END;
GO
/*
Unit Tests
update STREAM.CTTABLES SET LastSyncVersion = -1 where tblname = 'KNVH';
EXEC STREAM.GetLatestTableData 'qas', 'ZREGION'
select * from STREAM.CTTABLES;
select CHANGE_TRACKING_CURRENT_VERSION();
EXEC STREAM.GetLatestTableData 'qas', 'ZGOALS'
select * from qas.ZREGION;
update qas.ZREGION SET ZDESC = ZDESC WHERE ZDESC = 'COLT MCLAUGHLIN'
SELECT CHANGE_TRACKING_CURRENT_VERSION();
select * from qas.MARA WHERE MTART = 'ZMAC' AND ERNAM = 'PWALL';
UPDATE qas.MARA SET PSTAT = PSTAT WHERE MTART = 'ZMAC' AND ERNAM = 'PWALL';
select top 10 * from qas.ZGOALS;
update qas.ZGOALS SET DOLLARS = DOLLARS WHERE DOLLARS = 93958.00
*/
/*
This procedure is called from the orchestrator (ADF).
It sets LastSyncVersion = NextSyncVersion and sets NextSyncVersion = -1
There is no way to pass an OUTPUT param from a stored proc and use it elsewhere as param in ADF.
STREAM.GetLatestTableData persists the "next" val and then we call this AFTER ADF actually
persists the data to the destination.
This helps the process to be "transactional".
We set NextSyncVersion to -1 to indicate that there is no ADF pipeline running for the given table.
*/
IF NOT EXISTS (select * from sys.objects where object_id = object_id('STREAM.SetLastSyncVersion'))
BEGIN
EXEC('CREATE PROCEDURE STREAM.SetLastSyncVersion AS BEGIN SELECT NULL; END;');
END;
GO
ALTER PROCEDURE STREAM.SetLastSyncVersion
@schemaname varchar(256),
@tblname varchar(256)
AS
BEGIN
SET NOCOUNT ON;
BEGIN TRAN
UPDATE STREAM.CTTABLES
SET LastSyncVersion = NextSyncVersion,
NextSyncVersion = -1
WHERE schemaname = @schemaname
AND tblname = @tblname
AND is_enabled = 1;
COMMIT;
END;
GO
/*
STREAM.SetLastSyncVersion 'qas','ZREGION';
*/
## Philadelphia Azure DataFest: A Microsoft Advanced Analytics and Big Data Conference
December 7, 2018
### Building a Self-Service Data Lake: From Ingestion to Analytics - Dave Wentzel
1:15-2:15 PM
Abstract:
Everyone wants to do self-service analytics in the cloud. But what does that really mean? In this session I’ll show you how to design a data lake from data ingestion to analytics using Azure.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment