Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrumentation support for dataloader #175

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
59 changes: 59 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,65 @@ When ticker mode is **true** the `ScheduledDataLoaderRegistry` algorithm is as f
* If it returns **true**, then `dataLoader.dispatch()` is called **and** a task is scheduled to re-evaluate this specific dataloader in the near future
* The re-evaluation tasks are run periodically according to the `registry.getScheduleDuration()`

## Instrumenting the data loader code

A `DataLoader` can have a `DataLoaderInstrumentation` associated with it. This callback interface is intended to provide
insight into working of the `DataLoader` such as how long it takes to run or to allow for logging of key events.

You set the `DataLoaderInstrumentation` into the `DataLoaderOptions` at build time.

```java


DataLoaderInstrumentation timingInstrumentation = new DataLoaderInstrumentation() {
@Override
public DataLoaderInstrumentationContext<DispatchResult<?>> beginDispatch(DataLoader<?, ?> dataLoader) {
long then = System.currentTimeMillis();
return DataLoaderInstrumentationHelper.whenCompleted((result, err) -> {
long ms = System.currentTimeMillis() - then;
System.out.println(format("dispatch time: %d ms", ms));
});
}

@Override
public DataLoaderInstrumentationContext<List<?>> beginBatchLoader(DataLoader<?, ?> dataLoader, List<?> keys, BatchLoaderEnvironment environment) {
long then = System.currentTimeMillis();
return DataLoaderInstrumentationHelper.whenCompleted((result, err) -> {
long ms = System.currentTimeMillis() - then;
System.out.println(format("batch loader time: %d ms", ms));
});
}
};
DataLoaderOptions options = DataLoaderOptions.newOptions().setInstrumentation(timingInstrumentation);
DataLoader<String, User> userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader, options);

```

The example shows how long the overall `DataLoader` dispatch takes or how long the batch loader takes to run.

### Instrumenting the DataLoaderRegistry

You can also associate a `DataLoaderInstrumentation` with a `DataLoaderRegistry`. Every `DataLoader` registered will be changed so that the registry
`DataLoaderInstrumentation` is associated with it. This allows you to set just the one `DataLoaderInstrumentation` in place and it applies to all
data loaders.

```java
DataLoader<String, User> userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader);
DataLoader<String, User> teamsDataLoader = DataLoaderFactory.newDataLoader(teamsBatchLoader);

DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
.instrumentation(timingInstrumentation)
.register("users", userDataLoader)
.register("teams", teamsDataLoader)
.build();

DataLoader<String, User> changedUsersDataLoader = registry.getDataLoader("users");
```

The `timingInstrumentation` here will be associated with the `DataLoader` under the key `users` and the key `teams`. Note that since
DataLoader is immutable, a new changed object is created so you must use the registry to get the `DataLoader`.


## Other information sources

- [Facebook DataLoader Github repo](https://github.com/facebook/dataloader)
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/dataloader/DataLoaderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ public Builder<K, V> options(DataLoaderOptions options) {
return this;
}

DataLoader<K, V> build() {
public DataLoader<K, V> build() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missed in previous PR

return mkDataLoader(batchLoadFunction, options);
}
}
Expand Down
38 changes: 32 additions & 6 deletions src/main/java/org/dataloader/DataLoaderHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import org.dataloader.annotations.GuardedBy;
import org.dataloader.annotations.Internal;
import org.dataloader.impl.CompletableFutureKit;
import org.dataloader.instrumentation.DataLoaderInstrumentation;
import org.dataloader.instrumentation.DataLoaderInstrumentationContext;
import org.dataloader.reactive.ReactiveSupport;
import org.dataloader.scheduler.BatchLoaderScheduler;
import org.dataloader.stats.StatisticsCollector;
Expand Down Expand Up @@ -34,6 +36,7 @@
import static java.util.stream.Collectors.toList;
import static org.dataloader.impl.Assertions.assertState;
import static org.dataloader.impl.Assertions.nonNull;
import static org.dataloader.instrumentation.DataLoaderInstrumentationHelper.ctxOrNoopCtx;

/**
* This helps break up the large DataLoader class functionality, and it contains the logic to dispatch the
Expand Down Expand Up @@ -167,6 +170,8 @@ Object getCacheKeyWithContext(K key, Object context) {
}

DispatchResult<V> dispatch() {
DataLoaderInstrumentationContext<DispatchResult<?>> instrCtx = ctxOrNoopCtx(instrumentation().beginDispatch(dataLoader));

boolean batchingEnabled = loaderOptions.batchingEnabled();
final List<K> keys;
final List<Object> callContexts;
Expand All @@ -175,7 +180,8 @@ DispatchResult<V> dispatch() {
int queueSize = loaderQueue.size();
if (queueSize == 0) {
lastDispatchTime.set(now());
return emptyDispatchResult();
instrCtx.onDispatched();
return endDispatchCtx(instrCtx, emptyDispatchResult());
}

// we copy the pre-loaded set of futures ready for dispatch
Expand All @@ -192,7 +198,8 @@ DispatchResult<V> dispatch() {
lastDispatchTime.set(now());
}
if (!batchingEnabled) {
return emptyDispatchResult();
instrCtx.onDispatched();
return endDispatchCtx(instrCtx, emptyDispatchResult());
}
final int totalEntriesHandled = keys.size();
//
Expand All @@ -213,7 +220,15 @@ DispatchResult<V> dispatch() {
} else {
futureList = dispatchQueueBatch(keys, callContexts, queuedFutures);
}
return new DispatchResult<>(futureList, totalEntriesHandled);
instrCtx.onDispatched();
return endDispatchCtx(instrCtx, new DispatchResult<>(futureList, totalEntriesHandled));
}

private DispatchResult<V> endDispatchCtx(DataLoaderInstrumentationContext<DispatchResult<?>> instrCtx, DispatchResult<V> dispatchResult) {
// once the CF completes, we can tell the instrumentation
dispatchResult.getPromisedResults()
.whenComplete((result, throwable) -> instrCtx.onCompleted(dispatchResult, throwable));
return dispatchResult;
}

private CompletableFuture<List<V>> sliceIntoBatchesOfBatches(List<K> keys, List<CompletableFuture<V>> queuedFutures, List<Object> callContexts, int maxBatchSize) {
Expand Down Expand Up @@ -427,11 +442,14 @@ CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts,
}

CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts, List<CompletableFuture<V>> queuedFutures) {
Object context = loaderOptions.getBatchLoaderContextProvider().getContext();
BatchLoaderEnvironment environment = BatchLoaderEnvironment.newBatchLoaderEnvironment()
.context(context).keyContexts(keys, keyContexts).build();

DataLoaderInstrumentationContext<List<?>> instrCtx = ctxOrNoopCtx(instrumentation().beginBatchLoader(dataLoader, keys, environment));

CompletableFuture<List<V>> batchLoad;
try {
Object context = loaderOptions.getBatchLoaderContextProvider().getContext();
BatchLoaderEnvironment environment = BatchLoaderEnvironment.newBatchLoaderEnvironment()
.context(context).keyContexts(keys, keyContexts).build();
if (isMapLoader()) {
batchLoad = invokeMapBatchLoader(keys, environment);
} else if (isPublisher()) {
Expand All @@ -441,12 +459,16 @@ CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts,
} else {
batchLoad = invokeListBatchLoader(keys, environment);
}
instrCtx.onDispatched();
} catch (Exception e) {
instrCtx.onDispatched();
batchLoad = CompletableFutureKit.failedFuture(e);
}
batchLoad.whenComplete(instrCtx::onCompleted);
return batchLoad;
}


@SuppressWarnings("unchecked")
private CompletableFuture<List<V>> invokeListBatchLoader(List<K> keys, BatchLoaderEnvironment environment) {
CompletionStage<List<V>> loadResult;
Expand Down Expand Up @@ -575,6 +597,10 @@ private boolean isMappedPublisher() {
return batchLoadFunction instanceof MappedBatchPublisher;
}

private DataLoaderInstrumentation instrumentation() {
return loaderOptions.getInstrumentation();
}

int dispatchDepth() {
synchronized (dataLoader) {
return loaderQueue.size();
Expand Down
54 changes: 42 additions & 12 deletions src/main/java/org/dataloader/DataLoaderOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.dataloader;

import org.dataloader.annotations.PublicApi;
import org.dataloader.instrumentation.DataLoaderInstrumentation;
import org.dataloader.instrumentation.DataLoaderInstrumentationHelper;
import org.dataloader.scheduler.BatchLoaderScheduler;
import org.dataloader.stats.NoOpStatisticsCollector;
import org.dataloader.stats.StatisticsCollector;
Expand Down Expand Up @@ -52,6 +54,7 @@ public class DataLoaderOptions {
private final BatchLoaderContextProvider environmentProvider;
private final ValueCacheOptions valueCacheOptions;
private final BatchLoaderScheduler batchLoaderScheduler;
private final DataLoaderInstrumentation instrumentation;

/**
* Creates a new data loader options with default settings.
Expand All @@ -68,6 +71,7 @@ public DataLoaderOptions() {
environmentProvider = NULL_PROVIDER;
valueCacheOptions = DEFAULT_VALUE_CACHE_OPTIONS;
batchLoaderScheduler = null;
instrumentation = DataLoaderInstrumentationHelper.NOOP_INSTRUMENTATION;
}

private DataLoaderOptions(Builder builder) {
Expand All @@ -82,6 +86,7 @@ private DataLoaderOptions(Builder builder) {
this.environmentProvider = builder.environmentProvider;
this.valueCacheOptions = builder.valueCacheOptions;
this.batchLoaderScheduler = builder.batchLoaderScheduler;
this.instrumentation = builder.instrumentation;
}

/**
Expand All @@ -101,7 +106,8 @@ public DataLoaderOptions(DataLoaderOptions other) {
this.statisticsCollector = other.statisticsCollector;
this.environmentProvider = other.environmentProvider;
this.valueCacheOptions = other.valueCacheOptions;
batchLoaderScheduler = other.batchLoaderScheduler;
this.batchLoaderScheduler = other.batchLoaderScheduler;
this.instrumentation = other.instrumentation;
}

/**
Expand Down Expand Up @@ -169,7 +175,7 @@ public boolean batchingEnabled() {
* Sets the option that determines whether batch loading is enabled.
*
* @param batchingEnabled {@code true} to enable batch loading, {@code false} otherwise
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setBatchingEnabled(boolean batchingEnabled) {
return builder().setBatchingEnabled(batchingEnabled).build();
Expand All @@ -188,7 +194,7 @@ public boolean cachingEnabled() {
* Sets the option that determines whether caching is enabled.
*
* @param cachingEnabled {@code true} to enable caching, {@code false} otherwise
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setCachingEnabled(boolean cachingEnabled) {
return builder().setCachingEnabled(cachingEnabled).build();
Expand All @@ -212,7 +218,7 @@ public boolean cachingExceptionsEnabled() {
* Sets the option that determines whether exceptional values are cache enabled.
*
* @param cachingExceptionsEnabled {@code true} to enable caching exceptional values, {@code false} otherwise
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setCachingExceptionsEnabled(boolean cachingExceptionsEnabled) {
return builder().setCachingExceptionsEnabled(cachingExceptionsEnabled).build();
Expand All @@ -233,7 +239,7 @@ public Optional<CacheKey> cacheKeyFunction() {
* Sets the function to use for creating the cache key, if caching is enabled.
*
* @param cacheKeyFunction the cache key function to use
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setCacheKeyFunction(CacheKey<?> cacheKeyFunction) {
return builder().setCacheKeyFunction(cacheKeyFunction).build();
Expand All @@ -254,7 +260,7 @@ public DataLoaderOptions setCacheKeyFunction(CacheKey<?> cacheKeyFunction) {
* Sets the cache map implementation to use for caching, if caching is enabled.
*
* @param cacheMap the cache map instance
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setCacheMap(CacheMap<?, ?> cacheMap) {
return builder().setCacheMap(cacheMap).build();
Expand All @@ -275,7 +281,7 @@ public int maxBatchSize() {
* before they are split into multiple class
*
* @param maxBatchSize the maximum batch size
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setMaxBatchSize(int maxBatchSize) {
return builder().setMaxBatchSize(maxBatchSize).build();
Expand All @@ -294,7 +300,7 @@ public StatisticsCollector getStatisticsCollector() {
* a common value
*
* @param statisticsCollector the statistics collector to use
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setStatisticsCollector(Supplier<StatisticsCollector> statisticsCollector) {
return builder().setStatisticsCollector(nonNull(statisticsCollector)).build();
Expand All @@ -311,7 +317,7 @@ public BatchLoaderContextProvider getBatchLoaderContextProvider() {
* Sets the batch loader environment provider that will be used to give context to batch load functions
*
* @param contextProvider the batch loader context provider
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setBatchLoaderContextProvider(BatchLoaderContextProvider contextProvider) {
return builder().setBatchLoaderContextProvider(nonNull(contextProvider)).build();
Expand All @@ -332,7 +338,7 @@ public DataLoaderOptions setBatchLoaderContextProvider(BatchLoaderContextProvide
* Sets the value cache implementation to use for caching values, if caching is enabled.
*
* @param valueCache the value cache instance
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setValueCache(ValueCache<?, ?> valueCache) {
return builder().setValueCache(valueCache).build();
Expand All @@ -349,7 +355,7 @@ public ValueCacheOptions getValueCacheOptions() {
* Sets the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used
*
* @param valueCacheOptions the value cache options
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setValueCacheOptions(ValueCacheOptions valueCacheOptions) {
return builder().setValueCacheOptions(nonNull(valueCacheOptions)).build();
Expand All @@ -367,12 +373,29 @@ public BatchLoaderScheduler getBatchLoaderScheduler() {
* to some future time.
*
* @param batchLoaderScheduler the scheduler
* @return the data loader options for fluent coding
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setBatchLoaderScheduler(BatchLoaderScheduler batchLoaderScheduler) {
return builder().setBatchLoaderScheduler(batchLoaderScheduler).build();
}

/**
* @return the {@link DataLoaderInstrumentation} to use
*/
public DataLoaderInstrumentation getInstrumentation() {
return instrumentation;
}

/**
* Sets in a new {@link DataLoaderInstrumentation}
*
* @param instrumentation the new {@link DataLoaderInstrumentation}
* @return a new data loader options instance for fluent coding
*/
public DataLoaderOptions setInstrumentation(DataLoaderInstrumentation instrumentation) {
return builder().setInstrumentation(instrumentation).build();
}

private Builder builder() {
return new Builder(this);
}
Expand All @@ -389,6 +412,7 @@ public static class Builder {
private BatchLoaderContextProvider environmentProvider;
private ValueCacheOptions valueCacheOptions;
private BatchLoaderScheduler batchLoaderScheduler;
private DataLoaderInstrumentation instrumentation;

public Builder() {
this(new DataLoaderOptions()); // use the defaults of the DataLoaderOptions for this builder
Expand All @@ -406,6 +430,7 @@ public Builder() {
this.environmentProvider = other.environmentProvider;
this.valueCacheOptions = other.valueCacheOptions;
this.batchLoaderScheduler = other.batchLoaderScheduler;
this.instrumentation = other.instrumentation;
}

public Builder setBatchingEnabled(boolean batchingEnabled) {
Expand Down Expand Up @@ -463,6 +488,11 @@ public Builder setBatchLoaderScheduler(BatchLoaderScheduler batchLoaderScheduler
return this;
}

public Builder setInstrumentation(DataLoaderInstrumentation instrumentation) {
this.instrumentation = nonNull(instrumentation);
return this;
}

public DataLoaderOptions build() {
return new DataLoaderOptions(this);
}
Expand Down
Loading