diff --git a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2ConsumerInvoker.java b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2ConsumerInvoker.java index fd6b57ab4..15bcea6c5 100644 --- a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2ConsumerInvoker.java +++ b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2ConsumerInvoker.java @@ -14,20 +14,30 @@ import static com.tencent.trpc.core.common.Constants.DEFAULT_CLIENT_REQUEST_TIMEOUT_MS; import static com.tencent.trpc.proto.http.common.HttpConstants.CONNECTION_REQUEST_TIMEOUT; +import static com.tencent.trpc.proto.http.common.HttpConstants.HTTP_HEADER_TRPC_CALLEE; +import static com.tencent.trpc.proto.http.common.HttpConstants.HTTP_HEADER_TRPC_CALLER; +import static com.tencent.trpc.proto.http.common.HttpConstants.HTTP_HEADER_TRPC_MESSAGE_TYPE; +import static com.tencent.trpc.proto.http.common.HttpConstants.HTTP_HEADER_TRPC_REQUEST_ID; +import static com.tencent.trpc.proto.http.common.HttpConstants.HTTP_HEADER_TRPC_TIMEOUT; +import static com.tencent.trpc.proto.http.common.HttpConstants.HTTP_HEADER_TRPC_TRANS_INFO; import autovalue.shaded.com.google.common.common.base.Objects; +import com.fasterxml.jackson.core.Base64Variants; import com.tencent.trpc.core.common.config.BackendConfig; import com.tencent.trpc.core.common.config.ConsumerConfig; import com.tencent.trpc.core.common.config.ProtocolConfig; import com.tencent.trpc.core.exception.TRpcException; import com.tencent.trpc.core.logger.Logger; import com.tencent.trpc.core.logger.LoggerFactory; +import com.tencent.trpc.core.rpc.CallInfo; import com.tencent.trpc.core.rpc.Request; import com.tencent.trpc.core.rpc.Response; +import com.tencent.trpc.core.utils.JsonUtils; import com.tencent.trpc.core.utils.RpcUtils; import com.tencent.trpc.proto.http.common.HttpConstants; import java.nio.charset.StandardCharsets; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -199,12 +209,7 @@ private SimpleHttpRequest buildRequest(Request request, int requestTimeout) thro SimpleHttpRequest simpleHttpRequest = SimpleHttpRequests.post(getUri(request)); simpleHttpRequest.setConfig(requestConfig); simpleHttpRequest.setHeader(HttpHeaders.CONTENT_TYPE, HttpConstants.CONTENT_TYPE_JSON); - // encode request - String jsonString = encodeToJson(request); - if (jsonString != null) { - simpleHttpRequest.setBody(jsonString, ContentType.APPLICATION_JSON); - } - // Set custom business headers, consistent with the TRPC protocol, only process String and byte[] + // set custom business headers, consistent with the TRPC protocol, only process String and byte[] request.getAttachments().forEach((k, v) -> { if (Objects.equal(k, HttpHeaders.TRANSFER_ENCODING) || Objects.equal(k, HttpHeaders.CONTENT_LENGTH)) { return; @@ -215,6 +220,43 @@ private SimpleHttpRequest buildRequest(Request request, int requestTimeout) thro simpleHttpRequest.setHeader(k, new String((byte[]) v)); } }); + // set caller and callee information + CallInfo callInfo = request.getMeta().getCallInfo(); + simpleHttpRequest.setHeader(HTTP_HEADER_TRPC_CALLER, callInfo.getCaller()); + simpleHttpRequest.setHeader(HTTP_HEADER_TRPC_CALLEE, callInfo.getCallee()); + + // set request id + simpleHttpRequest.setHeader(HTTP_HEADER_TRPC_REQUEST_ID, String.valueOf(request.getRequestId())); + // set timeout + simpleHttpRequest.setHeader(HTTP_HEADER_TRPC_TIMEOUT, String.valueOf(requestTimeout)); + // set message type + int messageType = request.getMeta().getMessageType(); + if (messageType != 0) { + simpleHttpRequest.setHeader(HTTP_HEADER_TRPC_MESSAGE_TYPE, String.valueOf(messageType)); + } + // set trans info: encode attachments as JSON with base64-encoded values, + // consistent with the server-side decoding logic in AbstractHttpExecutor.setAttachments + Map attachments = request.getAttachments(); + if (attachments != null && !attachments.isEmpty()) { + Map transInfoMap = new LinkedHashMap<>(); + attachments.forEach((k, v) -> { + if (v instanceof byte[]) { + transInfoMap.put(k, Base64Variants.getDefaultVariant().encode((byte[]) v)); + } else if (v instanceof String) { + transInfoMap.put(k, Base64Variants.getDefaultVariant() + .encode(((String) v).getBytes(StandardCharsets.UTF_8))); + } + }); + if (!transInfoMap.isEmpty()) { + simpleHttpRequest.setHeader(HTTP_HEADER_TRPC_TRANS_INFO, JsonUtils.toJson(transInfoMap)); + } + } + + // encode request body + String jsonString = encodeToJson(request); + if (jsonString != null) { + simpleHttpRequest.setBody(jsonString, ContentType.APPLICATION_JSON); + } return simpleHttpRequest; } diff --git a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpConsumerInvoker.java b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpConsumerInvoker.java index 7a022aea0..681fae96f 100644 --- a/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpConsumerInvoker.java +++ b/trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpConsumerInvoker.java @@ -15,7 +15,13 @@ import static com.tencent.trpc.proto.http.common.HttpConstants.CONNECTION_REQUEST_TIMEOUT; import static com.tencent.trpc.proto.http.common.HttpConstants.HTTP_HEADER_TRPC_CALLEE; import static com.tencent.trpc.proto.http.common.HttpConstants.HTTP_HEADER_TRPC_CALLER; +import static com.tencent.trpc.proto.http.common.HttpConstants.HTTP_HEADER_TRPC_MESSAGE_TYPE; +import static com.tencent.trpc.proto.http.common.HttpConstants.HTTP_HEADER_TRPC_REQUEST_ID; +import static com.tencent.trpc.proto.http.common.HttpConstants.HTTP_HEADER_TRPC_TIMEOUT; +import static com.tencent.trpc.proto.http.common.HttpConstants.HTTP_HEADER_TRPC_TRANS_INFO; +import com.fasterxml.jackson.core.Base64Variant; +import com.fasterxml.jackson.core.Base64Variants; import com.tencent.trpc.core.common.config.BackendConfig; import com.tencent.trpc.core.common.config.ConsumerConfig; import com.tencent.trpc.core.common.config.ProtocolConfig; @@ -23,11 +29,13 @@ import com.tencent.trpc.core.rpc.CallInfo; import com.tencent.trpc.core.rpc.Request; import com.tencent.trpc.core.rpc.Response; +import com.tencent.trpc.core.utils.JsonUtils; import com.tencent.trpc.core.utils.RpcUtils; import com.tencent.trpc.proto.http.common.HttpConstants; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; import org.apache.commons.io.IOUtils; @@ -162,6 +170,33 @@ private HttpPost buildRequest(Request request) throws Exception { httpPost.setHeader(HTTP_HEADER_TRPC_CALLER, callInfo.getCaller()); httpPost.setHeader(HTTP_HEADER_TRPC_CALLEE, callInfo.getCallee()); + // set request id + httpPost.setHeader(HTTP_HEADER_TRPC_REQUEST_ID, String.valueOf(request.getRequestId())); + // set timeout + httpPost.setHeader(HTTP_HEADER_TRPC_TIMEOUT, String.valueOf(socketTimeout)); + // set message type + int messageType = request.getMeta().getMessageType(); + if (messageType != 0) { + httpPost.setHeader(HTTP_HEADER_TRPC_MESSAGE_TYPE, String.valueOf(messageType)); + } + // set trans info: encode attachments as JSON with base64-encoded values, + // consistent with the server-side decoding logic in AbstractHttpExecutor.setAttachments + Map attachments = request.getAttachments(); + if (attachments != null && !attachments.isEmpty()) { + Map transInfoMap = new LinkedHashMap<>(); + attachments.forEach((k, v) -> { + Base64Variant variant = Base64Variants.getDefaultVariant(); + if (v instanceof byte[]) { + transInfoMap.put(k, variant.encode((byte[]) v)); + } else if (v instanceof String) { + transInfoMap.put(k, variant.encode(((String) v).getBytes(StandardCharsets.UTF_8))); + } + }); + if (!transInfoMap.isEmpty()) { + httpPost.setHeader(HTTP_HEADER_TRPC_TRANS_INFO, JsonUtils.toJson(transInfoMap)); + } + } + // encode request parameters String jsonString = encodeToJson(request); if (jsonString != null) {