|
| 1 | +namespace CFPullModelAllVersionsAndDeletesMode |
| 2 | +{ |
| 3 | + using System; |
| 4 | + using System.Net; |
| 5 | + using System.Threading.Tasks; |
| 6 | + using Microsoft.Azure.Cosmos; |
| 7 | + using Microsoft.Extensions.Configuration; |
| 8 | + using Newtonsoft.Json; |
| 9 | + |
| 10 | + class Program |
| 11 | + { |
| 12 | + private static readonly string databaseName = "db"; |
| 13 | + private static readonly string containerName = "container"; |
| 14 | + |
| 15 | + static async Task Main() |
| 16 | + { |
| 17 | + try |
| 18 | + { |
| 19 | + IConfigurationRoot configuration = new ConfigurationBuilder() |
| 20 | + .AddJsonFile("appSettings.json") |
| 21 | + .Build(); |
| 22 | + |
| 23 | + string endpoint = configuration["EndPointUrl"]; |
| 24 | + if (string.IsNullOrEmpty(endpoint)) |
| 25 | + { |
| 26 | + throw new ArgumentNullException("Please specify a valid EndPointUrl in the appSettings.json"); |
| 27 | + } |
| 28 | + |
| 29 | + string authKey = configuration["AuthorizationKey"]; |
| 30 | + if (string.IsNullOrEmpty(authKey) || string.Equals(authKey, "Super secret key")) |
| 31 | + { |
| 32 | + throw new ArgumentException("Please specify a valid AuthorizationKey in the appSettings.json"); |
| 33 | + } |
| 34 | + |
| 35 | + using (CosmosClient client = new CosmosClient(endpoint, authKey)) |
| 36 | + { |
| 37 | + Console.WriteLine($"Getting container reference for {containerName}."); |
| 38 | + |
| 39 | + ContainerProperties properties = new ContainerProperties(containerName, partitionKeyPath: "/id"); |
| 40 | + |
| 41 | + await client.CreateDatabaseIfNotExistsAsync(databaseName); |
| 42 | + Container container = await client.GetDatabase(databaseName).CreateContainerIfNotExistsAsync(properties); |
| 43 | + |
| 44 | + string allVersionsContinuationToken = await CreateAllVersionsAndDeletesChangeFeedIterator(container); |
| 45 | + |
| 46 | + await IngestData(container); |
| 47 | + await DeleteData(container); |
| 48 | + |
| 49 | + await ReadAllVersionsAndDeletesChangeFeed(container, allVersionsContinuationToken); |
| 50 | + } |
| 51 | + } |
| 52 | + finally |
| 53 | + { |
| 54 | + Console.WriteLine("End of demo."); |
| 55 | + } |
| 56 | + } |
| 57 | + |
| 58 | + static async Task<string> CreateAllVersionsAndDeletesChangeFeedIterator(Container container) |
| 59 | + { |
| 60 | + Console.WriteLine("Creating ChangeFeedIterator to read the change feed in All Versions and Deletes mode."); |
| 61 | + |
| 62 | + // <InitializeFeedIterator> |
| 63 | + using (FeedIterator<AllVersionsAndDeletesCFResponse> allVersionsIterator = container |
| 64 | + .GetChangeFeedIterator<AllVersionsAndDeletesCFResponse>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes)) |
| 65 | + { |
| 66 | + while (allVersionsIterator.HasMoreResults) |
| 67 | + { |
| 68 | + FeedResponse<AllVersionsAndDeletesCFResponse> response = await allVersionsIterator.ReadNextAsync(); |
| 69 | + |
| 70 | + if (response.StatusCode == HttpStatusCode.NotModified) |
| 71 | + { |
| 72 | + return response.ContinuationToken; |
| 73 | + } |
| 74 | + } |
| 75 | + } |
| 76 | + // <InitializeFeedIterator> |
| 77 | + |
| 78 | + return null; |
| 79 | + } |
| 80 | + |
| 81 | + static async Task IngestData(Container container) |
| 82 | + { |
| 83 | + Console.Clear(); |
| 84 | + |
| 85 | + Console.WriteLine("Press any key to begin ingesting data."); |
| 86 | + Console.ReadKey(true); |
| 87 | + |
| 88 | + Console.WriteLine("Press any key to stop."); |
| 89 | + |
| 90 | + while (!Console.KeyAvailable) |
| 91 | + { |
| 92 | + Item item = GenerateItem(); |
| 93 | + await container.UpsertItemAsync(item, new PartitionKey(item.Id)); |
| 94 | + Console.Write("*"); |
| 95 | + } |
| 96 | + } |
| 97 | + |
| 98 | + static async Task DeleteData(Container container) |
| 99 | + { |
| 100 | + Console.ReadKey(true); |
| 101 | + Console.Clear(); |
| 102 | + |
| 103 | + Console.WriteLine("Press any key to begin deleting data."); |
| 104 | + Console.ReadKey(true); |
| 105 | + |
| 106 | + Console.WriteLine("Press any key to stop"); |
| 107 | + |
| 108 | + int deleteItemCounter = 0; |
| 109 | + while (!Console.KeyAvailable) |
| 110 | + { |
| 111 | + deleteItemCounter++; |
| 112 | + try |
| 113 | + { |
| 114 | + await container.DeleteItemAsync<Item>( |
| 115 | + partitionKey: new PartitionKey(deleteItemCounter.ToString()), |
| 116 | + id: deleteItemCounter.ToString()); |
| 117 | + Console.Write("-"); |
| 118 | + } |
| 119 | + catch (CosmosException cosmosException) when (cosmosException.StatusCode == HttpStatusCode.NotFound) |
| 120 | + { |
| 121 | + // Deleting by a random id that might not exist in the container will likely throw errors that are safe to ignore for this purpose |
| 122 | + } |
| 123 | + } |
| 124 | + } |
| 125 | + |
| 126 | + static async Task ReadAllVersionsAndDeletesChangeFeed(Container container, string allVersionsContinuationToken) |
| 127 | + { |
| 128 | + Console.ReadKey(true); |
| 129 | + Console.Clear(); |
| 130 | + |
| 131 | + Console.WriteLine("Press any key to start reading the change feed in All Versions and Deletes mode."); |
| 132 | + Console.ReadKey(true); |
| 133 | + |
| 134 | + Console.WriteLine("Press any key to stop."); |
| 135 | + |
| 136 | + // <ReadAllVersionsAndDeletesChanges> |
| 137 | + using (FeedIterator<AllVersionsAndDeletesCFResponse> allVersionsIterator = container.GetChangeFeedIterator<AllVersionsAndDeletesCFResponse>(ChangeFeedStartFrom.ContinuationToken(allVersionsContinuationToken), ChangeFeedMode.AllVersionsAndDeletes, new ChangeFeedRequestOptions { PageSizeHint = 10 })) |
| 138 | + { |
| 139 | + while (allVersionsIterator.HasMoreResults) |
| 140 | + { |
| 141 | + FeedResponse<AllVersionsAndDeletesCFResponse> response = await allVersionsIterator.ReadNextAsync(); |
| 142 | + |
| 143 | + if (response.StatusCode == HttpStatusCode.NotModified) |
| 144 | + { |
| 145 | + Console.WriteLine($"No new changes"); |
| 146 | + await Task.Delay(1000); |
| 147 | + } |
| 148 | + else |
| 149 | + { |
| 150 | + foreach (AllVersionsAndDeletesCFResponse r in response) |
| 151 | + { |
| 152 | + // if operaiton is delete |
| 153 | + if (r.Metadata.OperationType == "delete") |
| 154 | + { |
| 155 | + Item item = r.Previous; |
| 156 | + |
| 157 | + if (r.Metadata.TimeToLiveExpired == true) |
| 158 | + { |
| 159 | + Console.WriteLine($"Operation: {r.Metadata.OperationType} (due to TTL). Item id: {item.Id}. Previous value: {item.Value}"); |
| 160 | + } |
| 161 | + else |
| 162 | + { |
| 163 | + Console.WriteLine($"Operation: {r.Metadata.OperationType} (not due to TTL). Item id: {item.Id}. Previous value: {item.Value}"); |
| 164 | + } |
| 165 | + } |
| 166 | + // if operation is create or replace |
| 167 | + else |
| 168 | + { |
| 169 | + Item item = r.Current; |
| 170 | + |
| 171 | + Console.WriteLine($"Operation: {r.Metadata.OperationType}. Item id: {item.Id}. Current value: {item.Value}"); |
| 172 | + } |
| 173 | + } |
| 174 | + } |
| 175 | + |
| 176 | + if (Console.KeyAvailable) |
| 177 | + { |
| 178 | + break; |
| 179 | + } |
| 180 | + } |
| 181 | + } |
| 182 | + // <ReadAllVersionsAndDeletesChanges> |
| 183 | + } |
| 184 | + |
| 185 | + private static Item GenerateItem() |
| 186 | + { |
| 187 | + Random random = new Random(); |
| 188 | + |
| 189 | + return new Item |
| 190 | + { |
| 191 | + Id = random.Next(1, 999).ToString(), |
| 192 | + Value = random.Next(1, 100000), |
| 193 | + }; |
| 194 | + } |
| 195 | + } |
| 196 | + |
| 197 | + internal class AllVersionsAndDeletesCFResponse |
| 198 | + { |
| 199 | + [JsonProperty("current")] |
| 200 | + public Item Current { get; set; } |
| 201 | + |
| 202 | + [JsonProperty("previous")] |
| 203 | + public Item Previous { get; set; } |
| 204 | + |
| 205 | + [JsonProperty("metadata")] |
| 206 | + public Metadata Metadata { get; set; } |
| 207 | + } |
| 208 | + |
| 209 | + internal class Item |
| 210 | + { |
| 211 | + [JsonProperty("id")] |
| 212 | + public string Id { get; set; } |
| 213 | + |
| 214 | + public double Value { get; set; } |
| 215 | + } |
| 216 | + |
| 217 | + internal class Metadata |
| 218 | + { |
| 219 | + [JsonProperty("operationType")] |
| 220 | + public string OperationType { get; set; } |
| 221 | + |
| 222 | + [JsonProperty("timeToLiveExpired")] |
| 223 | + public Boolean TimeToLiveExpired { get; set; } |
| 224 | + } |
| 225 | +} |
0 commit comments