source(output( leadId as short, activityDate as timestamp, activityTypeId as short, campaignId as integer, primaryAttributeValue as string, attributes as string ), allowSchemaDrift: false, validateSchema: false) ~> source1 DerivedColumn2 foldDown(unroll(attributes, attributes), mapColumn( leadId, attributes ), skipDuplicateMapInputs: false, skipDuplicateMapOutputs: false) ~> Flatten1 RemoveColumns1 derive(attributes = rtrim(attributes,'}')) ~> RemoveBrace source1 select(mapColumn( leadId, attributes ), skipDuplicateMapInputs: true, skipDuplicateMapOutputs: true) ~> RemoveColumns1 RemoveBrace derive(attributes = regexSplit(attributes,',"')) ~> DerivedColumn2 Flatten1 derive(attribute_name = ltrim(regexSplit(attributes, '":"?')[1],'"'), attribute_value = rtrim(regexSplit(attributes, '":"?')[2],'"')) ~> DerivedColumn3 DerivedColumn3 sink(allowSchemaDrift: true, validateSchema: false, mapColumn( leadId, attribute_name, attribute_value ), skipDuplicateMapInputs: true, skipDuplicateMapOutputs: true) ~> sink1