SpringCloud Gateway 网关请求中body、query、header参数的获取和修改
作者:mmseoamin日期:2024-03-20

目录

    • 前言
    • 一、请求头参数获取和修改
    • 二、Query参数获取和修改
    • 三、Body参数获取和修改
      • 3.1、全局body缓存过滤器
      • 3.2、获取和修改body工具类

        前言

              最近在开发中要改造一个普通SpringBoot接口服务为SpringCloud Gateway网关服务,并且需要在网关做验签,由于我们这个服务需要对外几个第三方平台提供接口,每家请求的传参形式都不同,有将签名信息放请求头、也有将签名信息放query参数、还有直接放body中的,请求头和query参数的获取和修改都比较简单,body参数的获取和修改稍微复杂一点,本文会对SpringCloud Gateway常见几种传参数据的获取和修改做讲解。

        一、请求头参数获取和修改

        @Slf4j
        @Component
        public class HeaderGlobalFilter implements GlobalFilter , Ordered {
            @Override
            public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
                ServerHttpRequest request = exchange.getRequest();
                // 获取请求头中的sign,因为请求头可以有多个一样的key,可能会获取到多个值所以这里返回的是List
                List signList = request.getHeaders().get("sign");
                log.info("请求头中的signList:{}", signList);
                // 获取请求头中的sign,取第一个
                String signFirst = request.getHeaders().getFirst("sign");
                log.info("请求头中的signFirst:{}", signFirst);
                // 请求转发处理,添加新的请求头
                // 因为是使用的原始的请求对象添加了新的请求头,所以下游服务能获取到请求原有请求头和新添加的请求头
                ServerHttpRequest mutableReq = request.mutate()
                        .header("new-header", "自定义请求头")
                        .build();
                ServerWebExchange mutableExchange = exchange.mutate().request(mutableReq).build();
                return chain.filter(mutableExchange);
            }
            @Override
            public int getOrder() {
                return -2000;
            }
        }
        

        二、Query参数获取和修改

        @Slf4j
        @Component
        public class QueryParamGlobalFilter implements GlobalFilter , Ordered {
            @Override
            public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
                ServerHttpRequest request = exchange.getRequest();
                // 获取uri对象
                URI uri = request.getURI();
                log.info("请求的uri = {}",uri);
                String path = request.getURI().getPath();
                log.info("请求的path = {}",path);
                String query = request.getURI().getQuery();
                log.info("请求的query = {}",query);
                // 获取请求参数转换成键值对,值是一个数组
                MultiValueMap queryParams = request.getQueryParams();
                log.info("获取请求参数转换成键值对 queryParams = {}",queryParams);
                // 将query参数转换成键值对,值是一个字符串,hutool包的HttpUtil
                Map queryParams1 = HttpUtil.decodeParamMap(query, StandardCharsets.UTF_8);
                log.info("将query参数转换成键值对 queryParams = {}",queryParams1);
                // 处理uri,通过?分割,下标第0位是请求地址,第1位置是请求参数
                String url = uri.toString();
                String[] urlSplit = url.split("\\?");
                // 拼接新的url 这里自己处理即可,我这里直接替换请求参数
                String newUrl = urlSplit[0] + "?" + "name=kerwinNew&age=17";
                URI newUri = null;
                try {
                    //将编码后的 %23 替换为 ,重新用这个字符串生成 URI
                    newUri = new URI(newUrl.replace("%23", ""));
                } catch (URISyntaxException e) {
                    throw new RuntimeException(e);
                }
                ServerHttpRequest mutableReq = request.mutate()
                        .uri(newUri)
                        .build();
                ServerWebExchange mutableExchange = exchange.mutate().request(mutableReq).build();
                return chain.filter(mutableExchange);
            }
            @Override
            public int getOrder() {
                return -1000;
            }
        }
        

        三、Body参数获取和修改

        在Gateway中通常会有一个过滤器链,而 request body 只能读取一次,也就是说,如果在过滤器A中已经读取一次,在后面的过滤器B是无法读取成功的,会抛出如下的报错

        java.lang.IllegalStateException: Only one connection receive subscriber allowed.
        	at reactor.ipc.netty.channel.FluxReceive.startReceiver(FluxReceive.java:279)
        	at reactor.ipc.netty.channel.FluxReceive.lambda$subscribe(FluxReceive.java:129)
        	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:163)
        	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
        	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
        	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)
        	at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:884)
        	at java.lang.Thread.run(Thread.java:745)
        

        要解决这个问题需要将获取body的方法重写,进行缓存读即可,我们可以添加一个自定义CacheBodyGlobalFilter全局过滤器来进行处理,要注意的是这个过滤器优先级一定要是最高的,不然别的过滤器读取一次body数据后这里就读取不到了。

        3.1、全局body缓存过滤器

        import lombok.extern.slf4j.Slf4j;
        import org.springframework.cloud.gateway.filter.GatewayFilterChain;
        import org.springframework.cloud.gateway.filter.GlobalFilter;
        import org.springframework.core.Ordered;
        import org.springframework.core.io.buffer.DataBuffer;
        import org.springframework.core.io.buffer.DataBufferUtils;
        import org.springframework.http.HttpHeaders;
        import org.springframework.http.MediaType;
        import org.springframework.http.codec.HttpMessageReader;
        import org.springframework.http.server.reactive.ServerHttpRequest;
        import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
        import org.springframework.stereotype.Component;
        import org.springframework.web.reactive.function.server.HandlerStrategies;
        import org.springframework.web.reactive.function.server.ServerRequest;
        import org.springframework.web.server.ServerWebExchange;
        import reactor.core.publisher.Flux;
        import reactor.core.publisher.Mono;
        import java.util.List;
        /**
         * body 缓存过滤器
         * @author Kerwin
         * @date 2024/1/11
         */
        @Slf4j
        @Component
        public class CacheBodyGlobalFilter implements GlobalFilter, Ordered {
            @Override
            public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
                /**
                 * save request path and serviceId into gateway context
                 */
                ServerHttpRequest request = exchange.getRequest();
                if(GatewayBodyUtils.checkBodyHeader(request)){
                    return readBody(exchange, chain);
                }
                return chain.filter(exchange);
            }
            /**
             * default HttpMessageReader
             */
            private static final List> messageReaders = HandlerStrategies.withDefaults().messageReaders();
            /**
             * ReadJsonBody
             *
             * @param exchange
             * @param chain
             * @return
             */
            private Mono readBody(ServerWebExchange exchange, GatewayFilterChain chain) {
                /**
                 * join the body
                 */
                return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
                    byte[] bytes = new byte[dataBuffer.readableByteCount()];
                    dataBuffer.read(bytes);
                    DataBufferUtils.release(dataBuffer);
                    Flux cachedFlux = Flux.defer(() -> {
                        DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
                        DataBufferUtils.retain(buffer);
                        return Mono.just(buffer);
                    });
                    /**
                     * repackage ServerHttpRequest
                     */
                    ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
                        @Override
                        public Flux getBody() {
                            return cachedFlux;
                        }
                    };
                    /**
                     * mutate exchage with new ServerHttpRequest
                     */
                    ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();
                    /**
                     * read body string with default messageReaders
                     */
                    return ServerRequest.create(mutatedExchange, messageReaders).bodyToMono(String.class)
                            .doOnNext(objectValue -> {
                                log.debug("[GatewayContext]Read JsonBody:{}", objectValue);
                            }).then(chain.filter(mutatedExchange));
                });
            }
            @Override
            public int getOrder() {
                return HIGHEST_PRECEDENCE;
            }
        }
        

        3.2、获取和修改body工具类

        public class GatewayBodyUtils {
            /**
             * 获取请求中的body
             *
             * @param req
             * @return
             */
            public static String getBody(ServerHttpRequest req) {
                if (checkBodyHeader(req)) {
                    AtomicReference requestBody = new AtomicReference<>("");
                    RecorderServerHttpRequestDecorator requestDecorator = new RecorderServerHttpRequestDecorator(req);
                    Flux body = requestDecorator.getBody();
                    body.subscribe(buffer -> {
                        CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());
                        requestBody.set(charBuffer.toString());
                    });
                    return requestBody.get();
                }
                return null;
            }
            /**
             * 校验body头
             *
             * @param req
             * @return
             */
            public static boolean checkBodyHeader(ServerHttpRequest req) {
                // 处理参数
                HttpHeaders headers = req.getHeaders();
                MediaType contentType = headers.getContentType();
                long contentLength = headers.getContentLength();
                if (contentLength > 0 && !MediaType.MULTIPART_FORM_DATA_VALUE.startsWith(contentType.toString())) {
                    if (MediaType.APPLICATION_JSON.equals(contentType) || MediaType.APPLICATION_JSON_UTF8.equals(contentType) || contentType.toString().contains(MediaType.APPLICATION_FORM_URLENCODED_VALUE)) {
                        return true;
                    }
                }
                return false;
            }
            /**
             * 修改设置body
             *
             * @param body     不能为空
             * @param exchange
             * @param chain
             * @return
             */
            public static Mono setBody(String body, ServerWebExchange exchange, GatewayFilterChain chain) {
                DataBuffer bodyDataBuffer = stringBuffer(body);
                Flux bodyFlux = Flux.just(bodyDataBuffer);
                MediaType contentType = exchange.getRequest().getHeaders().getContentType();
                ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(
                        exchange.getRequest()) {
                    @Override
                    public HttpHeaders getHeaders() {
                        HttpHeaders httpHeaders = new HttpHeaders();
                        int length = body.getBytes().length;
                        httpHeaders.putAll(super.getHeaders());
                        httpHeaders.remove(HttpHeaders.CONTENT_TYPE);
                        httpHeaders.remove(HttpHeaders.CONTENT_LENGTH);
                        httpHeaders.setContentLength(length);
                        httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType.toString());
                        // 设置CONTENT_TYPE
                        return httpHeaders;
                    }
                    @Override
                    public Flux getBody() {
                        return bodyFlux;
                    }
                };
                return chain.filter(exchange.mutate().request(mutatedRequest).build());
            }
            public static class RecorderServerHttpRequestDecorator extends ServerHttpRequestDecorator {
                private final List dataBuffers = new ArrayList<>();
                public RecorderServerHttpRequestDecorator(ServerHttpRequest delegate) {
                    super(delegate);
                    super.getBody().map(dataBuffer -> {
                        dataBuffers.add(dataBuffer);
                        return dataBuffer;
                    }).subscribe();
                }
                @Override
                public Flux getBody() {
                    return copy();
                }
                private Flux copy() {
                    return Flux.fromIterable(dataBuffers)
                            .map(buf -> buf.factory().wrap(buf.asByteBuffer()));
                }
            }
            public static DataBuffer stringBuffer(String value) {
                byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
                NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
                DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);
                buffer.write(bytes);
                return buffer;
            }
        }