source(output( createdtime as string, days as (duration as integer, temperature as double, year as integer, waterusage as double, coldwaterusage as double, day as integer, hotwaterusage as integer)[], deviceid as string ), allowSchemaDrift: true, validateSchema: false, format: 'document', systemColumns: true) ~> source1 source1 foldDown(unroll(days, days), mapColumn( coldwaterusage = days.coldwaterusage, usageday = days.day, duration = days.duration, hotwaterusage = days.hotwaterusage, temperature = days.temperature, waterusage = days.waterusage, usageyear = days.year, deviceid, createdtime ), skipDuplicateMapInputs: false, skipDuplicateMapOutputs: false) ~> Flatten1 Select1 derive(usage = @(quantity=waterusage, waterusage=waterusage, hotwaterusage=hotwaterusage, coldwaterusage=coldwaterusage, usageduration=duration, temperature=temperature), deviceusagedate = toString(addDays(toDate((toString(usageyear)+'-'+'01'+'-'+'01')),usageday-1)), deviceusagemonth = left(toString(addDays(toDate((toString(usageyear)+'-'+'01'+'-'+'01')),usageday-1)),7), deviceusageid = deviceid+'-'+(toString(addDays(toDate((toString(usageyear)+'-'+'01'+'-'+'01')),usageday-1))), newid = deviceid+'-'+(toString(addDays(toDate((toString(usageyear)+'-'+'01'+'-'+'01')),usageday-1)))+'-'+'11', ismigrated = 'Yes', applicationsource = 'KohlerKonnect') ~> DerivedColumn1 Filter1 select(mapColumn( coldwaterusage, usageday, duration, hotwaterusage, temperature, waterusage, usageyear, deviceid, createdtime ), skipDuplicateMapInputs: true, skipDuplicateMapOutputs: true) ~> Select1 Select5 alterRow(upsertIf(true())) ~> AlterRow1 DerivedColumn1 select(mapColumn( deviceid, createdtime, usage, deviceusagedate, deviceusagemonth, deviceusageid, id = newid, ismigrated, applicationsource ), skipDuplicateMapInputs: true, skipDuplicateMapOutputs: true) ~> Select5 Flatten1 filter(deviceid=='sen-sio32243nz') ~> Filter1 AlterRow1 sink(allowSchemaDrift: true, validateSchema: false, deletable:false, insertable:false, updateable:false, upsertable:true, format: 'document', skipDuplicateMapInputs: true, skipDuplicateMapOutputs: true) ~> sink1