Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrumentation support for dataloader #175

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
59 changes: 59 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,65 @@ When ticker mode is **true** the `ScheduledDataLoaderRegistry` algorithm is as f
* If it returns **true**, then `dataLoader.dispatch()` is called **and** a task is scheduled to re-evaluate this specific dataloader in the near future
* The re-evaluation tasks are run periodically according to the `registry.getScheduleDuration()`

## Instrumenting the data loader code

A `DataLoader` can have a `DataLoaderInstrumentation` associated with it. This callback interface is intended to provide
insight into working of the `DataLoader` such as how long it takes to run or to allow for logging of key events.

You set the `DataLoaderInstrumentation` into the `DataLoaderOptions` at build time.

```java


DataLoaderInstrumentation timingInstrumentation = new DataLoaderInstrumentation() {
@Override
public DataLoaderInstrumentationContext<DispatchResult<?>> beginDispatch(DataLoader<?, ?> dataLoader) {
long then = System.currentTimeMillis();
return DataLoaderInstrumentationHelper.whenCompleted((result, err) -> {
long ms = System.currentTimeMillis() - then;
System.out.println(format("dispatch time: %d ms", ms));
});
}

@Override
public DataLoaderInstrumentationContext<List<?>> beginBatchLoader(DataLoader<?, ?> dataLoader, List<?> keys, BatchLoaderEnvironment environment) {
long then = System.currentTimeMillis();
return DataLoaderInstrumentationHelper.whenCompleted((result, err) -> {
long ms = System.currentTimeMillis() - then;
System.out.println(format("batch loader time: %d ms", ms));
});
}
};
DataLoaderOptions options = DataLoaderOptions.newOptions().setInstrumentation(timingInstrumentation);
DataLoader<String, User> userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader, options);

```

The example shows how long the overall `DataLoader` dispatch takes or how long the batch loader takes to run.

### Instrumenting the DataLoaderRegistry

You can also associate a `DataLoaderInstrumentation` with a `DataLoaderRegistry`. Every `DataLoader` registered will be changed so that the registry
`DataLoaderInstrumentation` is associated with it. This allows you to set just the one `DataLoaderInstrumentation` in place and it applies to all
data loaders.

```java
DataLoader<String, User> userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader);
DataLoader<String, User> teamsDataLoader = DataLoaderFactory.newDataLoader(teamsBatchLoader);

DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
.instrumentation(timingInstrumentation)
.register("users", userDataLoader)
.register("teams", teamsDataLoader)
.build();

DataLoader<String, User> changedUsersDataLoader = registry.getDataLoader("users");
```

The `timingInstrumentation` here will be associated with the `DataLoader` under the key `users` and the key `teams`. Note that since
DataLoader is immutable, a new changed object is created so you must use the registry to get the `DataLoader`.


## Other information sources

- [Facebook DataLoader Github repo](https://github.com/facebook/dataloader)
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/dataloader/DataLoaderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ public Builder<K, V> options(DataLoaderOptions options) {
return this;
}

DataLoader<K, V> build() {
public DataLoader<K, V> build() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missed in previous PR

return mkDataLoader(batchLoadFunction, options);
}
}
Expand Down
38 changes: 32 additions & 6 deletions src/main/java/org/dataloader/DataLoaderHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import org.dataloader.annotations.GuardedBy;
import org.dataloader.annotations.Internal;
import org.dataloader.impl.CompletableFutureKit;
import org.dataloader.instrumentation.DataLoaderInstrumentation;
import org.dataloader.instrumentation.DataLoaderInstrumentationContext;
import org.dataloader.reactive.ReactiveSupport;
import org.dataloader.scheduler.BatchLoaderScheduler;
import org.dataloader.stats.StatisticsCollector;
Expand Down Expand Up @@ -34,6 +36,7 @@
import static java.util.stream.Collectors.toList;
import static org.dataloader.impl.Assertions.assertState;
import static org.dataloader.impl.Assertions.nonNull;
import static org.dataloader.instrumentation.DataLoaderInstrumentationHelper.ctxOrNoopCtx;

/**
* This helps break up the large DataLoader class functionality, and it contains the logic to dispatch the
Expand Down Expand Up @@ -167,6 +170,8 @@ Object getCacheKeyWithContext(K key, Object context) {
}

DispatchResult<V> dispatch() {
DataLoaderInstrumentationContext<DispatchResult<?>> instrCtx = ctxOrNoopCtx(instrumentation().beginDispatch(dataLoader));

boolean batchingEnabled = loaderOptions.batchingEnabled();
final List<K> keys;
final List<Object> callContexts;
Expand All @@ -175,7 +180,8 @@ DispatchResult<V> dispatch() {
int queueSize = loaderQueue.size();
if (queueSize == 0) {
lastDispatchTime.set(now());
return emptyDispatchResult();
instrCtx.onDispatched();
return endDispatchCtx(instrCtx, emptyDispatchResult());
}

// we copy the pre-loaded set of futures ready for dispatch
Expand All @@ -192,7 +198,8 @@ DispatchResult<V> dispatch() {
lastDispatchTime.set(now());
}
if (!batchingEnabled) {
return emptyDispatchResult();
instrCtx.onDispatched();
return endDispatchCtx(instrCtx, emptyDispatchResult());
}
final int totalEntriesHandled = keys.size();
//
Expand All @@ -213,7 +220,15 @@ DispatchResult<V> dispatch() {
} else {
futureList = dispatchQueueBatch(keys, callContexts, queuedFutures);
}
return new DispatchResult<>(futureList, totalEntriesHandled);
instrCtx.onDispatched();
return endDispatchCtx(instrCtx, new DispatchResult<>(futureList, totalEntriesHandled));
}

private DispatchResult<V> endDispatchCtx(DataLoaderInstrumentationContext<DispatchResult<?>> instrCtx, DispatchResult<V> dispatchResult) {
// once the CF completes, we can tell the instrumentation
dispatchResult.getPromisedResults()
.whenComplete((result, throwable) -> instrCtx.onCompleted(dispatchResult, throwable));
return dispatchResult;
}

private CompletableFuture<List<V>> sliceIntoBatchesOfBatches(List<K> keys, List<CompletableFuture<V>> queuedFutures, List<Object> callContexts, int maxBatchSize) {
Expand Down Expand Up @@ -427,11 +442,14 @@ CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts,
}

CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts, List<CompletableFuture<V>> queuedFutures) {
Object context = loaderOptions.getBatchLoaderContextProvider().getContext();
BatchLoaderEnvironment environment = BatchLoaderEnvironment.newBatchLoaderEnvironment()
.context(context).keyContexts(keys, keyContexts).build();

DataLoaderInstrumentationContext<List<?>> instrCtx = ctxOrNoopCtx(instrumentation().beginBatchLoader(dataLoader, keys, environment));

CompletableFuture<List<V>> batchLoad;
try {
Object context = loaderOptions.getBatchLoaderContextProvider().getContext();
BatchLoaderEnvironment environment = BatchLoaderEnvironment.newBatchLoaderEnvironment()
.context(context).keyContexts(keys, keyContexts).build();
if (isMapLoader()) {
batchLoad = invokeMapBatchLoader(keys, environment);
} else if (isPublisher()) {
Expand All @@ -441,12 +459,16 @@ CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts,
} else {
batchLoad = invokeListBatchLoader(keys, environment);
}
instrCtx.onDispatched();
} catch (Exception e) {
instrCtx.onDispatched();
batchLoad = CompletableFutureKit.failedFuture(e);
}
batchLoad.whenComplete(instrCtx::onCompleted);
return batchLoad;
}


@SuppressWarnings("unchecked")
private CompletableFuture<List<V>> invokeListBatchLoader(List<K> keys, BatchLoaderEnvironment environment) {
CompletionStage<List<V>> loadResult;
Expand Down Expand Up @@ -575,6 +597,10 @@ private boolean isMappedPublisher() {
return batchLoadFunction instanceof MappedBatchPublisher;
}

private DataLoaderInstrumentation instrumentation() {
return loaderOptions.getInstrumentation();
}

int dispatchDepth() {
synchronized (dataLoader) {
return loaderQueue.size();
Expand Down
38 changes: 25 additions & 13 deletions src/main/java/org/dataloader/DataLoaderOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.dataloader.annotations.PublicApi;
import org.dataloader.impl.Assertions;
import org.dataloader.instrumentation.DataLoaderInstrumentation;
import org.dataloader.instrumentation.DataLoaderInstrumentationHelper;
import org.dataloader.scheduler.BatchLoaderScheduler;
import org.dataloader.stats.NoOpStatisticsCollector;
import org.dataloader.stats.StatisticsCollector;
Expand Down Expand Up @@ -48,6 +50,7 @@ public class DataLoaderOptions {
private BatchLoaderContextProvider environmentProvider;
private ValueCacheOptions valueCacheOptions;
private BatchLoaderScheduler batchLoaderScheduler;
private DataLoaderInstrumentation instrumentation;

/**
* Creates a new data loader options with default settings.
Expand All @@ -61,6 +64,7 @@ public DataLoaderOptions() {
environmentProvider = NULL_PROVIDER;
valueCacheOptions = ValueCacheOptions.newOptions();
batchLoaderScheduler = null;
instrumentation = DataLoaderInstrumentationHelper.NOOP_INSTRUMENTATION;
}

/**
Expand All @@ -80,7 +84,8 @@ public DataLoaderOptions(DataLoaderOptions other) {
this.statisticsCollector = other.statisticsCollector;
this.environmentProvider = other.environmentProvider;
this.valueCacheOptions = other.valueCacheOptions;
batchLoaderScheduler = other.batchLoaderScheduler;
this.batchLoaderScheduler = other.batchLoaderScheduler;
this.instrumentation = other.instrumentation;
}

/**
Expand All @@ -103,7 +108,6 @@ public boolean batchingEnabled() {
* Sets the option that determines whether batch loading is enabled.
*
* @param batchingEnabled {@code true} to enable batch loading, {@code false} otherwise
*
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setBatchingEnabled(boolean batchingEnabled) {
Expand All @@ -124,7 +128,6 @@ public boolean cachingEnabled() {
* Sets the option that determines whether caching is enabled.
*
* @param cachingEnabled {@code true} to enable caching, {@code false} otherwise
*
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setCachingEnabled(boolean cachingEnabled) {
Expand All @@ -134,7 +137,7 @@ public DataLoaderOptions setCachingEnabled(boolean cachingEnabled) {

/**
* Option that determines whether to cache exceptional values (the default), or not.
*
* <p>
* For short-lived caches (that is request caches) it makes sense to cache exceptions since
* it's likely the key is still poisoned. However, if you have long-lived caches, then it may make
* sense to set this to false since the downstream system may have recovered from its failure
Expand All @@ -150,7 +153,6 @@ public boolean cachingExceptionsEnabled() {
* Sets the option that determines whether exceptional values are cache enabled.
*
* @param cachingExceptionsEnabled {@code true} to enable caching exceptional values, {@code false} otherwise
*
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setCachingExceptionsEnabled(boolean cachingExceptionsEnabled) {
Expand All @@ -173,7 +175,6 @@ public Optional<CacheKey> cacheKeyFunction() {
* Sets the function to use for creating the cache key, if caching is enabled.
*
* @param cacheKeyFunction the cache key function to use
*
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setCacheKeyFunction(CacheKey<?> cacheKeyFunction) {
Expand All @@ -196,7 +197,6 @@ public DataLoaderOptions setCacheKeyFunction(CacheKey<?> cacheKeyFunction) {
* Sets the cache map implementation to use for caching, if caching is enabled.
*
* @param cacheMap the cache map instance
*
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setCacheMap(CacheMap<?, ?> cacheMap) {
Expand All @@ -219,7 +219,6 @@ public int maxBatchSize() {
* before they are split into multiple class
*
* @param maxBatchSize the maximum batch size
*
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setMaxBatchSize(int maxBatchSize) {
Expand All @@ -240,7 +239,6 @@ public StatisticsCollector getStatisticsCollector() {
* a common value
*
* @param statisticsCollector the statistics collector to use
*
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setStatisticsCollector(Supplier<StatisticsCollector> statisticsCollector) {
Expand All @@ -259,7 +257,6 @@ public BatchLoaderContextProvider getBatchLoaderContextProvider() {
* Sets the batch loader environment provider that will be used to give context to batch load functions
*
* @param contextProvider the batch loader context provider
*
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setBatchLoaderContextProvider(BatchLoaderContextProvider contextProvider) {
Expand All @@ -282,7 +279,6 @@ public DataLoaderOptions setBatchLoaderContextProvider(BatchLoaderContextProvide
* Sets the value cache implementation to use for caching values, if caching is enabled.
*
* @param valueCache the value cache instance
*
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setValueCache(ValueCache<?, ?> valueCache) {
Expand All @@ -301,7 +297,6 @@ public ValueCacheOptions getValueCacheOptions() {
* Sets the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used
*
* @param valueCacheOptions the value cache options
*
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setValueCacheOptions(ValueCacheOptions valueCacheOptions) {
Expand All @@ -321,11 +316,28 @@ public BatchLoaderScheduler getBatchLoaderScheduler() {
* to some future time.
*
* @param batchLoaderScheduler the scheduler
*
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setBatchLoaderScheduler(BatchLoaderScheduler batchLoaderScheduler) {
this.batchLoaderScheduler = batchLoaderScheduler;
return this;
}

/**
* @return the {@link DataLoaderInstrumentation} to use
*/
public DataLoaderInstrumentation getInstrumentation() {
return instrumentation;
}

/**
* Sets in a new {@link DataLoaderInstrumentation}
*
* @param instrumentation the new {@link DataLoaderInstrumentation}
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setInstrumentation(DataLoaderInstrumentation instrumentation) {
this.instrumentation = nonNull(instrumentation);
return this;
}
}
Loading
Loading