using Azure.Identity;
using Azure.Security.KeyVault.Keys.Cryptography;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Encryption;
using Microsoft.Azure.CosmosDB.BulkExecutor;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
namespace ConsoleApp1
{
static class Program
{
static readonly string accountEndpoint = "https://{}.documents.azure.com:443/";
static readonly string accountKey = "{}";
static async Task Main(string[] args)
{
await CosmosBulkWithV2DynamicDataWithIdInteger();
Console.WriteLine("**************************************************************************");
await CosmosBulkWithV2DynamicDataWithoutId();
Console.WriteLine("**************************************************************************");
await CosmosBulkWithV3DynamicDataWithoutId();
Console.WriteLine("**************************************************************************");
await CosmosBulkWithV3DynamicDataWithIdInteger();
Console.WriteLine("**************************************************************************");
await CosmosBulkWithV3DynamicDataWithIdString();
Console.WriteLine("**************************************************************************");
await CosmosEncryptionBulkDynamicData();
Console.WriteLine("**************************************************************************");
}
///
/// Cosmos Bulk Execution V2 and dynamic data With ID as Integer
/// https://docs.microsoft.com/en-us/azure/cosmos-db/sql/bulk-executor-dot-net
///
/// Success in inserting documents when the id is an integer value
private static async Task CosmosBulkWithV2DynamicDataWithIdInteger()
{
string databaseName = "bulk-v2-dynamic";
string collectionName = "withidinteger";
string partitionKey = "/job_type";
var cosmosClient = new CosmosClient(accountEndpoint, accountKey,
new CosmosClientOptions()
{
ConnectionMode = Microsoft.Azure.Cosmos.ConnectionMode.Direct
});
var databaseCheck = await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName);
if (databaseCheck.StatusCode == HttpStatusCode.Created)
Console.WriteLine($"Successfully Created Database: {databaseName}");
else
{
if (databaseCheck.StatusCode == HttpStatusCode.OK)
Console.WriteLine($"Database: {databaseName} Exists");
}
var databaseClient = cosmosClient.GetDatabase(databaseName);
ContainerProperties containerProperties = new ContainerProperties
{
PartitionKeyPath = partitionKey,
Id = collectionName,
IndexingPolicy = new Microsoft.Azure.Cosmos.IndexingPolicy()
{
IndexingMode = Microsoft.Azure.Cosmos.IndexingMode.None,
Automatic = false
}
};
var containerCheck = await databaseClient.CreateContainerIfNotExistsAsync(containerProperties);
if (containerCheck.StatusCode == HttpStatusCode.Created)
Console.WriteLine($"Successfully Created Container: {collectionName}");
else
{
if (containerCheck.StatusCode == HttpStatusCode.OK)
{
await databaseClient.GetContainer(collectionName).ReplaceContainerAsync(containerProperties);
Console.WriteLine($"Container: {collectionName} Exists");
}
}
var jsonStringReplace = "[{\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"load\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.650\", \"id\": 2743}, {\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"transform\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", 
\"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.597\", \"id\": 2742}]";
Console.WriteLine("Received Json String Data");
var binaryDownloadData = JsonConvert.DeserializeObject>(jsonStringReplace);
Console.WriteLine("Converted Json String Data");
var connectionPolicy = new ConnectionPolicy()
{
ConnectionMode = Microsoft.Azure.Documents.Client.ConnectionMode.Direct,
ConnectionProtocol = Protocol.Tcp,
RetryOptions = new RetryOptions()
{
MaxRetryWaitTimeInSeconds = 30,
MaxRetryAttemptsOnThrottledRequests = 9
}
};
using var documentClient = new DocumentClient(new Uri(accountEndpoint),
accountKey, connectionPolicy);
ResourceResponse collectionCheck;
collectionCheck = await documentClient.ReadDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri(databaseName, collectionName));
IBulkExecutor bulkExecutor = new BulkExecutor(documentClient, collectionCheck);
await bulkExecutor.InitializeAsync();
Console.WriteLine($"Total Documents to be imported: {binaryDownloadData.Count}");
while (binaryDownloadData.Any())
{
var batchDocuments = binaryDownloadData.Take(1000);
var bulkImportResponse = await bulkExecutor.BulkImportAsync(
documents: batchDocuments,
enableUpsert: true);
if (bulkImportResponse.NumberOfDocumentsImported > 0)
Console.WriteLine($"Successfully Imported {bulkImportResponse.NumberOfDocumentsImported} Documents. " +
$"Total Request Units Consumed: {bulkImportResponse.TotalRequestUnitsConsumed} " +
$"Total Time Taken: {bulkImportResponse.TotalTimeTaken} ");
if (bulkImportResponse.BadInputDocuments.Count > 0)
{
var failedImports = string.Join(" , ", bulkImportResponse.FailedImports);
Console.WriteLine($"Failed To Import: {failedImports}");
Console.WriteLine($"Count of Bad Input Documents: {bulkImportResponse.BadInputDocuments}");
}
binaryDownloadData = binaryDownloadData.Skip(1000).ToList();
}
}
///
/// Cosmos Bulk Execution V2 and dynamic data without specifying ID
/// https://docs.microsoft.com/en-us/azure/cosmos-db/sql/bulk-executor-dot-net
///
/// Success in inserting documents when id is not present and id with guid values are generated
private static async Task CosmosBulkWithV2DynamicDataWithoutId()
{
string databaseName = "bulk-v2-dynamic";
string collectionName = "withoutid";
string partitionKey = "/job_type";
var cosmosClient = new CosmosClient(accountEndpoint, accountKey,
new CosmosClientOptions()
{
ConnectionMode = Microsoft.Azure.Cosmos.ConnectionMode.Direct
});
var databaseCheck = await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName);
if (databaseCheck.StatusCode == HttpStatusCode.Created)
Console.WriteLine($"Successfully Created Database: {databaseName}");
else
{
if (databaseCheck.StatusCode == HttpStatusCode.OK)
Console.WriteLine($"Database: {databaseName} Exists");
}
var databaseClient = cosmosClient.GetDatabase(databaseName);
ContainerProperties containerProperties = new ContainerProperties
{
PartitionKeyPath = partitionKey,
Id = collectionName,
IndexingPolicy = new Microsoft.Azure.Cosmos.IndexingPolicy()
{
IndexingMode = Microsoft.Azure.Cosmos.IndexingMode.None,
Automatic = false
}
};
var containerCheck = await databaseClient.CreateContainerIfNotExistsAsync(containerProperties);
if (containerCheck.StatusCode == HttpStatusCode.Created)
Console.WriteLine($"Successfully Created Container: {collectionName}");
else
{
if (containerCheck.StatusCode == HttpStatusCode.OK)
{
await databaseClient.GetContainer(collectionName).ReplaceContainerAsync(containerProperties);
Console.WriteLine($"Container: {collectionName} Exists");
}
}
var jsonStringReplace = "[{\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"load\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.650\"}, {\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"transform\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": 
\"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.597\"}]";
Console.WriteLine("Received Json String Data");
var binaryDownloadData = JsonConvert.DeserializeObject>(jsonStringReplace);
Console.WriteLine("Converted Json String Data");
var connectionPolicy = new ConnectionPolicy()
{
ConnectionMode = Microsoft.Azure.Documents.Client.ConnectionMode.Direct,
ConnectionProtocol = Protocol.Tcp,
RetryOptions = new RetryOptions()
{
MaxRetryWaitTimeInSeconds = 30,
MaxRetryAttemptsOnThrottledRequests = 9
}
};
using var documentClient = new DocumentClient(new Uri(accountEndpoint),
accountKey, connectionPolicy);
ResourceResponse collectionCheck;
collectionCheck = await documentClient.ReadDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri(databaseName, collectionName));
IBulkExecutor bulkExecutor = new BulkExecutor(documentClient, collectionCheck);
await bulkExecutor.InitializeAsync();
Console.WriteLine($"Total Documents to be imported: {binaryDownloadData.Count}");
while (binaryDownloadData.Any())
{
var batchDocuments = binaryDownloadData.Take(1000);
var bulkImportResponse = await bulkExecutor.BulkImportAsync(
documents: batchDocuments,
enableUpsert: true);
if (bulkImportResponse.NumberOfDocumentsImported > 0)
Console.WriteLine($"Successfully Imported {bulkImportResponse.NumberOfDocumentsImported} Documents. " +
$"Total Request Units Consumed: {bulkImportResponse.TotalRequestUnitsConsumed} " +
$"Total Time Taken: {bulkImportResponse.TotalTimeTaken} ");
if (bulkImportResponse.BadInputDocuments.Count > 0)
{
var failedImports = string.Join(" , ", bulkImportResponse.FailedImports);
Console.WriteLine($"Failed To Import: {failedImports}");
Console.WriteLine($"Count of Bad Input Documents: {bulkImportResponse.BadInputDocuments}");
}
binaryDownloadData = binaryDownloadData.Skip(1000).ToList();
}
}
///
/// Cosmos Bulk Exceution V3 and dynamic data without specifying ID
/// https://docs.microsoft.com/en-us/azure/cosmos-db/sql/how-to-migrate-from-bulk-executor-library
///
/// Failed to Insert documents
/// Successfully Created Database: bulk-v3-dynamic
/// Successfully Created Container: withoutid
/// Received Json String Data
/// Converted Json String Data
/// Received BadRequest(Response status code does not indicate success: BadRequest (400); Substatus: 0; ActivityId: a3bd82d5-0476-4e6c-9913-3fa57af54d8d; Reason: ();).
/// Received BadRequest(Response status code does not indicate success: BadRequest (400); Substatus: 0; ActivityId: a3bd82d5-0476-4e6c-9913-3fa57af54d8d; Reason: ();).
///
private static async Task CosmosBulkWithV3DynamicDataWithoutId()
{
string databaseName = "bulk-v3-dynamic";
string collectionName = "withoutid";
string partitionKey = "/job_type";
var cosmosClient = new CosmosClient(accountEndpoint, accountKey,
new CosmosClientOptions()
{
ConnectionMode = Microsoft.Azure.Cosmos.ConnectionMode.Direct,
AllowBulkExecution = true
});
var databaseCheck = await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName);
if (databaseCheck.StatusCode == HttpStatusCode.Created)
Console.WriteLine($"Successfully Created Database: {databaseName}");
else
{
if (databaseCheck.StatusCode == HttpStatusCode.OK)
Console.WriteLine($"Database: {databaseName} Exists");
}
var databaseClient = cosmosClient.GetDatabase(databaseName);
ContainerProperties containerProperties = new ContainerProperties
{
PartitionKeyPath = partitionKey,
Id = collectionName,
IndexingPolicy = new Microsoft.Azure.Cosmos.IndexingPolicy()
{
IndexingMode = Microsoft.Azure.Cosmos.IndexingMode.None,
Automatic = false
}
};
var containerCheck = await databaseClient.CreateContainerIfNotExistsAsync(containerProperties);
if (containerCheck.StatusCode == HttpStatusCode.Created)
Console.WriteLine($"Successfully Created Container: {collectionName}");
else
{
if (containerCheck.StatusCode == HttpStatusCode.OK)
{
await databaseClient.GetContainer(collectionName).ReplaceContainerAsync(containerProperties);
Console.WriteLine($"Container: {collectionName} Exists");
}
}
var jsonStringReplace = "[{\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"load\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.650\"}, {\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"transform\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": 
\"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.597\"}]";
Console.WriteLine("Received Json String Data");
var binaryDownloadData = JsonConvert.DeserializeObject>(jsonStringReplace);
Console.WriteLine("Converted Json String Data");
Microsoft.Azure.Cosmos.Container container = databaseClient.GetContainer(collectionName);
List tasks = new List(binaryDownloadData.Count);
foreach (var item in binaryDownloadData)
{
tasks.Add(container.CreateItemAsync(item)
.ContinueWith(itemResponse =>
{
if (!itemResponse.IsCompletedSuccessfully)
{
AggregateException innerExceptions = itemResponse.Exception.Flatten();
if (innerExceptions.InnerExceptions.FirstOrDefault(innerEx => innerEx is CosmosException) is CosmosException cosmosException)
{
Console.WriteLine($"Received {cosmosException.StatusCode} ({cosmosException.Message}).");
}
else
{
Console.WriteLine($"Exception {innerExceptions.InnerExceptions.FirstOrDefault()}.");
}
}
}));
}
// Wait until all are done
await Task.WhenAll(tasks);
}
///
/// Cosmos Bulk Exceution V3 and dynamic data with ID as integer
/// https://docs.microsoft.com/en-us/azure/cosmos-db/sql/how-to-migrate-from-bulk-executor-library
///
/// Failed to insert documents
/// Database: bulk-v3-dynamic Exists
/// Successfully Created Container: withidinteger
/// Received Json String Data
/// Converted Json String Data
/// Received BadRequest(Response status code does not indicate success: BadRequest (400); Substatus: 0; ActivityId: cbe1988d-b7ed-4e42-a60b-b92e4f30f730; Reason: ();).
/// Received BadRequest(Response status code does not indicate success: BadRequest (400); Substatus: 0; ActivityId: cbe1988d-b7ed-4e42-a60b-b92e4f30f730; Reason: ();).
///
private static async Task CosmosBulkWithV3DynamicDataWithIdInteger()
{
string databaseName = "bulk-v3-dynamic";
string collectionName = "withidinteger";
string partitionKey = "/job_type";
var cosmosClient = new CosmosClient(accountEndpoint, accountKey,
new CosmosClientOptions()
{
ConnectionMode = Microsoft.Azure.Cosmos.ConnectionMode.Direct,
AllowBulkExecution = true
});
var databaseCheck = await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName);
if (databaseCheck.StatusCode == HttpStatusCode.Created)
Console.WriteLine($"Successfully Created Database: {databaseName}");
else
{
if (databaseCheck.StatusCode == HttpStatusCode.OK)
Console.WriteLine($"Database: {databaseName} Exists");
}
var databaseClient = cosmosClient.GetDatabase(databaseName);
ContainerProperties containerProperties = new ContainerProperties
{
PartitionKeyPath = partitionKey,
Id = collectionName,
IndexingPolicy = new Microsoft.Azure.Cosmos.IndexingPolicy()
{
IndexingMode = Microsoft.Azure.Cosmos.IndexingMode.None,
Automatic = false
}
};
var containerCheck = await databaseClient.CreateContainerIfNotExistsAsync(containerProperties);
if (containerCheck.StatusCode == HttpStatusCode.Created)
Console.WriteLine($"Successfully Created Container: {collectionName}");
else
{
if (containerCheck.StatusCode == HttpStatusCode.OK)
{
await databaseClient.GetContainer(collectionName).ReplaceContainerAsync(containerProperties);
Console.WriteLine($"Container: {collectionName} Exists");
}
}
var jsonStringReplace = "[{\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"load\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.650\", \"id\": 2743}, {\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"transform\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", 
\"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.597\", \"id\": 2742}]";
Console.WriteLine("Received Json String Data");
var binaryDownloadData = JsonConvert.DeserializeObject>(jsonStringReplace);
Console.WriteLine("Converted Json String Data");
Microsoft.Azure.Cosmos.Container container = databaseClient.GetContainer(collectionName);
List tasks = new List(binaryDownloadData.Count);
foreach (var item in binaryDownloadData)
{
tasks.Add(container.CreateItemAsync(item)
.ContinueWith(itemResponse =>
{
if (!itemResponse.IsCompletedSuccessfully)
{
AggregateException innerExceptions = itemResponse.Exception.Flatten();
if (innerExceptions.InnerExceptions.FirstOrDefault(innerEx => innerEx is CosmosException) is CosmosException cosmosException)
{
Console.WriteLine($"Received {cosmosException.StatusCode} ({cosmosException.Message}).");
}
else
{
Console.WriteLine($"Exception {innerExceptions.InnerExceptions.FirstOrDefault()}.");
}
}
}));
}
// Wait until all are done
await Task.WhenAll(tasks);
}
///
/// Cosmos Bulk Exceution V3 and dynamic data with ID as string
/// https://docs.microsoft.com/en-us/azure/cosmos-db/sql/how-to-migrate-from-bulk-executor-library
///
/// Successfully inserted documents
/// Database: bulk-v3-dynamic Exists
/// Successfully Created Container: withidstring
/// Received Json String Data
/// Converted Json String Data
///
private static async Task CosmosBulkWithV3DynamicDataWithIdString()
{
string databaseName = "bulk-v3-dynamic";
string collectionName = "withidstring";
string partitionKey = "/job_type";
var cosmosClient = new CosmosClient(accountEndpoint, accountKey,
new CosmosClientOptions()
{
ConnectionMode = Microsoft.Azure.Cosmos.ConnectionMode.Direct,
AllowBulkExecution = true
});
var databaseCheck = await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName);
if (databaseCheck.StatusCode == HttpStatusCode.Created)
Console.WriteLine($"Successfully Created Database: {databaseName}");
else
{
if (databaseCheck.StatusCode == HttpStatusCode.OK)
Console.WriteLine($"Database: {databaseName} Exists");
}
var databaseClient = cosmosClient.GetDatabase(databaseName);
ContainerProperties containerProperties = new ContainerProperties
{
PartitionKeyPath = partitionKey,
Id = collectionName,
IndexingPolicy = new Microsoft.Azure.Cosmos.IndexingPolicy()
{
IndexingMode = Microsoft.Azure.Cosmos.IndexingMode.None,
Automatic = false
}
};
var containerCheck = await databaseClient.CreateContainerIfNotExistsAsync(containerProperties);
if (containerCheck.StatusCode == HttpStatusCode.Created)
Console.WriteLine($"Successfully Created Container: {collectionName}");
else
{
if (containerCheck.StatusCode == HttpStatusCode.OK)
{
await databaseClient.GetContainer(collectionName).ReplaceContainerAsync(containerProperties);
Console.WriteLine($"Container: {collectionName} Exists");
}
}
var jsonStringReplace = "[{\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"load\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.650\", \"id\": \"2743\"}, {\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"transform\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", 
\"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.597\", \"id\": \"2742\"}]";
Console.WriteLine("Received Json String Data");
var binaryDownloadData = JsonConvert.DeserializeObject>(jsonStringReplace);
Console.WriteLine("Converted Json String Data");
Microsoft.Azure.Cosmos.Container container = databaseClient.GetContainer(collectionName);
List tasks = new List(binaryDownloadData.Count);
foreach (var item in binaryDownloadData)
{
tasks.Add(container.CreateItemAsync(item)
.ContinueWith(itemResponse =>
{
if (!itemResponse.IsCompletedSuccessfully)
{
AggregateException innerExceptions = itemResponse.Exception.Flatten();
if (innerExceptions.InnerExceptions.FirstOrDefault(innerEx => innerEx is CosmosException) is CosmosException cosmosException)
{
Console.WriteLine($"Received {cosmosException.StatusCode} ({cosmosException.Message}).");
}
else
{
Console.WriteLine($"Exception {innerExceptions.InnerExceptions.FirstOrDefault()}.");
}
}
else
{
Console.WriteLine("Successfully inserted the document");
}
}));
}
// Wait until all are done
await Task.WhenAll(tasks);
}
///
/// Cosmos Bulk execution with Encryption enabled and dynamic data with ID as string
/// https://docs.microsoft.com/en-us/azure/cosmos-db/how-to-always-encrypted?tabs=dotnet
///
/// Failed to insert documents
/// Successfully Created Database: bulk-v3-dynamic
/// Successfully Created Container: encryptionwithidstring
/// Received Json String Data
/// Converted Json String Data
/// Received BadRequest(Response status code does not indicate success: BadRequest (400); Substatus: 1001; ActivityId: 582cf6e9-1a59-4968-b6cc-9ce681ed379e; Reason: ();).
/// Received BadRequest(Response status code does not indicate success: BadRequest (400); Substatus: 1001; ActivityId: 582cf6e9-1a59-4968-b6cc-9ce681ed379e; Reason: ();).
///
private static async Task CosmosEncryptionBulkDynamicData()
{
string databaseName = "bulk-v3-dynamic";
string collectionName = "encryptionwithidstring";
string partitionKey = "/job_type";
var fieldEncryptItem = "source_connector_name";
string keyVaultIdentifier = "{}";
var keyName = "{}";
var tokenCredential = new DefaultAzureCredential();
var keyResolver = new KeyResolver(tokenCredential);
var cosmosClient = new CosmosClient(accountEndpoint, accountKey, new CosmosClientOptions()
{
ConnectionMode = Microsoft.Azure.Cosmos.ConnectionMode.Direct,
AllowBulkExecution = true
}).WithEncryption(keyResolver, KeyEncryptionKeyResolverName.AzureKeyVault);
var databaseCheck = await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName);
if (databaseCheck.StatusCode == HttpStatusCode.Created)
Console.WriteLine($"Successfully Created Database: {databaseName}");
else
{
if (databaseCheck.StatusCode == HttpStatusCode.OK)
Console.WriteLine($"Database: {databaseName} Exists");
}
var databaseClient = cosmosClient.GetDatabase(databaseName);
await databaseClient.CreateClientEncryptionKeyAsync(
keyName,
DataEncryptionAlgorithm.AeadAes256CbcHmacSha256,
new EncryptionKeyWrapMetadata(
KeyEncryptionKeyResolverName.AzureKeyVault,
keyName,
keyVaultIdentifier,
EncryptionAlgorithm.RsaOaep.ToString()));
var fieldEncryptionPathList = new List
{
new ClientEncryptionIncludedPath()
{
Path = $"/{fieldEncryptItem}",
ClientEncryptionKeyId = keyName,
EncryptionType = EncryptionType.Deterministic.ToString(),
EncryptionAlgorithm = DataEncryptionAlgorithm.AeadAes256CbcHmacSha256
}
};
ContainerProperties containerProperties = new ContainerProperties
{
PartitionKeyPath = partitionKey,
Id = collectionName,
ClientEncryptionPolicy = new ClientEncryptionPolicy(fieldEncryptionPathList),
IndexingPolicy = new Microsoft.Azure.Cosmos.IndexingPolicy()
{
IndexingMode = Microsoft.Azure.Cosmos.IndexingMode.None,
Automatic = false
}
};
var containerCheck = await databaseClient.CreateContainerIfNotExistsAsync(containerProperties);
if (containerCheck.StatusCode == HttpStatusCode.Created)
Console.WriteLine($"Successfully Created Container: {collectionName}");
else
{
if (containerCheck.StatusCode == HttpStatusCode.OK)
{
await databaseClient.GetContainer(collectionName).ReplaceContainerAsync(containerProperties);
Console.WriteLine($"Container: {collectionName} Exists");
}
}
var jsonStringReplace = "[{\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"load\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", \"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.650\", \"id\": \"2743\"}, {\"job_id\": 122, \"job_name\": \"mongo_cosmos_job\", \"job_type\": \"onetimeDataOffload\", \"source_connector_id\": 98.0, \"source_connector_name\": \"mongo_src\", \"source_type\": \"mongo\", \"source_database\": \"company\", \"destination_connector_id\": 99.0, \"destination_connector_name\": \"cosmosmongo_serverless_dest\", \"destination_type\": \"cosmos\", \"destination_database\": \"mongo_cosmos_db\", \"dag_run_id\": \"manual__2021-10-06T09:56:07.175777+00:00\", \"task_id\": \"extract_and_load_company.test_collection\", \"task_name\": \"extract_and_load_company.test_collection\", \"table_name\": \"test_collection\", \"stage\": \"transform\", \"record_count\": 0, \"size_in_mb\": 0.0, \"duration\": 0.0, \"is_aggregated\": false, \"is_test_dag\": false, \"chunk_size\": 5000.0, \"ru_level\": \"\", \"ru\": 0.0, \"job_execution_datetime\": \"2021-10-06 09:56:07.177\", \"task_execution_datetime\": \"2021-10-06 09:56:07.177\", 
\"job_status\": \"success\", \"task_execution_endtime\": \"2021-10-06 09:56:42.597\", \"id\": \"2742\"}]";
Console.WriteLine("Received Json String Data");
var binaryDownloadData = JsonConvert.DeserializeObject>(jsonStringReplace);
Console.WriteLine("Converted Json String Data");
Microsoft.Azure.Cosmos.Container container = databaseClient.GetContainer(collectionName);
List tasks = new List(binaryDownloadData.Count);
foreach (var item in binaryDownloadData)
{
tasks.Add(container.CreateItemAsync(item, new Microsoft.Azure.Cosmos.PartitionKey(partitionKey))
.ContinueWith(itemResponse =>
{
if (!itemResponse.IsCompletedSuccessfully)
{
AggregateException innerExceptions = itemResponse.Exception.Flatten();
if (innerExceptions.InnerExceptions.FirstOrDefault(innerEx => innerEx is CosmosException) is CosmosException cosmosException)
{
Console.WriteLine($"Received {cosmosException.StatusCode} ({cosmosException.Message}).");
}
else
{
Console.WriteLine($"Exception {innerExceptions.InnerExceptions.FirstOrDefault()}.");
}
}
}));
}
// Wait until all are done
await Task.WhenAll(tasks);
}
}
}