最近在开发中要改造一个普通SpringBoot接口服务为SpringCloud Gateway网关服务,并且需要在网关做验签,由于我们这个服务需要对外几个第三方平台提供接口,每家请求的传参形式都不同,有将签名信息放请求头、也有将签名信息放query参数、还有直接放body中的,请求头和query参数的获取和修改都比较简单,body参数的获取和修改稍微复杂一点,本文会对SpringCloud Gateway常见几种传参数据的获取和修改做讲解。
@Slf4j @Component public class HeaderGlobalFilter implements GlobalFilter , Ordered { @Override public Monofilter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); // 获取请求头中的sign,因为请求头可以有多个一样的key,可能会获取到多个值所以这里返回的是List List signList = request.getHeaders().get("sign"); log.info("请求头中的signList:{}", signList); // 获取请求头中的sign,取第一个 String signFirst = request.getHeaders().getFirst("sign"); log.info("请求头中的signFirst:{}", signFirst); // 请求转发处理,添加新的请求头 // 因为是使用的原始的请求对象添加了新的请求头,所以下游服务能获取到请求原有请求头和新添加的请求头 ServerHttpRequest mutableReq = request.mutate() .header("new-header", "自定义请求头") .build(); ServerWebExchange mutableExchange = exchange.mutate().request(mutableReq).build(); return chain.filter(mutableExchange); } @Override public int getOrder() { return -2000; } }
@Slf4j @Component public class QueryParamGlobalFilter implements GlobalFilter , Ordered { @Override public Monofilter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); // 获取uri对象 URI uri = request.getURI(); log.info("请求的uri = {}",uri); String path = request.getURI().getPath(); log.info("请求的path = {}",path); String query = request.getURI().getQuery(); log.info("请求的query = {}",query); // 获取请求参数转换成键值对,值是一个数组 MultiValueMap queryParams = request.getQueryParams(); log.info("获取请求参数转换成键值对 queryParams = {}",queryParams); // 将query参数转换成键值对,值是一个字符串,hutool包的HttpUtil Map queryParams1 = HttpUtil.decodeParamMap(query, StandardCharsets.UTF_8); log.info("将query参数转换成键值对 queryParams = {}",queryParams1); // 处理uri,通过?分割,下标第0位是请求地址,第1位置是请求参数 String url = uri.toString(); String[] urlSplit = url.split("\\?"); // 拼接新的url 这里自己处理即可,我这里直接替换请求参数 String newUrl = urlSplit[0] + "?" + "name=kerwinNew&age=17"; URI newUri = null; try { //将编码后的 %23 替换为 ,重新用这个字符串生成 URI newUri = new URI(newUrl.replace("%23", "")); } catch (URISyntaxException e) { throw new RuntimeException(e); } ServerHttpRequest mutableReq = request.mutate() .uri(newUri) .build(); ServerWebExchange mutableExchange = exchange.mutate().request(mutableReq).build(); return chain.filter(mutableExchange); } @Override public int getOrder() { return -1000; } }
在Gateway中通常会有一个过滤器链,而 request body 只能读取一次,也就是说,如果在过滤器A中已经读取一次,在后面的过滤器B是无法读取成功的,会抛出如下的报错
java.lang.IllegalStateException: Only one connection receive subscriber allowed. at reactor.ipc.netty.channel.FluxReceive.startReceiver(FluxReceive.java:279) at reactor.ipc.netty.channel.FluxReceive.lambda$subscribe(FluxReceive.java:129) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:163) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446) at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:884) at java.lang.Thread.run(Thread.java:745)
要解决这个问题需要将获取body的方法重写,进行缓存读即可,我们可以添加一个自定义CacheBodyGlobalFilter全局过滤器来进行处理,要注意的是这个过滤器优先级一定要是最高的,不然别的过滤器读取一次body数据后这里就读取不到了。
import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.core.Ordered; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.codec.HttpMessageReader; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequestDecorator; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.server.HandlerStrategies; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.List; /** * body 缓存过滤器 * @author Kerwin * @date 2024/1/11 */ @Slf4j @Component public class CacheBodyGlobalFilter implements GlobalFilter, Ordered { @Override public Monofilter(ServerWebExchange exchange, GatewayFilterChain chain) { /** * save request path and serviceId into gateway context */ ServerHttpRequest request = exchange.getRequest(); if(GatewayBodyUtils.checkBodyHeader(request)){ return readBody(exchange, chain); } return chain.filter(exchange); } /** * default HttpMessageReader */ private static final List > messageReaders = HandlerStrategies.withDefaults().messageReaders(); /** * ReadJsonBody * * @param exchange * @param chain * @return */ private Mono readBody(ServerWebExchange exchange, GatewayFilterChain chain) { /** * join the body */ return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> { byte[] bytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(bytes); DataBufferUtils.release(dataBuffer); Flux cachedFlux = Flux.defer(() -> { DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes); DataBufferUtils.retain(buffer); return Mono.just(buffer); }); /** * repackage ServerHttpRequest */ ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) { @Override public Flux getBody() { return cachedFlux; } }; /** * mutate exchage with new ServerHttpRequest */ ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build(); /** * read body string with default messageReaders */ return ServerRequest.create(mutatedExchange, messageReaders).bodyToMono(String.class) .doOnNext(objectValue -> { log.debug("[GatewayContext]Read JsonBody:{}", objectValue); }).then(chain.filter(mutatedExchange)); }); } @Override public int getOrder() { return HIGHEST_PRECEDENCE; } }
public class GatewayBodyUtils { /** * 获取请求中的body * * @param req * @return */ public static String getBody(ServerHttpRequest req) { if (checkBodyHeader(req)) { AtomicReferencerequestBody = new AtomicReference<>(""); RecorderServerHttpRequestDecorator requestDecorator = new RecorderServerHttpRequestDecorator(req); Flux body = requestDecorator.getBody(); body.subscribe(buffer -> { CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer()); requestBody.set(charBuffer.toString()); }); return requestBody.get(); } return null; } /** * 校验body头 * * @param req * @return */ public static boolean checkBodyHeader(ServerHttpRequest req) { // 处理参数 HttpHeaders headers = req.getHeaders(); MediaType contentType = headers.getContentType(); long contentLength = headers.getContentLength(); if (contentLength > 0 && !MediaType.MULTIPART_FORM_DATA_VALUE.startsWith(contentType.toString())) { if (MediaType.APPLICATION_JSON.equals(contentType) || MediaType.APPLICATION_JSON_UTF8.equals(contentType) || contentType.toString().contains(MediaType.APPLICATION_FORM_URLENCODED_VALUE)) { return true; } } return false; } /** * 修改设置body * * @param body 不能为空 * @param exchange * @param chain * @return */ public static Mono setBody(String body, ServerWebExchange exchange, GatewayFilterChain chain) { DataBuffer bodyDataBuffer = stringBuffer(body); Flux bodyFlux = Flux.just(bodyDataBuffer); MediaType contentType = exchange.getRequest().getHeaders().getContentType(); ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator( exchange.getRequest()) { @Override public HttpHeaders getHeaders() { HttpHeaders httpHeaders = new HttpHeaders(); int length = body.getBytes().length; httpHeaders.putAll(super.getHeaders()); httpHeaders.remove(HttpHeaders.CONTENT_TYPE); httpHeaders.remove(HttpHeaders.CONTENT_LENGTH); httpHeaders.setContentLength(length); httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType.toString()); // 设置CONTENT_TYPE return httpHeaders; } @Override public Flux getBody() { return bodyFlux; } }; return chain.filter(exchange.mutate().request(mutatedRequest).build()); } public static class RecorderServerHttpRequestDecorator extends ServerHttpRequestDecorator { private final List dataBuffers = new ArrayList<>(); public RecorderServerHttpRequestDecorator(ServerHttpRequest delegate) { super(delegate); super.getBody().map(dataBuffer -> { dataBuffers.add(dataBuffer); return dataBuffer; }).subscribe(); } @Override public Flux getBody() { return copy(); } private Flux copy() { return Flux.fromIterable(dataBuffers) .map(buf -> buf.factory().wrap(buf.asByteBuffer())); } } public static DataBuffer stringBuffer(String value) { byte[] bytes = value.getBytes(StandardCharsets.UTF_8); NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length); buffer.write(bytes); return buffer; } }