parameters{
    FILE_NAME as string,
    PR_COL_ID_TABLE as string,
    PR_END_TIME as string ('EndTime'),
    PR_START_TIME as string ('StartTime'),
    PR_COL_FILE_NAME as string ('FileName')
}
source(allowSchemaDrift: true,
    validateSchema: false,
    rowUrlColumn: 'FileName',
    partitionBy('hash', 1)) ~> DataToProcess
source(allowSchemaDrift: true,
    validateSchema: false,
    format: 'delta',
    fileSystem: 'staged-data-zone',
    folderPath: 'dbo_ML_Ort_DELTA') ~> ExistingData
DataToProcess derive(StartTime = /* date assembled from three FileName substrings (4 chars at position 14, 2 at 18, 2 at 20) joined with dashes */ toDate(concat(substring(FileName, 14, 4), '-', substring(FileName, 18, 2), '-', substring(FileName, 20, 2))),
        EndTime = /* sentinel value: Filter1, EndTimeSetup and AlterRow1 all test for year 1900 */ toDate("1900-01-01"),
        JoinNewDataColumn = toString(byName($PR_COL_ID_TABLE)),
        HashId = toString(byName($PR_COL_ID_TABLE)),
        HashIdToDelete = /* NOTE(review): left() is applied to the parameter value itself (a column name), not to the row value obtained with byName() - confirm this is intended */ upper(left($PR_COL_ID_TABLE, 2)),
    partitionBy('hash', 1)) ~> Sdc2StartTime
NewDataColumns, DynamicJoinExistingDataID join(/* right join: every row of DynamicJoinExistingDataID is kept */ JoinNewDataColumn == JoinExistingDataColumn,
    joinType:'right',
    partitionBy('hash', 1),
    broadcast: 'auto')~> FlagNewData
IsFileAlreadyStanged select(mapColumn(
        StartTimeNewData = StartTime,
        EndTimeNewData = EndTime,
        JoinNewDataColumn
    ),
    partitionBy('hash', 1),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> NewDataColumns
FlagNewData derive(EndTime = /* if the end-time column still holds year 1900 and the joined new row starts later than the start-time column, the new start becomes the end time; otherwise the current end time is kept */ iif(
            and(
                year(toDate(byName($PR_END_TIME))) == 1900,
                StartTimeNewData > toDate(byName($PR_START_TIME))
            ),
            toString(StartTimeNewData),
            toString(byName($PR_END_TIME))
        ),
    partitionBy('hash', 1)) ~> EndTimeSetup
EndTimeSetup select(mapColumn(
        each(match(/* keep every column except the helper columns: names ending in NewData or starting with Join (the prefix literal previously contained a stray closing parenthesis and never matched) */ !(endsWith(name,"NewData")||startsWith(name,"Join"))))
    ),
    partitionBy('hash', 1),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> DropNewColumns
ToStringOldData, ToStringNewData union(byName: true,
    partitionBy('hash', 1))~> FinalData
IsFileAlreadyStanged derive(each(match(true()), $$ = toString($$)),
    partitionBy('hash', 1)) ~> ToStringNewData
DropNewColumns derive(each(match(true()), $$ = toString($$)),
    partitionBy('hash', 1)) ~> ToStringOldData
ExistingData filter(/* only existing rows whose end-time column has year 1900 */ year(toDate(byName($PR_END_TIME))) == 1900) ~> Filter1
FinalData filter(/* NOTE(review): compares the first two characters of the parameter value, not of a row value - confirm */ upper(left($PR_COL_ID_TABLE ,2)) != "D-",
    partitionBy('hash', 1)) ~> Filter2
Filter1 derive(JoinExistingDataColumn = toString(byName($PR_COL_ID_TABLE)),
        JoinExistingFileNameColumn = toString(byName($PR_COL_FILE_NAME))) ~> DynamicJoinExistingDataID
Sdc2StartTime, DynamicJoinExistingDataID exists(/* negate is true: keep only incoming rows whose FileName has no match in DynamicJoinExistingDataID */ FileName == JoinExistingFileNameColumn,
    negate:true,
    broadcast: 'auto')~> IsFileAlreadyStanged
Filter2 alterRow(updateIf(/* rows whose EndTime no longer has year 1900 are marked for update */ year(toDate(EndTime))!=1900),
    deleteIf(HashIdToDelete == "D-")) ~> AlterRow1
ToStringNewData filter(/* NOTE(review): compares the first two characters of the parameter value, not of a row value - confirm */ upper(left($PR_COL_ID_TABLE ,2)) != "D-") ~> Filter3
AlterRow1 sink(allowSchemaDrift: true,
    validateSchema: false,
    input(
        Pro2SrcPDB as string,
        adressnr as string,
        aenderungbenutzer as string,
        aenderungdatum as string,
        aenderungzeit as string,
        anlagebenutzer as string,
        anlagedatum as string,
        anlagezeit as string,
        archived as string,
        buchungssperre as string,
        disponibel as string,
        firma as string,
        gesperrt as string,
        konto as string,
        kundennummer as string,
        lagerbereich as string,
        lagergruppe as string,
        lagerort as string,
        ml_ort_obj as string,
        mlm_stockvaluegroup_obj as string,
        mpstrategie as string,
        pro2created as string,
        pro2modified as string,
        prrowid as string,
        storagetype as string,
        suplogcenter as string,
        uci_costcenter as string,
        uci_costincobject as string,
        uci_werklieferantedi as string,
        uci_wms_id as string,
        unloadingpoint as string,
        xunloadingpoint as string,
        FileName as string,
        StartTime as date,
        EndTime as date,
        HashId as string
    ),
    format: 'delta',
    fileSystem: 'staged-data-zone',
    folderPath: 'dbo_ML_Ort_DELTA',
    truncate: false,
    vacuum: 0,
    deletable:false,
    insertable:false,
    updateable:true,
    upsertable:false,
    keys:['HashId','StartTime'],
    mapColumn(
        FileName,
        HashId,
        StartTime,
        EndTime
    ),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> sink1
Filter3 sink(allowSchemaDrift: true,
    validateSchema: false,
    input(
        Pro2SrcPDB as string,
        adressnr as string,
        aenderungbenutzer as string,
        aenderungdatum as string,
        aenderungzeit as string,
        anlagebenutzer as string,
        anlagedatum as string,
        anlagezeit as string,
        archived as string,
        buchungssperre as string,
        disponibel as string,
        firma as string,
        gesperrt as string,
        konto as string,
        kundennummer as string,
        lagerbereich as string,
        lagergruppe as string,
        lagerort as string,
        ml_ort_obj as string,
        mlm_stockvaluegroup_obj as string,
        mpstrategie as string,
        pro2created as string,
        pro2modified as string,
        prrowid as string,
        storagetype as string,
        suplogcenter as string,
        uci_costcenter as string,
        uci_costincobject as string,
        uci_werklieferantedi as string,
        uci_wms_id as string,
        unloadingpoint as string,
        xunloadingpoint as string,
        FileName as string,
        StartTime as date,
        EndTime as date
    ),
    format: 'delta',
    fileSystem: 'staged-data-zone',
    folderPath: 'dbo_ML_Ort_DELTA',
    truncate: false,
    vacuum: 0,
    deletable:false,
    insertable:true,
    updateable:false,
    upsertable:false,
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> sink2
AlterRow1 sink(allowSchemaDrift: true,
    validateSchema: false,
    input(
        Pro2SrcPDB as string,
        adressnr as string,
        aenderungbenutzer as string,
        aenderungdatum as string,
        aenderungzeit as string,
        anlagebenutzer as string,
        anlagedatum as string,
        anlagezeit as string,
        archived as string,
        buchungssperre as string,
        disponibel as string,
        firma as string,
        gesperrt as string,
        konto as string,
        kundennummer as string,
        lagerbereich as string,
        lagergruppe as string,
        lagerort as string,
        ml_ort_obj as string,
        mlm_stockvaluegroup_obj as string,
        mpstrategie as string,
        pro2created as string,
        pro2modified as string,
        prrowid as string,
        storagetype as string,
        suplogcenter as string,
        uci_costcenter as string,
        uci_costincobject as string,
        uci_werklieferantedi as string,
        uci_wms_id as string,
        unloadingpoint as string,
        xunloadingpoint as string,
        FileName as string,
        StartTime as date,
        EndTime as date,
        HashId as string
    ),
    format: 'delta',
    fileSystem: 'staged-data-zone',
    folderPath: 'dbo_ML_Ort_DELTA',
    truncate: false,
    vacuum: 0,
    deletable:true,
    insertable:false,
    updateable:false,
    upsertable:false,
    keys:['HashId'],
    mapColumn(
        FileName,
        HashId
    ),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> sink3