Skip to content

Commit dace63b

Browse files
committed
.Net: Add qdrant vector record store implementation (#6904)
### Motivation and Context As part of the evolution of memory connectors, we need to support custom data models and remove opinionated behaviors, so adding a new record store implementation for qdrant. ### Description Adding an implementation for IVectorRecordStore for qdrant with support for: Custom mappers Generic data models Annotating data models via attributes or via definition objects. Also improving some styling in the AzureAISearch implementation. See #6525 ### Contribution Checklist <!-- Before submitting this PR, please make sure: --> - [X] The code builds clean without any errors or warnings - [X] The PR follows the [SK Contribution Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md) and the [pre-submission formatting script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts) raises no violations - [X] All unit tests pass, and I have added new tests where possible - [X] I didn't break anyone 😄
1 parent a964eea commit dace63b

19 files changed

+1645
-132
lines changed

dotnet/Directory.Packages.props

+1
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
<PackageVersion Include="Milvus.Client" Version="2.3.0-preview.1" />
9393
<PackageVersion Include="Testcontainers.Milvus" Version="3.8.0" />
9494
<PackageVersion Include="Microsoft.Data.SqlClient" Version="5.2.0" />
95+
<PackageVersion Include="Qdrant.Client" Version="1.9.0" />
9596
<!-- Symbols -->
9697
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
9798
<!-- Toolset -->

dotnet/src/Connectors/Connectors.Memory.AzureAISearch/AzureAISearchVectorRecordStore.cs

+60-43
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ namespace Microsoft.SemanticKernel.Connectors.AzureAISearch;
2424
public sealed class AzureAISearchVectorRecordStore<TRecord> : IVectorRecordStore<string, TRecord>
2525
where TRecord : class
2626
{
27+
/// <summary>The name of this database for telemetry purposes.</summary>
28+
private const string DatabaseName = "AzureAISearch";
29+
2730
/// <summary>A set of types that a key on the provided model may have.</summary>
2831
private static readonly HashSet<Type> s_supportedKeyTypes =
2932
[
@@ -100,7 +103,7 @@ public AzureAISearchVectorRecordStore(SearchIndexClient searchIndexClient, Azure
100103
}
101104

102105
/// <inheritdoc />
103-
public Task<TRecord> GetAsync(string key, GetRecordOptions? options = default, CancellationToken cancellationToken = default)
106+
public Task<TRecord?> GetAsync(string key, GetRecordOptions? options = default, CancellationToken cancellationToken = default)
104107
{
105108
Verify.NotNullOrWhiteSpace(key);
106109

@@ -126,7 +129,13 @@ public async IAsyncEnumerable<TRecord> GetBatchAsync(IEnumerable<string> keys, G
126129
var searchClient = this.GetSearchClient(collectionName);
127130
var tasks = keys.Select(key => this.GetDocumentAndMapToDataModelAsync(searchClient, collectionName, key, innerOptions, cancellationToken));
128131
var results = await Task.WhenAll(tasks).ConfigureAwait(false);
129-
foreach (var result in results) { yield return result; }
132+
foreach (var result in results)
133+
{
134+
if (result is not null)
135+
{
136+
yield return result;
137+
}
138+
}
130139
}
131140

132141
/// <inheritdoc />
@@ -203,32 +212,40 @@ public async IAsyncEnumerable<string> UpsertBatchAsync(IEnumerable<TRecord> reco
203212
/// <param name="innerOptions">The azure ai search sdk options for getting a document.</param>
204213
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
205214
/// <returns>The retrieved document, mapped to the consumer data model.</returns>
206-
private async Task<TRecord> GetDocumentAndMapToDataModelAsync(
215+
private async Task<TRecord?> GetDocumentAndMapToDataModelAsync(
207216
SearchClient searchClient,
208217
string collectionName,
209218
string key,
210219
GetDocumentOptions innerOptions,
211220
CancellationToken cancellationToken)
212221
{
222+
const string OperationName = "GetDocument";
223+
213224
// Use the user provided mapper.
214225
if (this._options.MapperType == AzureAISearchRecordMapperType.JsonObjectCustomMapper)
215226
{
216227
var jsonObject = await RunOperationAsync(
217228
collectionName,
218-
"GetDocument",
219-
() => searchClient.GetDocumentAsync<JsonObject>(key, innerOptions, cancellationToken)).ConfigureAwait(false);
229+
OperationName,
230+
() => GetDocumentWithNotFoundHandlingAsync<JsonObject>(searchClient, key, innerOptions, cancellationToken)).ConfigureAwait(false);
231+
232+
if (jsonObject is null)
233+
{
234+
return null;
235+
}
220236

221-
return RunModelConversion(
237+
return VectorStoreErrorHandler.RunModelConversion(
238+
DatabaseName,
222239
collectionName,
223-
"GetDocument",
240+
OperationName,
224241
() => this._options.JsonObjectCustomMapper!.MapFromStorageToDataModel(jsonObject));
225242
}
226243

227244
// Use the built in Azure AI Search mapper.
228245
return await RunOperationAsync(
229246
collectionName,
230-
"GetDocument",
231-
() => searchClient.GetDocumentAsync<TRecord>(key, innerOptions, cancellationToken)).ConfigureAwait(false);
247+
OperationName,
248+
() => GetDocumentWithNotFoundHandlingAsync<TRecord>(searchClient, key, innerOptions, cancellationToken)).ConfigureAwait(false);
232249
}
233250

234251
/// <summary>
@@ -247,24 +264,27 @@ private Task<Response<IndexDocumentsResult>> MapToStorageModelAndUploadDocumentA
247264
IndexDocumentsOptions innerOptions,
248265
CancellationToken cancellationToken)
249266
{
267+
const string OperationName = "UploadDocuments";
268+
250269
// Use the user provided mapper.
251270
if (this._options.MapperType == AzureAISearchRecordMapperType.JsonObjectCustomMapper)
252271
{
253-
var jsonObjects = RunModelConversion(
272+
var jsonObjects = VectorStoreErrorHandler.RunModelConversion(
273+
DatabaseName,
254274
collectionName,
255-
"UploadDocuments",
275+
OperationName,
256276
() => records.Select(this._options.JsonObjectCustomMapper!.MapFromDataToStorageModel));
257277

258278
return RunOperationAsync(
259279
collectionName,
260-
"UploadDocuments",
280+
OperationName,
261281
() => searchClient.UploadDocumentsAsync<JsonObject>(jsonObjects, innerOptions, cancellationToken));
262282
}
263283

264284
// Use the built in Azure AI Search mapper.
265285
return RunOperationAsync(
266286
collectionName,
267-
"UploadDocuments",
287+
OperationName,
268288
() => searchClient.UploadDocumentsAsync<TRecord>(records, innerOptions, cancellationToken));
269289
}
270290

@@ -321,6 +341,31 @@ private GetDocumentOptions ConvertGetDocumentOptions(GetRecordOptions? options)
321341
return innerOptions;
322342
}
323343

344+
/// <summary>
345+
/// Get a document with the given key, and return null if it is not found.
346+
/// </summary>
347+
/// <typeparam name="T">The type to deserialize the document to.</typeparam>
348+
/// <param name="searchClient">The search client to use when fetching the document.</param>
349+
/// <param name="key">The key of the record to get.</param>
350+
/// <param name="innerOptions">The azure ai search sdk options for getting a document.</param>
351+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
352+
/// <returns>The retrieved document, mapped to the consumer data model, or null if not found.</returns>
353+
private static async Task<T?> GetDocumentWithNotFoundHandlingAsync<T>(
354+
SearchClient searchClient,
355+
string key,
356+
GetDocumentOptions innerOptions,
357+
CancellationToken cancellationToken)
358+
{
359+
try
360+
{
361+
return await searchClient.GetDocumentAsync<T>(key, innerOptions, cancellationToken).ConfigureAwait(false);
362+
}
363+
catch (RequestFailedException ex) when (ex.Status == 404)
364+
{
365+
return default;
366+
}
367+
}
368+
324369
/// <summary>
325370
/// Run the given operation and wrap any <see cref="RequestFailedException"/> with <see cref="VectorStoreOperationException"/>."/>
326371
/// </summary>
@@ -341,7 +386,7 @@ private static async Task<T> RunOperationAsync<T>(string collectionName, string
341386

342387
// Using Open Telemetry standard for naming of these entries.
343388
// https://opentelemetry.io/docs/specs/semconv/attributes-registry/db/
344-
wrapperException.Data.Add("db.system", "AzureAISearch");
389+
wrapperException.Data.Add("db.system", DatabaseName);
345390
wrapperException.Data.Add("db.collection.name", collectionName);
346391
wrapperException.Data.Add("db.operation.name", operationName);
347392

@@ -353,35 +398,7 @@ private static async Task<T> RunOperationAsync<T>(string collectionName, string
353398

354399
// Using Open Telemetry standard for naming of these entries.
355400
// https://opentelemetry.io/docs/specs/semconv/attributes-registry/db/
356-
wrapperException.Data.Add("db.system", "AzureAISearch");
357-
wrapperException.Data.Add("db.collection.name", collectionName);
358-
wrapperException.Data.Add("db.operation.name", operationName);
359-
360-
throw wrapperException;
361-
}
362-
}
363-
364-
/// <summary>
365-
/// Run the given model conversion and wrap any exceptions with <see cref="VectorStoreRecordMappingException"/>.
366-
/// </summary>
367-
/// <typeparam name="T">The response type of the operation.</typeparam>
368-
/// <param name="collectionName">The name of the collection the operation is being run on.</param>
369-
/// <param name="operationName">The type of database operation being run.</param>
370-
/// <param name="operation">The operation to run.</param>
371-
/// <returns>The result of the operation.</returns>
372-
private static T RunModelConversion<T>(string collectionName, string operationName, Func<T> operation)
373-
{
374-
try
375-
{
376-
return operation.Invoke();
377-
}
378-
catch (Exception ex)
379-
{
380-
var wrapperException = new VectorStoreRecordMappingException("Failed to convert vector store record.", ex);
381-
382-
// Using Open Telemetry standard for naming of these entries.
383-
// https://opentelemetry.io/docs/specs/semconv/attributes-registry/db/
384-
wrapperException.Data.Add("db.system", "AzureAISearch");
401+
wrapperException.Data.Add("db.system", DatabaseName);
385402
wrapperException.Data.Add("db.collection.name", collectionName);
386403
wrapperException.Data.Add("db.operation.name", operationName);
387404

dotnet/src/Connectors/Connectors.Memory.Qdrant/Connectors.Memory.Qdrant.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
<ItemGroup>
2222
<PackageReference Include="System.Text.Json" />
23+
<PackageReference Include="Qdrant.Client" />
2324
</ItemGroup>
2425

2526
<ItemGroup>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
3+
using Qdrant.Client.Grpc;
4+
5+
namespace Microsoft.SemanticKernel.Connectors.Qdrant;
6+
7+
/// <summary>
8+
/// The types of mapper supported by <see cref="QdrantVectorRecordStore{TRecord}"/>.
9+
/// </summary>
10+
public enum QdrantRecordMapperType
11+
{
12+
/// <summary>
13+
/// Use the default mapper that is provided by the semantic kernel SDK that uses json as an intermediary to allows automatic mapping to a wide variety of types.
14+
/// </summary>
15+
Default,
16+
17+
/// <summary>
18+
/// Use a custom mapper between <see cref="PointStruct"/> and the data model.
19+
/// </summary>
20+
QdrantPointStructCustomMapper
21+
}

0 commit comments

Comments
 (0)