
[Prototype] Distributed Tracing  #7026

@Gaganjuneja

Description

Distributed tracing is defined in the RFC #6750. As a continuation of that work, I prototyped the actual interface and implementation to see how it works in practice. This prototype focuses on the aspects below; the rest should be built on top of it.

  1. Abstractions
    Abstractions are clearly defined to hide the implementation framework (such as OTel) so that tracing users need not be aware of the implementation. We expose a well-defined, simple interface to developers and hide the implementation behind it. Developers can add traces by simply adding a start trace and an end trace call.
  2. Code Pollution
    Code should not be polluted. If we write a lot of code for tracing, it becomes hard to find and focus on the actual logic. We provide methods to start/end traces, and most of the plumbing work is taken care of by the framework.
  3. Automatic Context Propagation
    A trace may be linked to a parent, and one parent trace could have multiple children. Most tasks/operations run in async mode in an OpenSearch cluster. These async tasks may run on the same thread, on different threads in the same thread pool, on different thread pools, or even on different nodes. In these cases the parent context needs to be propagated to the child so that the child trace and parent trace can be linked together and telemetry information can be shown at the parent as well as the child level. A classical example of this is a search request.

Example - Let’s assume there is an OpenSearch cluster of 3 data nodes. It contains one index with 3 shards distributed over the 3 nodes (node1, node2, node3). If we trace this request at multiple code points such as RestAction, QueryPhase, IndexSearcher, and the Fetch phase, the physical view of the query would look somewhat like the following.

[Screenshot: physical view of the search request across the 3 nodes]

Let’s say we are tracing all these places; we then need to propagate the current parent to the downstream tasks/operations automatically, since the current context might not be aware of the parent.

There are 2 ways this context propagation can be done.

  1. OpenSearch ThreadContext - OpenSearch's custom ThreadContext provides an out-of-the-box way to propagate the context to forked threads on the same or a different thread pool, and even across network calls to other nodes. We can simply update the parent info in the thread context and it will be available to the children. ThreadContext persists the context of the immediate parent only.

    1. Pros
      1. Out-of-the-box support.
      2. Context propagation can be fully abstracted out in the framework itself.
    2. Cons
      1. Slight maintenance overhead in maintaining the span cleanup logic and updating the ThreadContext.
  2. OpenTelemetry - OpenTelemetry also provides support for context propagation, but it requires exposing OTel code to the user. More details on context propagation through OTel can be found here: [RFC] Performance metrics framework #6533.
    1. Pros
      1. No state needs to be maintained at the framework level, so no maintenance overhead.
    2. Cons
      1. Code pollution.
      2. Exposes the OTel/Telemetry implementation to core OpenSearch code, so upgrading or changing the implementation would be difficult.
      3. Hard to add circuit breakers and level management, as levels are not part of OTel yet.
  3. Recommended Approach - Both approaches have their own pros and cons, but fundamentally approach 1 (OpenSearch ThreadContext) gives us more control and clearer abstractions. It requires some extra code maintenance overhead, but considering the pros it is the recommended approach. A minimal sketch of the idea follows this list.
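
A minimal sketch of the recommended approach, using the existing ThreadContext transient-header APIs (putTransient/getTransient). The header key name "parent_span" and the surrounding variables are illustrative assumptions, not part of the design:

// Illustrative sketch only; "parent_span" is an assumed key name.
// OpenSearch's ThreadPool wraps submitted runnables so that the caller's ThreadContext
// (including transient headers) is restored on the worker thread, which is what makes
// automatic parent propagation possible.
ThreadContext threadContext = threadPool.getThreadContext();
threadContext.putTransient("parent_span", currentSpan); // currentSpan: the span just started

threadPool.executor(ThreadPool.Names.SEARCH).execute(() -> {
    // The forked task sees the same transient header and can link its own span to it.
    Span parent = threadContext.getTransient("parent_span");
    // ... start the child span with 'parent' as its parent ...
});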

Sample Tracing Interface (not production-ready code; for understanding purposes only)

Levels - Level is an OpenSearch-specific concept which OpenTelemetry doesn’t support yet (see the OpenTelemetry GitHub issue for Levels). It works similarly to logging levels. It allows developers to add detailed spans with appropriate tracing levels, and while debugging an issue they can dynamically enable the more granular tracing levels for some time. There is one strong assumption we have to make: the level of a child can't be higher ordered than its parent, so that we never get into a situation where the parent span is filtered out based on the level while the child still exists; that would lead to a parent-child linking issue and the child would be orphaned.

public enum Level {
    UNKNOWN(0), LOWEST(1), LOW(2), MID(3), HIGH(4);

    private final int order;

    Level(int order) {
        this.order = order;
    }

    public int getOrder() {
        return order;
    }
}

public class SpanName {
    String name;      // logical name of the span, e.g. "onQueryPhase"
    String uniqueId;  // caller-provided id making the span unique, e.g. the task id

    public SpanName(String name, String uniqueId) {
        this.name = name;
        this.uniqueId = uniqueId;
    }
}
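
To make the level semantics concrete, here is a minimal sketch of the filtering check; the two-argument isLevelEnabled helper and the configured-level lookup are assumptions for illustration (DefaultTracer below uses a one-argument variant):

// A span is published only if its level's order is greater than or equal to the
// order of the currently configured tracing level (analogous to logging levels).
static boolean isLevelEnabled(Level spanLevel, Level configuredLevel) {
    return spanLevel.getOrder() >= configuredLevel.getOrder();
}

// Example: with the configured level set to MID, HIGH and MID spans are published,
// while LOW and LOWEST spans are dropped.
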
public interface Tracer {

    
    /**
     * Start the trace with passed attributes. It takes care of automatically propagating the context/parent to the
     * child spans wherever the context switch is happening like from one thread to another in the same
     * ExecutorService, across thread pools and even across nodes.
     * 
     * SpanName should be unique for each span. SpanName consists of two parameters, uniqueId and name, so the caller
     * needs to provide a unique id for each span; otherwise span creation will fail.
     * @param spanName
     * @param attributes
     */
    default void startTrace(SpanName spanName, Map<String, Object> attributes, Level level) {
        startTrace(spanName, attributes, null, level);
    }

    /**
     * Start the trace with passed attributes. It takes care of propagating the context/parent automatically to the
     * child spans wherever the context switch is happening like from one thread to another in the same
     * ExecutorService, across thread pool and even across nodes.
     *
     * The caller can also explicitly set the parent span. This may be needed when one more level of nesting is
     * required, or when multiple async child tasks are submitted from the same thread and the user wants to set
     * this child as the parent of the following runnable(s). The parentSpanName provided should be an active span.
     *
     * SpanName should be unique for each span. SpanName consists of two parameters, uniqueId and name, so the caller
     * needs to provide a unique id for each span; otherwise span creation will fail.
     *
     * Example - if three spans (A, B, C) are started in the same thread (before calling endTrace), then A will become
     * the parent of B and B will become the parent of C. If the user wants A to be the parent of both B and C, then
     * they will have to specify that explicitly through the parentSpanName.
     *
     * Callers need to define the level of the span. Levels are ordered and specified by an order value. Only spans
     * whose level has an order value higher than or equal to the configured level will be published. The level of a
     * child span can't be higher than that of its parent, so that we never get into a situation where the parent span
     * is filtered out based on the level while the child still exists; that would lead to a parent-child linking issue
     * and the child would be orphaned.
     * @param spanName
     * @param attributes
     * @param parentSpanName
     * @param level
     */
    public void startTrace(SpanName spanName, Map<String,Object> attributes, SpanName parentSpanName, Level level);

    /**
     * Ends the scope of the trace. It is mandatory to end each span explicitly. If there
     * are multiple endTrace calls for the same span, only the first one will succeed and
     * the rest will be ignored.
     * @param spanName
     */
    public void endTrace(SpanName spanName);

    /**
     * Adds attribute to the span.
     * @param spanName
     * @param key
     * @param value
     */
    public void addAttribute(SpanName spanName, String key, Object value);

    /**
     * Adds an event to the span.
     * @param spanName
     * @param event
     */
    public void addEvent(SpanName spanName, String event);

}
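
A hedged usage sketch of the interface above; the span name, attributes and level are arbitrary placeholders. Wrapping endTrace in a finally block keeps spans from leaking when the traced code throws:

// Illustrative caller code; "myOperation", the id and the attribute are placeholders.
Tracer tracer = TracerFactory.getInstance();
SpanName spanName = new SpanName("myOperation", "task-42");
tracer.startTrace(spanName, Map.of("shard_id", "0"), Level.MID);
try {
    // ... the actual work being traced ...
} finally {
    // Later endTrace calls for the same span are ignored, but the span must be ended.
    tracer.endTrace(spanName);
}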

Sample Implementation

Important points of the implementation:

  1. ThreadPool - We need to register the thread pool with the tracer during initialisation from Node.java. Another option is for users to pass the ThreadContext to the startTrace method, but that would require the ThreadContext to be available in a lot of low-level classes, which would pollute the code. So the preference is to inject the ThreadPool while initialising the TracerFactory class.
  2. SpanMap - This implementation keeps all the OTel spans in a local map and doesn't expose them to the caller. This approach has the following pros/cons.
    1. Pros
      1. Abstraction - We keep the spans locally and don't expose them to users, so neither core OpenSearch code nor plugin code has a direct dependency on OTel. This keeps it vendor agnostic and lets us easily replace it with another telemetry solution if required.
      2. Span management - We can automatically close spans which stay open for a long time, to avoid memory leaks. This can happen because of a missing endTrace call or errors. We will allow the TTL per span to be configured in case TTLs bigger than the default value (yet to be figured out) are required.
      3. Guard rails - With this approach we can easily implement circuit breakers on the number of spans open at any point in time. We will lose spans when the circuit breaker kicks in, so we will have to define strategies based on span level, component priority, etc. (yet to be worked out, but we do have the option to circuit break). A rough sketch of the TTL cleanup and circuit breaker follows the DefaultTracer class below.
    2. Cons
      1. Management overhead - We need to define a cache (similar to an LRU) to store the spans, which incurs some management overhead at the framework level.
public class DefaultTracer implements Tracer {

    private final io.opentelemetry.api.trace.Tracer openTelemetryTracer;
    private final ThreadPool threadPool;
    private final Map<String, Span> spanMap = new ConcurrentHashMap<>();


    public DefaultTracer(io.opentelemetry.api.trace.Tracer openTelemetryTracer, ThreadPool threadPool) {
        this.openTelemetryTracer = openTelemetryTracer;
        this.threadPool = threadPool;
    }

    @Override
    public void startTrace(SpanName spanName, Map<String, Object> attributes, SpanName parentSpanName, Level level) {
        Level calculatedLevel = getLevel(level);
        if(!isLevelEnabled(calculatedLevel)){
            return;
        }
        Span parentSpan = null;
        if (parentSpanName != null) {
            // Explicitly provided parent; it shouldn't have been ended yet.
            parentSpan = spanMap.get(getSpanMapKey(parentSpanName.uniqueId, parentSpanName.name));
        }
        if (parentSpan == null) {
            // Fall back to the parent propagated automatically via the ThreadContext.
            parentSpan = getParentFromThreadContext(threadPool.getThreadContext());
        }
        SpanBuilder spanBuilder = openTelemetryTracer.spanBuilder(spanName.name);
        if (parentSpan != null) {
            spanBuilder.setParent(Context.current().with(parentSpan));
        }
        Span span = spanBuilder.startSpan();
        setSpanLevel(span, parentSpan, calculatedLevel);
        addParentToThreadContext(threadPool.getThreadContext(), span);
        spanMap.put(getSpanMapKey(spanName.uniqueId, spanName.name), span);
        setSpanAttributes(span, parentSpan);
    }

    @Override
    public void endTrace(SpanName spanName) {
        Span span = spanMap.get(getSpanMapKey(spanName.uniqueId, spanName.name));
        if (span == null) {
            return;
        }
        // getParentSpan is a helper (not shown) that returns the stored parent of this span, if any.
        Span parentSpan = getParentSpan(span);
        span.end();
        spanMap.remove(getSpanMapKey(spanName.uniqueId, spanName.name));
        if (parentSpan != null) {
            addParentToThreadContext(threadPool.getThreadContext(), parentSpan);
        }
    }

    @Override
    public void addAttribute(SpanName spanName, String key, Object value) {
        /**
         * Adds attribute to the existing open span.
         */
    }

    @Override
    public void addEvent(SpanName spanName, String event) {
        /**
         * Adds event to the existing open span.
         */
    }

    // Helper methods such as getLevel, isLevelEnabled, getSpanMapKey, getParentSpan,
    // addParentToThreadContext, getParentFromThreadContext, setSpanLevel and
    // setSpanAttributes are omitted here for brevity.
}
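
The span-management and guard-rail points above could look roughly like the sketch below; the TTL, the max-open-spans limit and the SpanHolder wrapper are assumptions, not decided values (the spanMap would then store SpanHolder instead of Span):

// Illustrative sketch of members that could live inside DefaultTracer.
class SpanHolder {
    final Span span;
    final long startTimeNanos = System.nanoTime();
    SpanHolder(Span span) { this.span = span; }
}

// Guard rail: refuse to create new spans once too many are open (assumed limit).
private static final int MAX_OPEN_SPANS = 10_000;

private boolean hasCapacity(Map<String, SpanHolder> spans) {
    return spans.size() < MAX_OPEN_SPANS;
}

// Span management: periodically end spans that stayed open longer than the TTL,
// e.g. because a caller missed endTrace or an error bypassed it.
private void closeExpiredSpans(Map<String, SpanHolder> spans, long ttlNanos) {
    long now = System.nanoTime();
    spans.entrySet().removeIf(entry -> {
        if (now - entry.getValue().startTimeNanos > ttlNanos) {
            entry.getValue().span.end();
            return true;
        }
        return false;
    });
}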

TracerFactory
TracerFactory helps initialise the tracer object and returns the instance, so that we don't need to pass the tracer instance around everywhere in the code. Users can simply get the instance from TracerFactory and start instrumenting.

public class TracerFactory {

    private static Tracer tracer;

    // Needs to be a singleton; initialisation is allowed only once.
    public static synchronized void initializeTracer(ThreadPool threadPool) {
        OpenTelemetry openTelemetry = OTelMain.openTelemetry;
        io.opentelemetry.api.trace.Tracer openTelemetryTracer = openTelemetry.getTracer("instrumentation-library-name", "1.0.0");
        if (tracer == null) {
            tracer = new DefaultTracer(openTelemetryTracer, threadPool);
        } else {
            throw new IllegalStateException("Double-initialization not allowed!");
        }
    }

    public static Tracer getInstance(){
        return tracer;
    }
}
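
For completeness, a hedged sketch of the wiring described in point 1 of the implementation notes; the exact place inside Node.java and the surrounding code are assumptions:

// Hypothetical bootstrap wiring, e.g. in Node.java once the ThreadPool has been created;
// 'settings' are the node settings available at that point.
final ThreadPool threadPool = new ThreadPool(settings);
TracerFactory.initializeTracer(threadPool);

// Anywhere else in the code base, after initialisation:
Tracer tracer = TracerFactory.getInstance();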

Sample tracing in the search flow

Snippets of instrumentation for the above example.

// TaskManager.java
// The right place to add this trace is inside TransportAction;
// it is added here only for testing purposes.

public Task register(String type, String action, TaskAwareRequest request) {
    ...
    TracerFactory.getInstance().startTrace(new SpanName("Task", String.valueOf(task.getId())),
            null, new SpanName("Task", String.valueOf(task.getParentTaskId().getId())), Level.HIGH);
    ...
}
public Task unregister(Task task) {
    ...
    TracerFactory.getInstance().endTrace(new SpanName("Task", String.valueOf(task.getId())));
    ...
}
// Inside a SearchOperationListener implementation
    @Override
    public void onPreQueryPhase(SearchContext searchContext) {
        SearchOperationListener.super.onPreQueryPhase(searchContext);
        TracerFactory.getInstance().startTrace(new SpanName("onQueryPhase", 
                                                String.valueOf(searchContext.getTask().getId())), null, Level.HIGH);
        
    }
    
    @Override
    public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
        SearchOperationListener.super.onQueryPhase(searchContext, tookInNanos);
        TracerFactory.getInstance().endTrace(new SpanName("onQueryPhase", 
                                              String.valueOf(searchContext.getTask().getId())));

    }
    
    @Override
    public void onPreFetchPhase(SearchContext searchContext) {
        SearchOperationListener.super.onPreFetchPhase(searchContext);
        TracerFactory.getInstance().startTrace(new SpanName("onFetchPhase", 
                                             String.valueOf(searchContext.getTask().getId())), null, Level.HIGH);
    }
    
    @Override
    public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
        SearchOperationListener.super.onFetchPhase(searchContext, tookInNanos);
        TracerFactory.getInstance().endTrace(new SpanName("onFetchPhase", 
                                          String.valueOf(searchContext.getTask().getId())));

    }
//Inside ContextIndexSearcher.java
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
        String randomString = UUID.randomUUID().toString();
        TracerFactory.getInstance().startTrace(new SpanName("IndexSearcher", randomString), null, Level.MID);
        
        for (LeafReaderContext ctx : leaves) {
            searchLeaf(ctx, weight, collector);
        }
        
        TracerFactory.getInstance().endTrace(new SpanName("IndexSearcher", randomString));
        
    }

Sample tracing output

[Screenshot: sample tracing output]
