Skip to content

[Internal] Samples: Adds change feed pull model samples #3646

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<AssemblyName>Cosmos.Samples.CFPullModelAllVersionsAndDeletesMode</AssemblyName>
<RootNamespace>Cosmos.Samples.CFPullModelAllVersionsAndDeletesMode</RootNamespace>
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.32.0-preview" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.2.0" />
</ItemGroup>
<ItemGroup>
<None Include="..\appSettings.json" Link="appSettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
namespace CFPullModelAllVersionsAndDeletesMode
{
using System;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;

class Program
{
private static readonly string databaseName = "db";
private static readonly string containerName = "container";

static async Task Main()
{
try
{
IConfigurationRoot configuration = new ConfigurationBuilder()
.AddJsonFile("appSettings.json")
.Build();

string endpoint = configuration["EndPointUrl"];
if (string.IsNullOrEmpty(endpoint))
{
throw new ArgumentNullException("Please specify a valid EndPointUrl in the appSettings.json");
}

string authKey = configuration["AuthorizationKey"];
if (string.IsNullOrEmpty(authKey) || string.Equals(authKey, "Super secret key"))
{
throw new ArgumentException("Please specify a valid AuthorizationKey in the appSettings.json");
}

using (CosmosClient client = new CosmosClient(endpoint, authKey))
{
Console.WriteLine($"Getting container reference for {containerName}.");

ContainerProperties properties = new ContainerProperties(containerName, partitionKeyPath: "/id");

await client.CreateDatabaseIfNotExistsAsync(databaseName);
Container container = await client.GetDatabase(databaseName).CreateContainerIfNotExistsAsync(properties);

string allVersionsContinuationToken = await CreateAllVersionsAndDeletesChangeFeedIterator(container);

await IngestData(container);
await DeleteData(container);

await ReadAllVersionsAndDeletesChangeFeed(container, allVersionsContinuationToken);
}
}
finally
{
Console.WriteLine("End of demo.");
}
}

static async Task<string> CreateAllVersionsAndDeletesChangeFeedIterator(Container container)
{
Console.WriteLine("Creating ChangeFeedIterator to read the change feed in All Versions and Deletes mode.");

// <InitializeFeedIterator>
using (FeedIterator<AllVersionsAndDeletesCFResponse> allVersionsIterator = container
.GetChangeFeedIterator<AllVersionsAndDeletesCFResponse>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes))
{
while (allVersionsIterator.HasMoreResults)
{
FeedResponse<AllVersionsAndDeletesCFResponse> response = await allVersionsIterator.ReadNextAsync();

if (response.StatusCode == HttpStatusCode.NotModified)
{
return response.ContinuationToken;
}
}
}
// <InitializeFeedIterator>

return null;
}

static async Task IngestData(Container container)
{
Console.Clear();

Console.WriteLine("Press any key to begin ingesting data.");
Console.ReadKey(true);

Console.WriteLine("Press any key to stop.");

while (!Console.KeyAvailable)
{
Item item = GenerateItem();
await container.UpsertItemAsync(item, new PartitionKey(item.Id));
Console.Write("*");
}
}

static async Task DeleteData(Container container)
{
Console.ReadKey(true);
Console.Clear();

Console.WriteLine("Press any key to begin deleting data.");
Console.ReadKey(true);

Console.WriteLine("Press any key to stop");

int deleteItemCounter = 0;
while (!Console.KeyAvailable)
{
deleteItemCounter++;
try
{
await container.DeleteItemAsync<Item>(
partitionKey: new PartitionKey(deleteItemCounter.ToString()),
id: deleteItemCounter.ToString());
Console.Write("-");
}
catch (CosmosException cosmosException) when (cosmosException.StatusCode == HttpStatusCode.NotFound)
{
// Deleting by a random id that might not exist in the container will likely throw errors that are safe to ignore for this purpose
}
}
}

static async Task ReadAllVersionsAndDeletesChangeFeed(Container container, string allVersionsContinuationToken)
{
Console.ReadKey(true);
Console.Clear();

Console.WriteLine("Press any key to start reading the change feed in All Versions and Deletes mode.");
Console.ReadKey(true);

Console.WriteLine("Press any key to stop.");

// <ReadAllVersionsAndDeletesChanges>
using (FeedIterator<AllVersionsAndDeletesCFResponse> allVersionsIterator = container.GetChangeFeedIterator<AllVersionsAndDeletesCFResponse>(ChangeFeedStartFrom.ContinuationToken(allVersionsContinuationToken), ChangeFeedMode.AllVersionsAndDeletes, new ChangeFeedRequestOptions { PageSizeHint = 10 }))
{
while (allVersionsIterator.HasMoreResults)
{
FeedResponse<AllVersionsAndDeletesCFResponse> response = await allVersionsIterator.ReadNextAsync();

if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(1000);
}
else
{
foreach (AllVersionsAndDeletesCFResponse r in response)
{
// if operaiton is delete
if (r.Metadata.OperationType == "delete")
{
Item item = r.Previous;

if (r.Metadata.TimeToLiveExpired == true)
{
Console.WriteLine($"Operation: {r.Metadata.OperationType} (due to TTL). Item id: {item.Id}. Previous value: {item.Value}");
}
else
{
Console.WriteLine($"Operation: {r.Metadata.OperationType} (not due to TTL). Item id: {item.Id}. Previous value: {item.Value}");
}
}
// if operation is create or replace
else
{
Item item = r.Current;

Console.WriteLine($"Operation: {r.Metadata.OperationType}. Item id: {item.Id}. Current value: {item.Value}");
}
}
}

if (Console.KeyAvailable)
{
break;
}
}
}
// <ReadAllVersionsAndDeletesChanges>
}

private static Item GenerateItem()
{
Random random = new Random();

return new Item
{
Id = random.Next(1, 999).ToString(),
Value = random.Next(1, 100000),
};
}
}

internal class AllVersionsAndDeletesCFResponse
{
[JsonProperty("current")]
public Item Current { get; set; }

[JsonProperty("previous")]
public Item Previous { get; set; }

[JsonProperty("metadata")]
public Metadata Metadata { get; set; }
}

internal class Item
{
[JsonProperty("id")]
public string Id { get; set; }

public double Value { get; set; }
}

internal class Metadata
{
[JsonProperty("operationType")]
public string OperationType { get; set; }

[JsonProperty("timeToLiveExpired")]
public Boolean TimeToLiveExpired { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<AssemblyName>Cosmos.Samples.CFPullModelLatestVersionMode</AssemblyName>
<RootNamespace>Cosmos.Samples.CFPullModelLatestVersionMode</RootNamespace>
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.32.0-preview" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.2.0" />
</ItemGroup>
<ItemGroup>
<None Include="..\appSettings.json" Link="appSettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>
Loading