Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify the way we set custom header in triple protocol #1138

Merged
merged 1 commit into from
Dec 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ public class ConsumerCustomHeaderFilter extends Filter {

@Override
public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
setCustomHeader(request);
return invoker.invoke(request);
}

private void setCustomHeader(SofaRequest sofaRequest) {
RpcInvokeContext context = RpcInvokeContext.getContext();
Map customHeader = context.getCustomHeader();
if (CommonUtils.isNotEmpty(customHeader)) {
sofaRequest.addRequestProps(customHeader);
try {
Map customHeader = context.getCustomHeader();
if (CommonUtils.isNotEmpty(customHeader)) {
request.addRequestProps(customHeader);
}
return invoker.invoke(request);
} finally {
context.clearCustomHeader();
}
context.clearCustomHeader();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

/**
Expand All @@ -42,22 +43,52 @@ public void testCustomFilter() {
Assert.assertEquals("b", request.getRequestProp("a"));
}

@Test
public void testClearAfterInvoke() {
RpcInvokeContext.getContext().addCustomHeader("a", "b");
ConsumerCustomHeaderFilter filter = new ConsumerCustomHeaderFilter();
SofaRequest request = new SofaRequest();
RecordInvoker invoker = new RecordInvoker(null);
filter.invoke(invoker, request);
Assert.assertTrue(RpcInvokeContext.getContext().getCustomHeader().isEmpty());
Assert.assertEquals("b", invoker.getMyHeader().get("a"));
}

static class EmptyInvoker extends FilterInvoker {

private Map<String, String> metaHolder;

protected EmptyInvoker(AbstractInterfaceConfig config) {
super(config);
}

public Map<String, String> getMetaHolder() {
return metaHolder;
}

private Map<String, String> metaHolder;
@Override
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
return null;
}
}

protected EmptyInvoker(AbstractInterfaceConfig config) {
private class RecordInvoker extends FilterInvoker {

private Map<String, String> myHeader;

protected RecordInvoker(AbstractInterfaceConfig config) {
super(config);
}

public Map<String, String> getMyHeader() {
return myHeader;
}

@Override
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
Map<String, String> customHeader = RpcInvokeContext.getContext().getCustomHeader();
myHeader = new HashMap<>(customHeader);
return null;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* @author zhaowang
* @version : MetadataHolder.java, v 0.1 2020年09月09日 4:09 下午 zhaowang Exp $
*/
@Deprecated
public class MetadataHolder {
static final ThreadLocal<Map<String,String>> localContext = new ThreadLocal<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@
import com.alipay.common.tracer.core.holder.SofaTraceContextHolder;
import com.alipay.common.tracer.core.span.SofaTracerSpan;
import com.alipay.common.tracer.core.utils.TracerUtils;
import com.alipay.sofa.rpc.common.MetadataHolder;
import com.alipay.sofa.rpc.context.RpcInvokeContext;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
import com.alipay.sofa.rpc.ext.Extension;
import com.alipay.sofa.rpc.tracer.sofatracer.TracingContextKey;
import io.grpc.Metadata;

import java.util.Map;

import static com.alipay.sofa.rpc.server.triple.TripleHeadKeys.HEAD_KEY_TRAFFIC_TYPE;

/**
Expand All @@ -51,8 +49,7 @@ public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws So
SofaTracerSpan currentSpan = SofaTraceContextHolder.getSofaTraceContext().getCurrentSpan();
boolean loadTest = TracerUtils.isLoadTest(currentSpan);
if (loadTest) {
Map<String, String> metaHolder = MetadataHolder.getMetaHolder();
metaHolder.put(HEAD_KEY_TRAFFIC_TYPE.name(), PRESSURE);
RpcInvokeContext.getContext().addCustomHeader(HEAD_KEY_TRAFFIC_TYPE.name(), PRESSURE);
}

// provider side ,if in consumer side, metadata == null
Expand All @@ -63,10 +60,6 @@ public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws So
currentSpan.getSofaTracerSpanContext().setBizBaggageItem(MARK, T);
}
}
try {
return invoker.invoke(request);
} finally {
MetadataHolder.clear();
}
return invoker.invoke(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.alipay.common.tracer.core.context.trace.SofaTraceContext;
import com.alipay.common.tracer.core.holder.SofaTraceContextHolder;
import com.alipay.common.tracer.core.span.SofaTracerSpan;
import com.alipay.sofa.rpc.common.MetadataHolder;
import com.alipay.sofa.rpc.common.RemotingConstants;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.common.TracerCompatibleConstants;
Expand Down Expand Up @@ -142,12 +141,18 @@ public static void beforeSend(SofaRequest sofaRequest, ConsumerConfig consumerCo
}

// set custom headers
Set<Map.Entry<String, String>> entries = MetadataHolder.getMetaHolder().entrySet();
for (Map.Entry<String, String> entry : entries) {
if (StringUtils.isNotBlank(entry.getValue())) {
requestHeader.put(TripleHeadKeys.getKey(entry.getKey()), entry.getValue());
try{
Set<Map.Entry<String, String>> customHeader = RpcInvokeContext.getContext().getCustomHeader().entrySet();
for (Map.Entry<String, String> entry : customHeader) {
if (StringUtils.isNotBlank(entry.getValue())) {
requestHeader.put(TripleHeadKeys.getKey(entry.getKey()), entry.getValue());
}
}
}finally {
RpcInvokeContext.getContext().clearCustomHeader();
}


}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.alipay.common.tracer.core.holder.SofaTraceContextHolder;
import com.alipay.common.tracer.core.span.SofaTracerSpan;
import com.alipay.common.tracer.core.utils.TracerUtils;
import com.alipay.sofa.rpc.common.MetadataHolder;
import com.alipay.sofa.rpc.config.AbstractInterfaceConfig;
import com.alipay.sofa.rpc.context.RpcInvokeContext;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
Expand All @@ -33,6 +33,7 @@
import org.junit.Before;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

import static com.alipay.sofa.rpc.filter.PressureMarkTransformFilter.PRESSURE;
Expand All @@ -55,6 +56,9 @@ public void before() {

@After
public void after() {
invoker.getMetaHolder().clear();
// mock ConsumerCustomHeaderFilter
RpcInvokeContext.getContext().clearCustomHeader();
SofaTraceContextHolder.getSofaTraceContext().clear();
}

Expand Down Expand Up @@ -124,7 +128,7 @@ protected EmptyInvoker(AbstractInterfaceConfig config) {

@Override
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
this.metaHolder = MetadataHolder.getMetaHolder();
this.metaHolder = new HashMap<>(RpcInvokeContext.getContext().getCustomHeader());
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ public interface TripleHessianInterface {

Response call2(Request request);

boolean testPressureMark(String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package com.alipay.sofa.rpc.triple;

import com.alipay.common.tracer.core.holder.SofaTraceContextHolder;
import com.alipay.common.tracer.core.span.SofaTracerSpan;
import com.alipay.common.tracer.core.utils.TracerUtils;

/**
* @author zhaowang
* @version : TripleHessianInterfaceImpl.java, v 0.1 2020年06月11日 11:29 上午 zhaowang Exp $
Expand Down Expand Up @@ -53,4 +57,10 @@ public Response call2(Request request) {
public String getFlag() {
return flag;
}

@Override
public boolean testPressureMark(String name) {
SofaTracerSpan currentSpan = SofaTraceContextHolder.getSofaTraceContext().getCurrentSpan();
return TracerUtils.isLoadTest(currentSpan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package com.alipay.sofa.rpc.triple;

import com.alipay.common.tracer.core.SofaTracer;
import com.alipay.common.tracer.core.holder.SofaTraceContextHolder;
import com.alipay.common.tracer.core.span.SofaTracerSpan;
import com.alipay.common.tracer.core.utils.TracerUtils;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.config.ApplicationConfig;
import com.alipay.sofa.rpc.config.ConsumerConfig;
Expand All @@ -27,15 +31,25 @@
import com.alipay.sofa.rpc.log.LoggerFactory;
import org.apache.commons.lang.math.RandomUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.Map;

/**
* @author zhaowang
* @version : TripleHessianInvokeTest.java, v 0.1 2020年06月11日 11:16 上午 zhaowang Exp $
*/
public class TripleHessianInvokeTest {

private static final Logger LOGGER = LoggerFactory.getLogger(TripleHessianInvokeTest.class);
private static final Logger LOGGER = LoggerFactory.getLogger(TripleHessianInvokeTest.class);
public static final SofaTracer tracer = new SofaTracer.Builder("TEST").build();

@Before
public void before() {
SofaTracerSpan span = (SofaTracerSpan) tracer.buildSpan("test").start();
SofaTraceContextHolder.getSofaTraceContext().push(span);
}

@Test
public void testInvoke() throws InterruptedException {
Expand Down Expand Up @@ -71,11 +85,21 @@ public void testInvoke() throws InterruptedException {

TripleHessianInterface helloService = consumerConfig.refer();

Thread.sleep(10 * 1000);
LOGGER.info("Grpc stub bean successful: {}", helloService.getClass().getName());
helloService.call();
Assert.assertEquals("call", ref.getFlag());

// test Pressure Mark
boolean isLoadTest = helloService.testPressureMark("name");
Assert.assertFalse(isLoadTest);

SofaTracerSpan currentSpan = SofaTraceContextHolder.getSofaTraceContext().getCurrentSpan();
Map<String, String> bizBaggage = currentSpan.getSofaTracerSpanContext().getBizBaggage();
bizBaggage.put("mark", "T");
Assert.assertTrue(TracerUtils.isLoadTest(currentSpan));
isLoadTest = helloService.testPressureMark("name");
Assert.assertTrue(isLoadTest);

String s = helloService.call1();
Assert.assertEquals("call1", ref.getFlag());
Assert.assertEquals("call1", s);
Expand Down Expand Up @@ -136,7 +160,6 @@ public void testInvokeWithUniqueId() throws InterruptedException {

TripleHessianInterface helloService = consumerConfig.refer();

Thread.sleep(10 * 1000);
LOGGER.info("Grpc stub bean successful: {}", helloService.getClass().getName());
helloService.call();
Assert.assertEquals("call", ref.getFlag());
Expand Down Expand Up @@ -167,7 +190,6 @@ public void testInvokeWithUniqueId() throws InterruptedException {

helloService = consumerConfig.refer();

Thread.sleep(10 * 1000);
LOGGER.info("Grpc stub bean successful: {}", helloService.getClass().getName());
helloService.call();
Assert.assertEquals("call", ref.getFlag());
Expand Down