Cosmos Client Library Documentation
Getting started
- Use Visual Studio 2019 v16.11.3 or newer.
- This is Windows-only code; you can still use it for reference and to base your own improved version!
- Use the Nuget package!
- Make sure you use
c++latest
as the<format>
is no longer in thec++20
option pending ABI resolution. - Read the examples! Notably
TEST(CosmosClient, queryDocuments_thread)
where I demonstrate how you can use threads and<latch>
and<barrier>
.
Dependencies
We use NuGet for dependencies. Why? Simply put, it is the easiest source for obtaining packages.
Package | Comments |
---|---|
nlohmann.json |
This is one of the simplest JSON libraries for C++. We have to make choices and this is our choice: C++ and STL-aware, clean, simple and elegant over efficiency. |
asynchrony-lib |
This is library with helpers for encryption, base64 encode/decode and token generation for Cosmos. |
restcl |
This is our REST client. There are many, many, many clients out there and I’m not quite happy with their design. This implementation focuses on the “interface” as a thin wrapper atop WinHTTP library. |
AzureCppUtils |
This is library with helpers for encryption, base64 encode/decode and token generation for Cosmos. |
API
namespace siddiqsoft
A single class CosmosClient implements the functionality.
Note that since this is a header, you’ll also get the contents of the azure-cpp-utils
and restcl
as well as supporting code.
C++ modules are a solution to this problem but the implementation in VS 2019 v16.11.3 is flaky and I could not get it working!
struct CosmosArgumentType
This structure is used to specify arguments to the async
operations and as a parameter to the callback for async completion callback.
CosmosArgumentType | Type | Description |
---|---|---|
operation |
std::string |
Mandatory; one of the following (corresponds to the method):discoverRegion , listDatabases , listCollections ,create , upsert , update , remove ,</br>listDocuments , find , query |
database |
std::string |
Database name |
collection |
std::string |
Collection name |
id |
std::string |
The unique document id. Required for operations: find , update , remove |
partitionKey |
std::string |
Required for operaions: update , find , remove , query .In the case of query , this may be * to indicate cross-partition query. |
continuationToken |
std::string |
Used when there would be more than 100 items requrned by the server for operations: listDocuments , find and query .If you find this field in the response then you must use iteration to fetch the rest of the documents. |
queryStatement |
std::string |
The query string. May include tokens with values in the queryParameters json |
queryParameters |
nlohmann::json |
An array of key-value arguments matching the tokens in the queryString |
document |
nlohmann::json |
The document to create/upsert/update |
onResponse |
function | Callback/lambda to receive the response for the given async request. Invoked with const reference to the argument structure and the response. |
Why use structure instead of explicit parameters?
- Pros
- Easy to package for async calls as we move into the deque which is processed by the
asyncDispatcher
method.
- Easy to package for async calls as we move into the deque which is processed by the
- Cons
- The parameters are not discoverable via signature.
Signature
struct CosmosArgumentType
{
std::string operation {};
std::string database {};
std::string collection {};
std::string id {};
std::string partitionKey {};
std::string continuationToken {};
std::string queryStatement {};
nlohmann::json queryParameters;
nlohmann::json document;
std::function<void(CosmosArgumentType const&, CosmosResponseType const&)> onResponse {};
};
struct CosmosResponseType
This is the primary return type from the CosmosClient functions.
Includes serializers for nlohmann::json via the macro NLOHMANN_DEFINE_TYPE_INTRUSIVE
.
struct CosmosResponseType
{
uint32_t statusCode {};
nlohmann::json document;
std::chrono::microseconds ttx {};
bool success();
};
CosmosResponseType | Type | Description |
---|---|---|
statusCode |
uint32_t |
Holds the HTTP response Status Code or the system-error code (WinHTTP error code) |
document |
nlohmann::json |
Holds the response from the Cosmos response or contains the information about the error if there is an IO error. This will allow you to diagnose the details from Cosmos response. |
ttx |
std::chrono::microseconds |
Holds the time taken for the operation. |
success() |
bool |
Returns if the statusCode < 300 indicating successful REST request. |
Azure Cosmos REST API status codes has the full list with detailed explanations.
statusCode | Comment |
---|---|
200 |
Success. listDatabases , listCollections , listDocuments , find , query , upsert |
201 |
Document created. create , upsert |
204 |
Document removed. remove |
401 |
Authorization failure. |
404 |
Document Not found. |
409 |
Document already exists. create |
Output Sample
The json document typically contains two elements: _count
and Documents
which is an array of documents from the functions listDocuments(), listCollections(), listDocuments(), query().
{
"Documents": [
{
"__pk": "PARTITION-KEY-VALUE",
"_attachments": "attachments/",
"_etag": "\"7e0141d3-0000-0400-0000-612ec6f10000\"",
"_rid": "RP0wAM6H+R7-8VEAAAAAAQ==",
"_self": "dbs/RP0wAA==/colls/RP0wAM6H+R4=/docs/RP0wAM6H+R7-8VEAAAAAAQ==/",
"_ts": 1630455537,
"i": 1,
"id": "UNIQUE-DOCUMENT-ID",
},
...
...
],
"_count": 2
}
Output Sample
For operations on single documents create()
, find()
, update()
return the document as stored (with Cosmos injected housekeeping data).
{
"__pk": "PARTITION-KEY-VALUE",
"_attachments": "attachments/",
"_etag": "\"7e0141d3-0000-0400-0000-612ec6f10000\"",
"_rid": "RP0wAM6H+R7-8VEAAAAAAQ==",
"_self": "dbs/RP0wAA==/colls/RP0wAM6H+R4=/docs/RP0wAM6H+R7-8VEAAAAAAQ==/",
"_ts": 1630455537,
"i": 1,
"id": "UNIQUE-DOCUMENT-ID",
}
struct CosmosIterableResponseType
Extends the CosmosResponseType by adding the continuationToken
data member
to facilitate iteration for the methods listDocuments and query.
struct CosmosIterableResponseType : CosmosResponseType
{
std::string continuationToken {};
};
CosmosIterableResponseType | Type | Description |
---|---|---|
continuationToken |
string |
Do not modify; this is the x-ms-continuation token from the response header and inpat for the listDocuments and query methods to obtain all of the documents. |
using CosmosAsyncCallbackType
using CosmosAsyncCallbackType = std::function<void(CosmosArgumentType const& context, CosmosResponseType const& response)>;
Type | Description | |
---|---|---|
context |
CosmosArgumentType const& |
Const-reference to the original request. |
response |
CosmosResponseType const& |
Const-reference to the response from the server. |
class CosmosClient
Implements the Cosmos SQL-API via REST. Omits the attachment API as of this version.
class CosmosClient
{
protected:
nlohmann::json config;
nlohmann::json serviceSettings;
std::atomic_bool isConfigured;
WinHttpRESTClient restClient;
CosmosConnection cnxn;
simple_pool<CosmosArgumentType> asyncWorkers;
void asyncDispatcher(CosmosArgumentType&);
public:
CosmosClient();
CosmosClient(CosmosClient&&);
auto& operator=(CosmosClient&&) = delete;
CosmosClient(const CosmosClient&) = delete;
auto& operator=(const CosmosClient&) = delete;
const nlohmann::json& configuration()
CosmosClient& configure(const nlohmann::json&) noexcept(false);
void async(CosmosArgumentType&& op);
CosmosResponseType discoverRegions();
CosmosResponseType listDatabases();
CosmosResponseType listCollections(CosmosArgumentType const&);
CosmosIterableResponseType listDocuments(CosmosArgumentType const&);
CosmosResponseType createDocument(CosmosArgumentType const&);
CosmosResponseType upsertDocument(CosmosArgumentType const&);
CosmosResponseType updateDocument(CosmosArgumentType const&);
uint32_t removeDocument(CosmosArgumentType const&);
CosmosIterableResponseType queryDocuments(CosmosArgumentType const&);
CosmosResponseType findDocument(CosmosArgumentType const&);
friend void to_json(nlohmann::json&, const CosmosClient&);
}
Member variables
Type | Description | |
---|---|---|
config 🔒 |
nlohmann::json |
Configuration for the client |
serviceSettings 🔒 |
nlohmann:json |
Azure region information from by discoverRegion() call via configure() . |
isConfigured 🔒 |
atomic_bool |
Signalled when the configuration() has been successfully invoked. |
restClient 🔒 |
WinHttpRESTClient |
The Rest Client is used for all operations against Cosmos. Multiple threads may use this class without issue as the underlying send() only uses the lone HINTERNET and the underlying WinHTTP library performs the connection-pooling. |
cnxn 🔒 |
CosmosConnection |
Represents the Primary and optionally Secondary connection string. Holds the information on the current read/write enpoints and the encryption keys. This is un-important to the client and its implementation may change without affecting the user-facing API. |
asyncWorkers |
simple_pool<CosmosArgumentType> |
Implements asynchrony to the cosmos library. |
Member Functions
Returns | Description | |
---|---|---|
CosmosClient ⎔ |
Default constructor. Move constructors, assignment operators are not relevant and have been deleted. |
|
configuration ⎔ |
const nlohmann::json& |
Return the as const the config object. |
configure ⎔ |
CosmosClient& |
Configures the client by setting up the cnxn object, invokes discoverRegions to build the readable/writable locations for the region and prepares the current read/write locations.Do not invoke this method as it causes the underlying objects to be reset and will likely break any operations. |
discoverRegions ⎔ |
CosmosResponseType |
Returns service configuration such as settings, regions, read and write locations. |
listDatabases ⎔ |
CosmosResponseType |
Returns Documents[] containing the ids of the databases for this cosmos service endpoint. |
listCollections ⎔ |
CosmosResponseType |
Returns Documents[] containing the ids of the collections in the given database. |
listDocuments ⎔ |
CosmosIterableResponseType |
Returns zero-or-more documents in the given collection. The client is responsible for repeatedly invoking this method to pull all items. |
createDocument ⎔ |
CosmosResponseType |
Creates (add) single document to given collection in the database. |
upsertDocument ⎔ |
CosmosResponseType |
Create of update a document in the given collection in the database. |
updateDocument ⎔ |
CosmosResponseType |
Update a document in the given collection in the database. |
removeDocument ⎔ |
CosmosResponseType |
Remove a document matching the document id in the given collection. |
queryDocuments ⎔ |
CosmosIterableResponseType |
Returns zero-or-more items matching the given search query and parameters. The client is responsible for repeatedly invoking this method to pull all items. |
findDocument ⎔ |
CosmosResponseType |
Finds and returns a single document matching the given document id. |
async ⎔ |
Queues the specified request for asynchronous completion. The property .onResponse and .operation must be provided otherwise this will throw and invalid_argument exception. |
|
to_json ⎔ |
Serializer for CosmosClient to a json object. | |
asyncDispatcher |
Implements the dispatch from the queue and recovery/retry logic. |
There are also implementations for serializing via
std::format
using the json serializer.
CosmosClient::config
nlohmann::json
- Object stores the various configuration elements for this client instance.
The following elements are required:
apiVersion
- Defaults to2018-12-31
; you should not provide this value as the library is mated to the specific version.connectionStrings
- An array of one or two connection strings you’d get from the Azure Cosmos Keys portal. These may be “read-write” or “Read-only” values depending on your application use.
If you use read-only keys then you will get errors invokingcreate
,upsert
,update
.partitionKeyNames
- An array of one or more partition key fields that must be present in each document. This is also configured in the Azure Portal.
Sample/default
nlohmann::json config { {"_typever", CosmosClientUserAgentString},
{"apiVersion", "2018-12-31"},
{"connectionStrings", {}},
{"partitionKeyNames", {}} };
CosmosClient::serviceSettings
Service Settings saved from discoverRegions
This data is used within the method configure
and used thereafter for informative purposes.
Guard against writers.
CosmosClient::isConfigured
atomic_bool
- Indicates that the client has been configured.
CosmosClient::restClient
The I/O utility class from restcl.
CosmosClient::cnxn
This is the core data-structure and holds the read, write endpoints, primary and secondary connections and the encryption key.
It is not intended to be used by the client and its implementation is subject to change.
CosmosClient::CosmosClient
Empty default constructor. All of the move constructor and assignment operators and move operators have been deleted.
CosmosClient::configuration
Returns the current config json object as const
.
return
Const reference to the config
object.
CosmosClient::configure
CosmosClient& configure(const nlohmann::json& src = {});
Re-configure the client. This will invoke the discoverRegions to populate the available regions and sets the primary write/read locations. If the source is empty (no arguments) then the current configuration is returned.
Avoid repeated invocations!
This method may throw if the underling call to discoverRegions
fails.
param src
A valid json object must comply with the defaults. If empty, the current configuration is returned without any changes.
Must have the following elements:
{
"connectionStrings": ["primary-connection-string"],
"primaryKeyNames": ["field-name-of-partition-id"]
}
return
Reference to the CosmosClient object.
CosmosClient::async
Perform any supported operation using the simple_pool
which uses a std::deque
with a threadpool to perform the IO and recovery for simple errors such as 401
and IO transient errors.
void async(CosmosArgumentType&& op);
params
Parameter | Type | Description |
---|---|---|
op |
CosmosArgumentType&& |
The request to be executed asynchronously. Note The parameter is moved into the underlying queue and must be std::move ‘d into the function call. The field .onResponse must be provided otherwise it will throw invalid_argument exception. |
examples
A simple async call uses structured binding introduced in C++17 to construct the argument which is move’d into the async-queue.
siddiqsoft::CosmosClient cc;
// First we Configure.. then we queue..
cc.configure(partitionKeyNames,
{"connectionStrings", {priConnStr, secConnStr}}})
.async({.operation = "listDatabases",
.onResponse = [](auto const& ctx, auto const& resp) {
// Print the list of databases..
std::cout << std::format("List of databases\n{}\n",
resp.document.dump(3));
..
}});
This is a complete example (without error checking) to illustrate how you can nest operations repeatedly and have them complete asynchronously!
std::atomic_bool passTest = false;
std::string priConnStr = std::getenv("CCTEST_PRIMARY_CS");
std::string secConnStr = std::getenv("CCTEST_SECONDARY_CS");
siddiqsoft::CosmosClient cc;
cc.configure(partitionKeyNames,
{"connectionStrings", {priConnStr, secConnStr}}});
// #1..Start by listing the databases
cc.async({.operation = "listDatabases",
.onResponse =[&cc, &passTest](auto const& ctx,
auto const& resp) {
// #2..On completion of listDatabases, get the collections..
cc.async({.operation = "listCollections",
.database = resp.document.value("/Databases/0/id"_json_pointer, ""),
.onResponse = [&cc, &passTest](auto const& ctx,
auto const& resp) {
// #3..On successful listCollections, lets create a document..
auto docId = std::format("azure-cosmos-restcl.{}",
std::chrono::system_clock()
.now()
.time_since_epoch()
.count());
auto pkId = "siddiqsoft.com";
// #3..Create the document..
cc.async({.operation = "create",
.database = ctx.database,
.collection = resp.document.value("/DocumentCollections/0/id"_json_pointer, ""),
.id = docId,
.partitionKey = pkId,
.document = { {"id", docId},
{"ttl", 360},
{"__pk", pkId},
{"func", __func__},
{"source", "basic_tests.exe"}},
.onResponse = [&cc, &passTest](auto const& ctx,
auto const& resp) {
// #4..We created the document..
// ...
// #4..Remove the document
cc.async({.operation = "remove",
.database = ctx.database,
.collection = ctx.collection,
.id = resp.document.value("id", ctx.id),
.partitionKey = ctx.partitionKey,
.onResponse = [&cc, &passTest](auto const& ctx,
auto const& resp) {
// #5..All done. the document should be gone.
passTest = true;
passTest.notify_all();
}});
}});
}});
}});
// Holds until the test completes
passTest.wait(false);
NOTE
When using lambda captures, it is critical that you avoid using
&
and specify the values you’d like to capture explicitly. Using the&
causes heap errors and slows down as the compiler generates an ever larger heap for you.Also, if you’re using AddressSanitier, it will complain. The above example uses explicit capture and enables AddressSanitizer without issue.
CosmosClient::discoverRegions
CosmosResponseType discoverRegions();
Discover the Regions for the current base Uri
This method is invoked by the configuration
method.
return
CosmosClient::listDatabases
CosmosResponseType listDatabases();
params
Parameter | Type | Description |
---|---|---|
List all databases for the given service
Refer to https://docs.microsoft.com/en-us/rest/api/documentdb/documentdb-resource-uri-syntax-for-rest
return
CosmosClient::listCollections
CosmosResponseType listCollections(CosmosArgumentType const& ctx);
List all collections for the given database.
params
Parameter | Type | Description |
---|---|---|
.database |
std::string |
Database name. |
return
CosmosClient::listDocuments
Returns all of the documents for the given collection.
This can be very slow and you must use the continuationToken to repeatedly call the method to complete your fetch. See example below.
CosmosIterableResponseType listDocuments(CosmosArgumentType const& ctx);
params
Parameter | Type | Description |
---|---|---|
.database |
std::string |
Database name. |
.collection |
std::string |
Collection name. |
.continuationToken |
std::string |
Optional. The first invocation would not have this value. However, subsequent invocations will include this value from the response to iterate through the documents. |
return
Iterate Example
Refer to the test listDocuments
for working example.
siddiqsoft::CosmosIterableResponseType irt {};
// This is your destination container; note that the first element is null in this example
nlohmann::json allDocs = nlohmann::json::array();
// This is the running count of the total
uint32_t allDocsCount {};
// First, we query for all items that match our criteria (source=__func__)
do {
irt = cc.queryDocuments(dbName,
collectionName,
"*", // across all partitions
"SELECT * FROM c WHERE contains(c.source, @v1)",
{ { {"name", "@v1"},
{"value", std::format("{}-", getpid())}
} },
irt.continuationToken);
if (200 == irt.statusCode &&
irt.document.contains("Documents") &&
!irt.document.at("Documents").is_null())
{
// Append to the current container
allDocs.insert(allDocs.end(), // dest
irt.document["Documents"].begin(), // from
irt.document["Documents"].end());
allDocsCount += irt.document.value("_count", 0);
}
} while (!irt.continuationToken.empty());
CosmosClient::createDocument
Create document in the given collection.
CosmosResponseType createDocument(CosmosArgumentType const& ctx);
params
Parameter | Type | Description |
---|---|---|
.database |
std::string |
Database name. |
.collection |
std::string |
Collection name. |
.document |
nlohmann::json |
The document to insert in the collection |
return
CosmosClient::upsertDocument
Update or Insert document in the given collection.
CosmosResponseType upsert((CosmosArgumentType const& ctx));
params
Parameter | Type | Description |
---|---|---|
.database |
std::string |
Database name. |
.collection |
std::string |
Collection name. |
.document |
nlohmann::json |
The document to insert or update |
return
CosmosClient::updateDocument
CosmosResponseType update(CosmosArgumentType const& ctx);
Update an existing document
params
Parameter | Type | Description |
---|---|---|
.database |
std::string |
Database name. |
.collection |
std::string |
Collection name. |
.id |
std::string |
The document id. |
.partitionKey |
std::string |
Partition key. |
.document |
nlohmann::json |
The document to update |
return
CosmosClient::removeDocument
uint32_t remove(CosmosArgumentType const& ctx);
params
Parameter | Type | Description |
---|---|---|
.database |
std::string |
Database name. |
.collection |
std::string |
Collection name. |
.id |
std::string |
The document id. |
.partitionKey |
std::string |
Partition key. |
return
CosmosClient::queryDocuments
CosmosIterableResponseType query(CosmosArgumentType const& ctx);
Performs a query and continues an existing query if the continuation token exists
CAUTION: If your query yields dozens or hundreds or thousands of documents, this method currently does not offer “streaming” or callbacks. It continues until the server reports no-more-continuation and returns the documents in a single response json. Memory and timing can be massive!
params
Parameter | Type | Description |
---|---|---|
.database |
std::string |
Database name. |
.collection |
std::string |
Collection name. |
.partitionKey |
std::string |
Partition key. You can use * to indicate query across partitions. |
.queryStatement |
std::string |
The SQL API query string."SELECT * FROM c WHERE contains(c.source, @v1)" |
.queryParameters |
nlohmann::json |
Optional json array with name-value objects.[{"name": "@v1", "value": "hello.world"}] |
.continuationToken |
std::string |
Optional continuation token. This is used with the value x-ms-continuation to page through the query response. |
remarks
The response is paged so you will need to combine the results into
delivering to the user a single json with array of Documents
object and _count
.
See Iterate example on the contents and how to fetch all documents by iterating over the results.
return
CosmosClient::findDocument
CosmosResponseType find(CosmosArgumentType const& ctx);
Removes the document given the docId and pkId
See https://docs.microsoft.com/en-us/rest/api/documentdb/delete-a-document
params
Parameter | Type | Description |
---|---|---|
.database |
std::string |
Database name. |
.collection |
std::string |
Collection name. |
.id |
std::string |
The document id. |
.partitionKey |
std::string |
Partition key. You can use * to indicate query across partitions. |
return
Returns uint32_t
with 204
on successful removal or 404
if document does not exist.
Tests
- The tests are written using Googletest framework instead of VSTest.
- Enabling the address sanitizer threw errors enough to switch to googletest.
- AddressSanitizer is disabled for the test as it ends up hanging the multi-thread tests when using
std::latch
and/orstd::barrier
. - The roll-up is not accurate depsite the fact that we’ve got 24 tests only 10 are reported!