using Azure.Identity; using Azure.Security.KeyVault.Keys.Cryptography; using Microsoft.Azure.Cosmos; using Microsoft.Azure.Cosmos.Encryption; using Microsoft.Azure.CosmosDB.BulkExecutor; using Microsoft.Azure.Documents; using Microsoft.Azure.Documents.Client; using Newtonsoft.Json; using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Threading.Tasks; namespace ConsoleApp1 { static class Program { static readonly string accountEndpoint = "https://{}.documents.azure.com:443/"; static readonly string accountKey = "{}"; static async Task Main(string[] args) { await CosmosBulkWithV2DynamicDataWithIdInteger(); Console.WriteLine("**************************************************************************"); await CosmosBulkWithV2DynamicDataWithoutId(); Console.WriteLine("**************************************************************************"); await CosmosBulkWithV3DynamicDataWithoutId(); Console.WriteLine("**************************************************************************"); await CosmosBulkWithV3DynamicDataWithIdInteger(); Console.WriteLine("**************************************************************************"); await CosmosBulkWithV3DynamicDataWithIdString(); Console.WriteLine("**************************************************************************"); await CosmosEncryptionBulkDynamicData(); Console.WriteLine("**************************************************************************"); } /// /// Cosmos Bulk Execution V2 and dynamic data With ID as Integer /// https://docs.microsoft.com/en-us/azure/cosmos-db/sql/bulk-executor-dot-net /// /// Success in inserting documents when the id is an integer value private static async Task CosmosBulkWithV2DynamicDataWithIdInteger() { string databaseName = "bulk-v2-dynamic"; string collectionName = "withidinteger"; string partitionKey = "/job_type"; var cosmosClient = new CosmosClient(accountEndpoint, accountKey, new CosmosClientOptions() { ConnectionMode = Microsoft.Azure.Cosmos.ConnectionMode.Direct }); var databaseCheck = await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName); if (databaseCheck.StatusCode == HttpStatusCode.Created) Console.WriteLine($"Successfully Created Database: {databaseName}"); else { if (databaseCheck.StatusCode == HttpStatusCode.OK) Console.WriteLine($"Database: {databaseName} Exists"); } var databaseClient = cosmosClient.GetDatabase(databaseName); ContainerProperties containerProperties = new ContainerProperties { PartitionKeyPath = partitionKey, Id = collectionName, IndexingPolicy = new Microsoft.Azure.Cosmos.IndexingPolicy() { IndexingMode = Microsoft.Azure.Cosmos.IndexingMode.None, Automatic = false } }; var containerCheck = await databaseClient.CreateContainerIfNotExistsAsync(containerProperties); if (containerCheck.StatusCode == HttpStatusCode.Created) Console.WriteLine($"Successfully Created Container: {collectionName}"); else { if (containerCheck.StatusCode == HttpStatusCode.OK) { await databaseClient.GetContainer(collectionName).ReplaceContainerAsync(containerProperties); Console.WriteLine($"Container: {collectionName} Exists"); } } var jsonStringReplace = "[{\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"load\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.650\", \"id\": 2743}, {\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"transform\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.597\", \"id\": 2742}]"; Console.WriteLine("Received Json String Data"); var binaryDownloadData = JsonConvert.DeserializeObject>(jsonStringReplace); Console.WriteLine("Converted Json String Data"); var connectionPolicy = new ConnectionPolicy() { ConnectionMode = Microsoft.Azure.Documents.Client.ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp, RetryOptions = new RetryOptions() { MaxRetryWaitTimeInSeconds = 30, MaxRetryAttemptsOnThrottledRequests = 9 } }; using var documentClient = new DocumentClient(new Uri(accountEndpoint), accountKey, connectionPolicy); ResourceResponse collectionCheck; collectionCheck = await documentClient.ReadDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri(databaseName, collectionName)); IBulkExecutor bulkExecutor = new BulkExecutor(documentClient, collectionCheck); await bulkExecutor.InitializeAsync(); Console.WriteLine($"Total Documents to be imported: {binaryDownloadData.Count}"); while (binaryDownloadData.Any()) { var batchDocuments = binaryDownloadData.Take(1000); var bulkImportResponse = await bulkExecutor.BulkImportAsync( documents: batchDocuments, enableUpsert: true); if (bulkImportResponse.NumberOfDocumentsImported > 0) Console.WriteLine($"Successfully Imported {bulkImportResponse.NumberOfDocumentsImported} Documents. " + $"Total Request Units Consumed: {bulkImportResponse.TotalRequestUnitsConsumed} " + $"Total Time Taken: {bulkImportResponse.TotalTimeTaken} "); if (bulkImportResponse.BadInputDocuments.Count > 0) { var failedImports = string.Join(" , ", bulkImportResponse.FailedImports); Console.WriteLine($"Failed To Import: {failedImports}"); Console.WriteLine($"Count of Bad Input Documents: {bulkImportResponse.BadInputDocuments}"); } binaryDownloadData = binaryDownloadData.Skip(1000).ToList(); } } /// /// Cosmos Bulk Execution V2 and dynamic data without specifying ID /// https://docs.microsoft.com/en-us/azure/cosmos-db/sql/bulk-executor-dot-net /// /// Success in inserting documents when id is not present and id with guid values are generated private static async Task CosmosBulkWithV2DynamicDataWithoutId() { string databaseName = "bulk-v2-dynamic"; string collectionName = "withoutid"; string partitionKey = "/job_type"; var cosmosClient = new CosmosClient(accountEndpoint, accountKey, new CosmosClientOptions() { ConnectionMode = Microsoft.Azure.Cosmos.ConnectionMode.Direct }); var databaseCheck = await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName); if (databaseCheck.StatusCode == HttpStatusCode.Created) Console.WriteLine($"Successfully Created Database: {databaseName}"); else { if (databaseCheck.StatusCode == HttpStatusCode.OK) Console.WriteLine($"Database: {databaseName} Exists"); } var databaseClient = cosmosClient.GetDatabase(databaseName); ContainerProperties containerProperties = new ContainerProperties { PartitionKeyPath = partitionKey, Id = collectionName, IndexingPolicy = new Microsoft.Azure.Cosmos.IndexingPolicy() { IndexingMode = Microsoft.Azure.Cosmos.IndexingMode.None, Automatic = false } }; var containerCheck = await databaseClient.CreateContainerIfNotExistsAsync(containerProperties); if (containerCheck.StatusCode == HttpStatusCode.Created) Console.WriteLine($"Successfully Created Container: {collectionName}"); else { if (containerCheck.StatusCode == HttpStatusCode.OK) { await databaseClient.GetContainer(collectionName).ReplaceContainerAsync(containerProperties); Console.WriteLine($"Container: {collectionName} Exists"); } } var jsonStringReplace = "[{\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"load\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.650\"}, {\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"transform\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.597\"}]"; Console.WriteLine("Received Json String Data"); var binaryDownloadData = JsonConvert.DeserializeObject>(jsonStringReplace); Console.WriteLine("Converted Json String Data"); var connectionPolicy = new ConnectionPolicy() { ConnectionMode = Microsoft.Azure.Documents.Client.ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp, RetryOptions = new RetryOptions() { MaxRetryWaitTimeInSeconds = 30, MaxRetryAttemptsOnThrottledRequests = 9 } }; using var documentClient = new DocumentClient(new Uri(accountEndpoint), accountKey, connectionPolicy); ResourceResponse collectionCheck; collectionCheck = await documentClient.ReadDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri(databaseName, collectionName)); IBulkExecutor bulkExecutor = new BulkExecutor(documentClient, collectionCheck); await bulkExecutor.InitializeAsync(); Console.WriteLine($"Total Documents to be imported: {binaryDownloadData.Count}"); while (binaryDownloadData.Any()) { var batchDocuments = binaryDownloadData.Take(1000); var bulkImportResponse = await bulkExecutor.BulkImportAsync( documents: batchDocuments, enableUpsert: true); if (bulkImportResponse.NumberOfDocumentsImported > 0) Console.WriteLine($"Successfully Imported {bulkImportResponse.NumberOfDocumentsImported} Documents. " + $"Total Request Units Consumed: {bulkImportResponse.TotalRequestUnitsConsumed} " + $"Total Time Taken: {bulkImportResponse.TotalTimeTaken} "); if (bulkImportResponse.BadInputDocuments.Count > 0) { var failedImports = string.Join(" , ", bulkImportResponse.FailedImports); Console.WriteLine($"Failed To Import: {failedImports}"); Console.WriteLine($"Count of Bad Input Documents: {bulkImportResponse.BadInputDocuments}"); } binaryDownloadData = binaryDownloadData.Skip(1000).ToList(); } } /// /// Cosmos Bulk Exceution V3 and dynamic data without specifying ID /// https://docs.microsoft.com/en-us/azure/cosmos-db/sql/how-to-migrate-from-bulk-executor-library /// /// Failed to Insert documents /// Successfully Created Database: bulk-v3-dynamic /// Successfully Created Container: withoutid /// Received Json String Data /// Converted Json String Data /// Received BadRequest(Response status code does not indicate success: BadRequest (400); Substatus: 0; ActivityId: a3bd82d5-0476-4e6c-9913-3fa57af54d8d; Reason: ();). /// Received BadRequest(Response status code does not indicate success: BadRequest (400); Substatus: 0; ActivityId: a3bd82d5-0476-4e6c-9913-3fa57af54d8d; Reason: ();). /// private static async Task CosmosBulkWithV3DynamicDataWithoutId() { string databaseName = "bulk-v3-dynamic"; string collectionName = "withoutid"; string partitionKey = "/job_type"; var cosmosClient = new CosmosClient(accountEndpoint, accountKey, new CosmosClientOptions() { ConnectionMode = Microsoft.Azure.Cosmos.ConnectionMode.Direct, AllowBulkExecution = true }); var databaseCheck = await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName); if (databaseCheck.StatusCode == HttpStatusCode.Created) Console.WriteLine($"Successfully Created Database: {databaseName}"); else { if (databaseCheck.StatusCode == HttpStatusCode.OK) Console.WriteLine($"Database: {databaseName} Exists"); } var databaseClient = cosmosClient.GetDatabase(databaseName); ContainerProperties containerProperties = new ContainerProperties { PartitionKeyPath = partitionKey, Id = collectionName, IndexingPolicy = new Microsoft.Azure.Cosmos.IndexingPolicy() { IndexingMode = Microsoft.Azure.Cosmos.IndexingMode.None, Automatic = false } }; var containerCheck = await databaseClient.CreateContainerIfNotExistsAsync(containerProperties); if (containerCheck.StatusCode == HttpStatusCode.Created) Console.WriteLine($"Successfully Created Container: {collectionName}"); else { if (containerCheck.StatusCode == HttpStatusCode.OK) { await databaseClient.GetContainer(collectionName).ReplaceContainerAsync(containerProperties); Console.WriteLine($"Container: {collectionName} Exists"); } } var jsonStringReplace = "[{\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"load\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.650\"}, {\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"transform\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.597\"}]"; Console.WriteLine("Received Json String Data"); var binaryDownloadData = JsonConvert.DeserializeObject>(jsonStringReplace); Console.WriteLine("Converted Json String Data"); Microsoft.Azure.Cosmos.Container container = databaseClient.GetContainer(collectionName); List tasks = new List(binaryDownloadData.Count); foreach (var item in binaryDownloadData) { tasks.Add(container.CreateItemAsync(item) .ContinueWith(itemResponse => { if (!itemResponse.IsCompletedSuccessfully) { AggregateException innerExceptions = itemResponse.Exception.Flatten(); if (innerExceptions.InnerExceptions.FirstOrDefault(innerEx => innerEx is CosmosException) is CosmosException cosmosException) { Console.WriteLine($"Received {cosmosException.StatusCode} ({cosmosException.Message})."); } else { Console.WriteLine($"Exception {innerExceptions.InnerExceptions.FirstOrDefault()}."); } } })); } // Wait until all are done await Task.WhenAll(tasks); } /// /// Cosmos Bulk Exceution V3 and dynamic data with ID as integer /// https://docs.microsoft.com/en-us/azure/cosmos-db/sql/how-to-migrate-from-bulk-executor-library /// /// Failed to insert documents /// Database: bulk-v3-dynamic Exists /// Successfully Created Container: withidinteger /// Received Json String Data /// Converted Json String Data /// Received BadRequest(Response status code does not indicate success: BadRequest (400); Substatus: 0; ActivityId: cbe1988d-b7ed-4e42-a60b-b92e4f30f730; Reason: ();). /// Received BadRequest(Response status code does not indicate success: BadRequest (400); Substatus: 0; ActivityId: cbe1988d-b7ed-4e42-a60b-b92e4f30f730; Reason: ();). /// private static async Task CosmosBulkWithV3DynamicDataWithIdInteger() { string databaseName = "bulk-v3-dynamic"; string collectionName = "withidinteger"; string partitionKey = "/job_type"; var cosmosClient = new CosmosClient(accountEndpoint, accountKey, new CosmosClientOptions() { ConnectionMode = Microsoft.Azure.Cosmos.ConnectionMode.Direct, AllowBulkExecution = true }); var databaseCheck = await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName); if (databaseCheck.StatusCode == HttpStatusCode.Created) Console.WriteLine($"Successfully Created Database: {databaseName}"); else { if (databaseCheck.StatusCode == HttpStatusCode.OK) Console.WriteLine($"Database: {databaseName} Exists"); } var databaseClient = cosmosClient.GetDatabase(databaseName); ContainerProperties containerProperties = new ContainerProperties { PartitionKeyPath = partitionKey, Id = collectionName, IndexingPolicy = new Microsoft.Azure.Cosmos.IndexingPolicy() { IndexingMode = Microsoft.Azure.Cosmos.IndexingMode.None, Automatic = false } }; var containerCheck = await databaseClient.CreateContainerIfNotExistsAsync(containerProperties); if (containerCheck.StatusCode == HttpStatusCode.Created) Console.WriteLine($"Successfully Created Container: {collectionName}"); else { if (containerCheck.StatusCode == HttpStatusCode.OK) { await databaseClient.GetContainer(collectionName).ReplaceContainerAsync(containerProperties); Console.WriteLine($"Container: {collectionName} Exists"); } } var jsonStringReplace = "[{\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"load\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.650\", \"id\": 2743}, {\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"transform\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.597\", \"id\": 2742}]"; Console.WriteLine("Received Json String Data"); var binaryDownloadData = JsonConvert.DeserializeObject>(jsonStringReplace); Console.WriteLine("Converted Json String Data"); Microsoft.Azure.Cosmos.Container container = databaseClient.GetContainer(collectionName); List tasks = new List(binaryDownloadData.Count); foreach (var item in binaryDownloadData) { tasks.Add(container.CreateItemAsync(item) .ContinueWith(itemResponse => { if (!itemResponse.IsCompletedSuccessfully) { AggregateException innerExceptions = itemResponse.Exception.Flatten(); if (innerExceptions.InnerExceptions.FirstOrDefault(innerEx => innerEx is CosmosException) is CosmosException cosmosException) { Console.WriteLine($"Received {cosmosException.StatusCode} ({cosmosException.Message})."); } else { Console.WriteLine($"Exception {innerExceptions.InnerExceptions.FirstOrDefault()}."); } } })); } // Wait until all are done await Task.WhenAll(tasks); } /// /// Cosmos Bulk Exceution V3 and dynamic data with ID as string /// https://docs.microsoft.com/en-us/azure/cosmos-db/sql/how-to-migrate-from-bulk-executor-library /// /// Successfully inserted documents /// Database: bulk-v3-dynamic Exists /// Successfully Created Container: withidstring /// Received Json String Data /// Converted Json String Data /// private static async Task CosmosBulkWithV3DynamicDataWithIdString() { string databaseName = "bulk-v3-dynamic"; string collectionName = "withidstring"; string partitionKey = "/job_type"; var cosmosClient = new CosmosClient(accountEndpoint, accountKey, new CosmosClientOptions() { ConnectionMode = Microsoft.Azure.Cosmos.ConnectionMode.Direct, AllowBulkExecution = true }); var databaseCheck = await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName); if (databaseCheck.StatusCode == HttpStatusCode.Created) Console.WriteLine($"Successfully Created Database: {databaseName}"); else { if (databaseCheck.StatusCode == HttpStatusCode.OK) Console.WriteLine($"Database: {databaseName} Exists"); } var databaseClient = cosmosClient.GetDatabase(databaseName); ContainerProperties containerProperties = new ContainerProperties { PartitionKeyPath = partitionKey, Id = collectionName, IndexingPolicy = new Microsoft.Azure.Cosmos.IndexingPolicy() { IndexingMode = Microsoft.Azure.Cosmos.IndexingMode.None, Automatic = false } }; var containerCheck = await databaseClient.CreateContainerIfNotExistsAsync(containerProperties); if (containerCheck.StatusCode == HttpStatusCode.Created) Console.WriteLine($"Successfully Created Container: {collectionName}"); else { if (containerCheck.StatusCode == HttpStatusCode.OK) { await databaseClient.GetContainer(collectionName).ReplaceContainerAsync(containerProperties); Console.WriteLine($"Container: {collectionName} Exists"); } } var jsonStringReplace = "[{\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"load\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.650\", \"id\": \"2743\"}, {\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"transform\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.597\", \"id\": \"2742\"}]"; Console.WriteLine("Received Json String Data"); var binaryDownloadData = JsonConvert.DeserializeObject>(jsonStringReplace); Console.WriteLine("Converted Json String Data"); Microsoft.Azure.Cosmos.Container container = databaseClient.GetContainer(collectionName); List tasks = new List(binaryDownloadData.Count); foreach (var item in binaryDownloadData) { tasks.Add(container.CreateItemAsync(item) .ContinueWith(itemResponse => { if (!itemResponse.IsCompletedSuccessfully) { AggregateException innerExceptions = itemResponse.Exception.Flatten(); if (innerExceptions.InnerExceptions.FirstOrDefault(innerEx => innerEx is CosmosException) is CosmosException cosmosException) { Console.WriteLine($"Received {cosmosException.StatusCode} ({cosmosException.Message})."); } else { Console.WriteLine($"Exception {innerExceptions.InnerExceptions.FirstOrDefault()}."); } } else { Console.WriteLine("Successfully inserted the document"); } })); } // Wait until all are done await Task.WhenAll(tasks); } /// /// Cosmos Bulk execution with Encryption enabled and dynamic data with ID as string /// https://docs.microsoft.com/en-us/azure/cosmos-db/how-to-always-encrypted?tabs=dotnet /// /// Failed to insert documents /// Successfully Created Database: bulk-v3-dynamic /// Successfully Created Container: encryptionwithidstring /// Received Json String Data /// Converted Json String Data /// Received BadRequest(Response status code does not indicate success: BadRequest (400); Substatus: 1001; ActivityId: 582cf6e9-1a59-4968-b6cc-9ce681ed379e; Reason: ();). /// Received BadRequest(Response status code does not indicate success: BadRequest (400); Substatus: 1001; ActivityId: 582cf6e9-1a59-4968-b6cc-9ce681ed379e; Reason: ();). /// private static async Task CosmosEncryptionBulkDynamicData() { string databaseName = "bulk-v3-dynamic"; string collectionName = "encryptionwithidstring"; string partitionKey = "/job_type"; var fieldEncryptItem = "source_connector_name"; string keyVaultIdentifier = "{}"; var keyName = "{}"; var tokenCredential = new DefaultAzureCredential(); var keyResolver = new KeyResolver(tokenCredential); var cosmosClient = new CosmosClient(accountEndpoint, accountKey, new CosmosClientOptions() { ConnectionMode = Microsoft.Azure.Cosmos.ConnectionMode.Direct, AllowBulkExecution = true }).WithEncryption(keyResolver, KeyEncryptionKeyResolverName.AzureKeyVault); var databaseCheck = await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName); if (databaseCheck.StatusCode == HttpStatusCode.Created) Console.WriteLine($"Successfully Created Database: {databaseName}"); else { if (databaseCheck.StatusCode == HttpStatusCode.OK) Console.WriteLine($"Database: {databaseName} Exists"); } var databaseClient = cosmosClient.GetDatabase(databaseName); await databaseClient.CreateClientEncryptionKeyAsync( keyName, DataEncryptionAlgorithm.AeadAes256CbcHmacSha256, new EncryptionKeyWrapMetadata( KeyEncryptionKeyResolverName.AzureKeyVault, keyName, keyVaultIdentifier, EncryptionAlgorithm.RsaOaep.ToString())); var fieldEncryptionPathList = new List { new ClientEncryptionIncludedPath() { Path = $"/{fieldEncryptItem}", ClientEncryptionKeyId = keyName, EncryptionType = EncryptionType.Deterministic.ToString(), EncryptionAlgorithm = DataEncryptionAlgorithm.AeadAes256CbcHmacSha256 } }; ContainerProperties containerProperties = new ContainerProperties { PartitionKeyPath = partitionKey, Id = collectionName, ClientEncryptionPolicy = new ClientEncryptionPolicy(fieldEncryptionPathList), IndexingPolicy = new Microsoft.Azure.Cosmos.IndexingPolicy() { IndexingMode = Microsoft.Azure.Cosmos.IndexingMode.None, Automatic = false } }; var containerCheck = await databaseClient.CreateContainerIfNotExistsAsync(containerProperties); if (containerCheck.StatusCode == HttpStatusCode.Created) Console.WriteLine($"Successfully Created Container: {collectionName}"); else { if (containerCheck.StatusCode == HttpStatusCode.OK) { await databaseClient.GetContainer(collectionName).ReplaceContainerAsync(containerProperties); Console.WriteLine($"Container: {collectionName} Exists"); } } var jsonStringReplace = "[{\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"load\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.650\", \"id\": \"2743\"}, {\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"transform\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.597\", \"id\": \"2742\"}]"; Console.WriteLine("Received Json String Data"); var binaryDownloadData = JsonConvert.DeserializeObject>(jsonStringReplace); Console.WriteLine("Converted Json String Data"); Microsoft.Azure.Cosmos.Container container = databaseClient.GetContainer(collectionName); List tasks = new List(binaryDownloadData.Count); foreach (var item in binaryDownloadData) { tasks.Add(container.CreateItemAsync(item, new Microsoft.Azure.Cosmos.PartitionKey(partitionKey)) .ContinueWith(itemResponse => { if (!itemResponse.IsCompletedSuccessfully) { AggregateException innerExceptions = itemResponse.Exception.Flatten(); if (innerExceptions.InnerExceptions.FirstOrDefault(innerEx => innerEx is CosmosException) is CosmosException cosmosException) { Console.WriteLine($"Received {cosmosException.StatusCode} ({cosmosException.Message})."); } else { Console.WriteLine($"Exception {innerExceptions.InnerExceptions.FirstOrDefault()}."); } } })); } // Wait until all are done await Task.WhenAll(tasks); } } }