본문 바로가기

Programing/OpenSource

[slf4j] MDC에 put만 계속한다면

MDC를 이용해서 Correlation ID 추적에 사용을 하고 있다.

스프링을 사용한다면 AsyncHandlerInterceptor 인터페이스를 구현한 HandlerInterceptorAdapter 를 상속받으면 preHandle 과 afterCompletion 메서드에 MDC에 값을 넣고 지우도록 할 수 있다.

import java.util.UUID;
import org.slf4j.MDC;

public class MDCInterceptor extends HandlerInterceptorAdapter {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        MDC.put(ApiMDCKeys.TRACK_ID, UUID.randomUUID().toString());
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) {
        MDC.clear();
    }

Internal on MDC

org.slf4j.MDC 클래스이고 다양한 MDC 구현체와의 연결을 위해 내부에 MDCAdapter 인터페이스를 두고 있다.

package org.slf4j;

public class MDC {
    static MDCAdapter mdcAdapter;

MDCAdapter 인터페이스는 넣고 가져오고 지우고 등의 기능이 있다.

package org.slf4j.spi;

public interface MDCAdapter {
    public void put(String key, String val);
    public String get(String key);
    public void remove(String key);
    public void clear();
    public Map<String, String> getCopyOfContextMap();
    public void setContextMap(Map<String, String> contextMap);
}

보통 구현체에 대한 설명을 따로 하지 않않다면 LogbackMDCAdapter 가 사용된다.

package ch.qos.logback.classic.util;

public class LogbackMDCAdapter implements MDCAdapter {

    final ThreadLocal<Map<String, String>> copyOnThreadLocal = new ThreadLocal<Map<String, String>>();
    
    public void put(String key, String val) throws IllegalArgumentException {
        if (key == null) {
            throw new IllegalArgumentException("key cannot be null");
        }

        Map<String, String> oldMap = copyOnThreadLocal.get();
        Integer lastOp = getAndSetLastOperation(WRITE_OPERATION);

        if (wasLastOpReadOrNull(lastOp) || oldMap == null) {
            Map<String, String> newMap = duplicateAndInsertNewMap(oldMap);
            newMap.put(key, val);
        } else {
            oldMap.put(key, val);
        }
    }

    private Map<String, String> duplicateAndInsertNewMap(Map<String, String> oldMap) {
        Map<String, String> newMap = Collections.synchronizedMap(new HashMap<String, String>());
        if (oldMap != null) {
            // we don't want the parent thread modifying oldMap while we are
            // iterating over it
            synchronized (oldMap) {
                newMap.putAll(oldMap);
            }
        }

        copyOnThreadLocal.set(newMap);
        return newMap;
    }

    public void remove(String key) {
        if (key == null) {
            return;
        }
        Map<String, String> oldMap = copyOnThreadLocal.get();
        if (oldMap == null)
            return;

        Integer lastOp = getAndSetLastOperation(WRITE_OPERATION);

        if (wasLastOpReadOrNull(lastOp)) {
            Map<String, String> newMap = duplicateAndInsertNewMap(oldMap);
            newMap.remove(key);
        } else {
            oldMap.remove(key);
        }
    }
    
    public void clear() {
        lastOperation.set(WRITE_OPERATION);
        copyOnThreadLocal.remove();
    }

자바의 ThreadLocal 을 이용해서 스레드당 데이터가 섞이는 것을 방지하고 있다.

내부 메서드인 duplicateAndInsertNewMap에서 SynchronizedMap 을 만들어서 돌려주는 것을 알고 있다.

코드리뷰

오늘 코드리뷰를 하다가 *혁님이 Kafka에 MDC를 추가해놓은 것을 보게 되었다.

스프링 카프카에서는 RecordInterceptor 인터페이스를 통해 레코드를 consume하기 전에 skip을 할지 선택할 수 있는 틈(stem)이 있다. (null을 반환하면 skip 하게된다.)

package org.springframework.kafka.listener;

@FunctionalInterface
public interface RecordInterceptor<K, V> {
	@Nullable
	ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record);
}

MDC를 생성하는 코드는 아래와 같았다.

@Configuration
public class ConsumerRecordInterceptor implements RecordInterceptor<String, Object> {
    @Override
    public ConsumerRecord<String, Object> intercept(ConsumerRecord<String, Object> record) {
        MDC.put("trackId", UUID.randomUUID().toString());
        return record;
    }
}

들었던 의문점은 MDC를 put하는 곳이 있는데 remove나 clear를 하는 곳이 없었다는 것이다.

그말은 애플리케이션이 계속 수행되면 데이터가 계속 쌓이기만 한다는 것일까?

ThreadPool

생각해보니 Thread는 매번 생성하지 않고 ThreadPool에서 돌려가며 사용을 하고 있다.

해당 스레드에서 진입시에 덮어쓴다면 특별한 문제는 없어보인다.