<kbd id="5sdj3"></kbd>
<th id="5sdj3"></th>

  • <dd id="5sdj3"><form id="5sdj3"></form></dd>
    <td id="5sdj3"><form id="5sdj3"><big id="5sdj3"></big></form></td><del id="5sdj3"></del>

  • <dd id="5sdj3"></dd>
    <dfn id="5sdj3"></dfn>
  • <th id="5sdj3"></th>
    <tfoot id="5sdj3"><menuitem id="5sdj3"></menuitem></tfoot>

  • <td id="5sdj3"><form id="5sdj3"><menu id="5sdj3"></menu></form></td>
  • <kbd id="5sdj3"><form id="5sdj3"></form></kbd>

    阿里終面:如何設計一個高性能網關?

    共 28907字,需瀏覽 58分鐘

     ·

    2021-02-06 11:59

    來源:cnblogs.com/2YSP/p/14223892.html

    一、前言

    最近在github上看了soul網關的設計,突然就來了興趣準備自己從零開始寫一個高性能的網關。經過兩周時間的開發(fā),我的網關ship-gate核心功能基本都已完成,最大的缺陷就是前端功底太差沒有管理后臺??。

    二、設計

    2.1技術選型

    網關是所有請求的入口,所以要求有很高的吞吐量,為了實現(xiàn)這點可以使用請求異步化來解決。目前一般有以下兩種方案:

    • Tomcat/Jetty+NIO+Servlet3

    Servlet3已經支持異步,這種方案使用比較多,京東,有贊和Zuul,都用的是這種方案。

    • Netty+NIO

    Netty為高并發(fā)而生,目前唯品會的網關使用這個策略,在唯品會的技術文章中在相同的情況下Netty是每秒30w+的吞吐量,Tomcat是13w+,可以看出是有一定的差距的,但是Netty需要自己處理HTTP協(xié)議,這一塊比較麻煩。

    后面發(fā)現(xiàn)Soul網關是基于Spring WebFlux(底層Netty)的,不用太關心HTTP協(xié)議的處理,于是決定也用Spring WebFlux。

    網關的第二個特點是具備可擴展性,比如Netflix Zuul有preFilters,postFilters等在不同的階段方便處理不同的業(yè)務,基于責任鏈模式將請求進行鏈式處理即可實現(xiàn)。

    在微服務架構下,服務都會進行多實例部署來保證高可用,請求到達網關時,網關需要根據URL找到所有可用的實例,這時就需要服務注冊和發(fā)現(xiàn)功能,即注冊中心。

    現(xiàn)在流行的注冊中心有Apache的Zookeeper和阿里的Nacos兩種(consul有點小眾),因為之前寫RPC框架時已經用過了Zookeeper,所以這次就選擇了Nacos。

    2.2需求清單

    首先要明確目標,即開發(fā)一個具備哪些特性的網關,總結下后如下:

    • 自定義路由規(guī)則

      可基于version的路由規(guī)則設置,路由對象包括DEFAUL,HEADER和QUERY三種,匹配方式包括=、regex、like三種。

    • 跨語言

      HTTP協(xié)議天生跨語言

    • 高性能

      Netty本身就是一款高性能的通信框架,同時server將一些路由規(guī)則等數據緩存到JVM內存避免請求admin服務。

    • 高可用

      支持集群模式防止單節(jié)點故障,無狀態(tài)。

    • 灰度發(fā)布

      灰度發(fā)布(又名金絲雀發(fā)布)是指在黑與白之間,能夠平滑過渡的一種發(fā)布方式。在其上可以進行A/B testing,即讓一部分用戶繼續(xù)用產品特性A,一部分用戶開始用產品特性B,如果用戶對B沒有什么反對意見,那么逐步擴大范圍,把所有用戶都遷移到B上面來。通過特性一可以實現(xiàn)。

    • 接口鑒權

      基于責任鏈模式,用戶開發(fā)自己的鑒權插件即可。

    • 負載均衡

      支持多種負載均衡算法,如隨機,輪詢,加權輪詢等。利用SPI機制可以根據配置進行動態(tài)加載。

    2.3架構設計

    在參考了一些優(yōu)秀的網關Zuul,Spring Cloud Gateway,Soul后,將項目劃分為以下幾個模塊。

    名稱描述
    ship-admin后臺管理界面,配置路由規(guī)則等
    ship-server網關服務端,核心功能模塊
    ship-client-spring-boot-starter網關客戶端,自動注冊服務信息到注冊中心
    ship-common一些公共的代碼,如pojo,常量等。

    它們之間的關系如圖:

    網關設計

    注意: 這張圖與實際實現(xiàn)有點出入,Nacos push到本地緩存的那個環(huán)節(jié)沒有實現(xiàn),目前只有ship-sever定時輪詢pull的過程。ship-admin從Nacos獲取注冊服務信息的過程,也改成了ServiceA啟動時主動發(fā)生HTTP請求通知ship-admin。

    2.4表結構設計

    圖片

    三、編碼

    3.1 ship-client-spring-boot-starter

    首先創(chuàng)建一個spring-boot-starter命名為ship-client-spring-boot-starter,不知道如何自定義starter的可以看我以前寫的《開發(fā)自己的starter》。

    其核心類 AutoRegisterListener 就是在項目啟動時做了兩件事:

    1.將服務信息注冊到Nacos注冊中心

    2.通知ship-admin服務上線了并注冊下線hook。

    代碼如下:

    /**
    ?*?Created?by?2YSP?on?2020/12/21
    ?*/

    public?class?AutoRegisterListener?implements?ApplicationListener<ContextRefreshedEvent>?{

    ????private?final?static?Logger?LOGGER?=?LoggerFactory.getLogger(AutoRegisterListener.class);

    ????private?volatile?AtomicBoolean?registered?=?new?AtomicBoolean(false);

    ????private?final?ClientConfigProperties?properties;

    ????@NacosInjected
    ????private?NamingService?namingService;

    ????@Autowired
    ????private?RequestMappingHandlerMapping?handlerMapping;

    ????private?final?ExecutorService?pool;

    ????/**
    ?????*?url?list?to?ignore
    ?????*/

    ????private?static?List?ignoreUrlList?=?new?LinkedList<>();

    ????static?{
    ????????ignoreUrlList.add("/error");
    ????}

    ????public?AutoRegisterListener(ClientConfigProperties?properties)?{
    ????????if?(!check(properties))?{
    ????????????LOGGER.error("client?config?port,contextPath,appName?adminUrl?and?version?can't?be?empty!");
    ????????????throw?new?ShipException("client?config?port,contextPath,appName?adminUrl?and?version?can't?be?empty!");
    ????????}
    ????????this.properties?=?properties;
    ????????pool?=?new?ThreadPoolExecutor(1,?4,?0,?TimeUnit.SECONDS,?new?LinkedBlockingQueue<>());
    ????}

    ????/**
    ?????*?check?the?ClientConfigProperties
    ?????*
    ?????*?@param?properties
    ?????*?@return
    ?????*/

    ????private?boolean?check(ClientConfigProperties?properties)?{
    ????????if?(properties.getPort()?==?null?||?properties.getContextPath()?==?null
    ????????????????||?properties.getVersion()?==?null?||?properties.getAppName()?==?null
    ????????????????||?properties.getAdminUrl()?==?null)?{
    ????????????return?false;
    ????????}
    ????????return?true;
    ????}


    ????@Override
    ????public?void?onApplicationEvent(ContextRefreshedEvent?event)?{
    ????????if?(!registered.compareAndSet(false,?true))?{
    ????????????return;
    ????????}
    ????????doRegister();
    ????????registerShutDownHook();
    ????}

    ????/**
    ?????*?send?unregister?request?to?admin?when?jvm?shutdown
    ?????*/

    ????private?void?registerShutDownHook()?{
    ????????final?String?url?=?"http://"?+?properties.getAdminUrl()?+?AdminConstants.UNREGISTER_PATH;
    ????????final?UnregisterAppDTO?unregisterAppDTO?=?new?UnregisterAppDTO();
    ????????unregisterAppDTO.setAppName(properties.getAppName());
    ????????unregisterAppDTO.setVersion(properties.getVersion());
    ????????unregisterAppDTO.setIp(IpUtil.getLocalIpAddress());
    ????????unregisterAppDTO.setPort(properties.getPort());
    ????????Runtime.getRuntime().addShutdownHook(new?Thread(()?->?{
    ????????????OkhttpTool.doPost(url,?unregisterAppDTO);
    ????????????LOGGER.info("[{}:{}]?unregister?from?ship-admin?success!",?unregisterAppDTO.getAppName(),?unregisterAppDTO.getVersion());
    ????????}));
    ????}

    ????/**
    ?????*?register?all?interface?info?to?register?center
    ?????*/

    ????private?void?doRegister()?{
    ????????Instance?instance?=?new?Instance();
    ????????instance.setIp(IpUtil.getLocalIpAddress());
    ????????instance.setPort(properties.getPort());
    ????????instance.setEphemeral(true);
    ????????Map?metadataMap?=?new?HashMap<>();
    ????????metadataMap.put("version",?properties.getVersion());
    ????????metadataMap.put("appName",?properties.getAppName());
    ????????instance.setMetadata(metadataMap);
    ????????try?{
    ????????????namingService.registerInstance(properties.getAppName(),?NacosConstants.APP_GROUP_NAME,?instance);
    ????????}?catch?(NacosException?e)?{
    ????????????LOGGER.error("register?to?nacos?fail",?e);
    ????????????throw?new?ShipException(e.getErrCode(),?e.getErrMsg());
    ????????}
    ????????LOGGER.info("register?interface?info?to?nacos?success!");
    ????????//?send?register?request?to?ship-admin
    ????????String?url?=?"http://"?+?properties.getAdminUrl()?+?AdminConstants.REGISTER_PATH;
    ????????RegisterAppDTO?registerAppDTO?=?buildRegisterAppDTO(instance);
    ????????OkhttpTool.doPost(url,?registerAppDTO);
    ????????LOGGER.info("register?to?ship-admin?success!");
    ????}


    ????private?RegisterAppDTO?buildRegisterAppDTO(Instance?instance)?{
    ????????RegisterAppDTO?registerAppDTO?=?new?RegisterAppDTO();
    ????????registerAppDTO.setAppName(properties.getAppName());
    ????????registerAppDTO.setContextPath(properties.getContextPath());
    ????????registerAppDTO.setIp(instance.getIp());
    ????????registerAppDTO.setPort(instance.getPort());
    ????????registerAppDTO.setVersion(properties.getVersion());
    ????????return?registerAppDTO;
    ????}
    }

    3.2 ship-server

    ship-sever項目主要包括了兩個部分內容, 1.請求動態(tài)路由的主流程 2.本地緩存數據和ship-admin及nacos同步,這部分在后面3.3再講。

    ship-server實現(xiàn)動態(tài)路由的原理是利用WebFilter攔截請求,然后將請求教給plugin chain去鏈式處理。

    PluginFilter根據URL解析出appName,然后將啟用的plugin組裝成plugin chain。

    public?class?PluginFilter?implements?WebFilter?{

    ????private?ServerConfigProperties?properties;

    ????public?PluginFilter(ServerConfigProperties?properties)?{
    ????????this.properties?=?properties;
    ????}

    ????@Override
    ????public?Mono?filter(ServerWebExchange?exchange,?WebFilterChain?chain)?{
    ????????String?appName?=?parseAppName(exchange);
    ????????if?(CollectionUtils.isEmpty(ServiceCache.getAllInstances(appName)))?{
    ????????????throw?new?ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
    ????????}
    ????????PluginChain?pluginChain?=?new?PluginChain(properties,?appName);
    ????????pluginChain.addPlugin(new?DynamicRoutePlugin(properties));
    ????????pluginChain.addPlugin(new?AuthPlugin(properties));
    ????????return?pluginChain.execute(exchange,?pluginChain);
    ????}

    ????private?String?parseAppName(ServerWebExchange?exchange)?{
    ????????RequestPath?path?=?exchange.getRequest().getPath();
    ????????String?appName?=?path.value().split("/")[1];
    ????????return?appName;
    ????}
    }

    PluginChain繼承了AbstractShipPlugin并持有所有要執(zhí)行的插件。

    /**
    ?*?@Author:?Ship
    ?*?@Description:
    ?*?@Date:?Created?in?2020/12/25
    ?*/

    public?class?PluginChain?extends?AbstractShipPlugin?{
    ????/**
    ?????*?the?pos?point?to?current?plugin
    ?????*/

    ????private?int?pos;
    ????/**
    ?????*?the?plugins?of?chain
    ?????*/

    ????private?List?plugins;

    ????private?final?String?appName;

    ????public?PluginChain(ServerConfigProperties?properties,?String?appName)?{
    ????????super(properties);
    ????????this.appName?=?appName;
    ????}

    ????/**
    ?????*?add?enabled?plugin?to?chain
    ?????*
    ?????*?@param?shipPlugin
    ?????*/

    ????public?void?addPlugin(ShipPlugin?shipPlugin)?{
    ????????if?(plugins?==?null)?{
    ????????????plugins?=?new?ArrayList<>();
    ????????}
    ????????if?(!PluginCache.isEnabled(appName,?shipPlugin.name()))?{
    ????????????return;
    ????????}
    ????????plugins.add(shipPlugin);
    ????????//?order?by?the?plugin's?order
    ????????plugins.sort(Comparator.comparing(ShipPlugin::order));
    ????}

    ????@Override
    ????public?Integer?order()?{
    ????????return?null;
    ????}

    ????@Override
    ????public?String?name()?{
    ????????return?null;
    ????}

    ????@Override
    ????public?Mono?execute(ServerWebExchange?exchange,?PluginChain?pluginChain)?{
    ????????if?(pos?==?plugins.size())?{
    ????????????return?exchange.getResponse().setComplete();
    ????????}
    ????????return?pluginChain.plugins.get(pos++).execute(exchange,?pluginChain);
    ????}

    ????public?String?getAppName()?{
    ????????return?appName;
    ????}

    }

    AbstractShipPlugin實現(xiàn)了ShipPlugin接口,并持有ServerConfigProperties配置對象。

    public?abstract?class?AbstractShipPlugin?implements?ShipPlugin?{

    ????protected?ServerConfigProperties?properties;

    ????public?AbstractShipPlugin(ServerConfigProperties?properties)?{
    ????????this.properties?=?properties;
    ????}
    }

    ShipPlugin接口定義了所有插件必須實現(xiàn)的三個方法order(),name()和execute()。

    public?interface?ShipPlugin?{
    ????/**
    ?????*?lower?values?have?higher?priority
    ?????*
    ?????*?@return
    ?????*/

    ????Integer?order();

    ????/**
    ?????*?return?current?plugin?name
    ?????*
    ?????*?@return
    ?????*/

    ????String?name();

    ????Mono?execute(ServerWebExchange?exchange,PluginChain?pluginChain);

    }

    DynamicRoutePlugin繼承了抽象類AbstractShipPlugin,包含了動態(tài)路由的主要業(yè)務邏輯。

    /**
    ?*?@Author:?Ship
    ?*?@Description:
    ?*?@Date:?Created?in?2020/12/25
    ?*/

    public?class?DynamicRoutePlugin?extends?AbstractShipPlugin?{

    ????private?final?static?Logger?LOGGER?=?LoggerFactory.getLogger(DynamicRoutePlugin.class);

    ????private?static?WebClient?webClient;

    ????private?static?final?Gson?gson?=?new?GsonBuilder().create();

    ????static?{
    ????????HttpClient?httpClient?=?HttpClient.create()
    ????????????????.tcpConfiguration(client?->
    ????????????????????????client.doOnConnected(conn?->
    ????????????????????????????????conn.addHandlerLast(new?ReadTimeoutHandler(3))
    ????????????????????????????????????????.addHandlerLast(new?WriteTimeoutHandler(3)))
    ????????????????????????????????.option(ChannelOption.TCP_NODELAY,?true)
    ????????????????);
    ????????webClient?=?WebClient.builder().clientConnector(new?ReactorClientHttpConnector(httpClient))
    ????????????????.build();
    ????}

    ????public?DynamicRoutePlugin(ServerConfigProperties?properties)?{
    ????????super(properties);
    ????}

    ????@Override
    ????public?Integer?order()?{
    ????????return?ShipPluginEnum.DYNAMIC_ROUTE.getOrder();
    ????}

    ????@Override
    ????public?String?name()?{
    ????????return?ShipPluginEnum.DYNAMIC_ROUTE.getName();
    ????}

    ????@Override
    ????public?Mono?execute(ServerWebExchange?exchange,?PluginChain?pluginChain)?{
    ????????String?appName?=?pluginChain.getAppName();
    ????????ServiceInstance?serviceInstance?=?chooseInstance(appName,?exchange.getRequest());
    //????????LOGGER.info("selected?instance?is?[{}]",?gson.toJson(serviceInstance));
    ????????//?request?service
    ????????String?url?=?buildUrl(exchange,?serviceInstance);
    ????????return?forward(exchange,?url);
    ????}

    ????/**
    ?????*?forward?request?to?backend?service
    ?????*
    ?????*?@param?exchange
    ?????*?@param?url
    ?????*?@return
    ?????*/

    ????private?Mono?forward(ServerWebExchange?exchange,?String?url)?{
    ????????ServerHttpRequest?request?=?exchange.getRequest();
    ????????ServerHttpResponse?response?=?exchange.getResponse();
    ????????HttpMethod?method?=?request.getMethod();

    ????????WebClient.RequestBodySpec?requestBodySpec?=?webClient.method(method).uri(url).headers((headers)?->?{
    ????????????headers.addAll(request.getHeaders());
    ????????});

    ????????WebClient.RequestHeadersSpec?reqHeadersSpec;
    ????????if?(requireHttpBody(method))?{
    ????????????reqHeadersSpec?=?requestBodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));
    ????????}?else?{
    ????????????reqHeadersSpec?=?requestBodySpec;
    ????????}
    ????????//?nio->callback->nio
    ????????return?reqHeadersSpec.exchange().timeout(Duration.ofMillis(properties.getTimeOutMillis()))
    ????????????????.onErrorResume(ex?->?{
    ????????????????????return?Mono.defer(()?->?{
    ????????????????????????String?errorResultJson?=?"";
    ????????????????????????if?(ex?instanceof?TimeoutException)?{
    ????????????????????????????errorResultJson?=?"{\"code\":5001,\"message\":\"network?timeout\"}";
    ????????????????????????}?else?{
    ????????????????????????????errorResultJson?=?"{\"code\":5000,\"message\":\"system?error\"}";
    ????????????????????????}
    ????????????????????????return?ShipResponseUtil.doResponse(exchange,?errorResultJson);
    ????????????????????}).then(Mono.empty());
    ????????????????}).flatMap(backendResponse?->?{
    ????????????????????response.setStatusCode(backendResponse.statusCode());
    ????????????????????response.getHeaders().putAll(backendResponse.headers().asHttpHeaders());
    ????????????????????return?response.writeWith(backendResponse.bodyToFlux(DataBuffer.class));
    ????????????????});
    ????}

    ????/**
    ?????*?weather?the?http?method?need?http?body
    ?????*
    ?????*?@param?method
    ?????*?@return
    ?????*/

    ????private?boolean?requireHttpBody(HttpMethod?method)?{
    ????????if?(method.equals(HttpMethod.POST)?||?method.equals(HttpMethod.PUT)?||?method.equals(HttpMethod.PATCH))?{
    ????????????return?true;
    ????????}
    ????????return?false;
    ????}

    ????private?String?buildUrl(ServerWebExchange?exchange,?ServiceInstance?serviceInstance)?{
    ????????ServerHttpRequest?request?=?exchange.getRequest();
    ????????String?query?=?request.getURI().getQuery();
    ????????String?path?=?request.getPath().value().replaceFirst("/"?+?serviceInstance.getAppName(),?"");
    ????????String?url?=?"http://"?+?serviceInstance.getIp()?+?":"?+?serviceInstance.getPort()?+?path;
    ????????if?(!StringUtils.isEmpty(query))?{
    ????????????url?=?url?+?"?"?+?query;
    ????????}
    ????????return?url;
    ????}


    ????/**
    ?????*?choose?an?ServiceInstance?according?to?route?rule?config?and?load?balancing?algorithm
    ?????*
    ?????*?@param?appName
    ?????*?@param?request
    ?????*?@return
    ?????*/

    ????private?ServiceInstance?chooseInstance(String?appName,?ServerHttpRequest?request)?{
    ????????List?serviceInstances?=?ServiceCache.getAllInstances(appName);
    ????????if?(CollectionUtils.isEmpty(serviceInstances))?{
    ????????????LOGGER.error("service?instance?of?{}?not?find",?appName);
    ????????????throw?new?ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
    ????????}
    ????????String?version?=?matchAppVersion(appName,?request);
    ????????if?(StringUtils.isEmpty(version))?{
    ????????????throw?new?ShipException("match?app?version?error");
    ????????}
    ????????//?filter?serviceInstances?by?version
    ????????List?instances?=?serviceInstances.stream().filter(i?->?i.getVersion().equals(version)).collect(Collectors.toList());
    ????????//Select?an?instance?based?on?the?load?balancing?algorithm
    ????????LoadBalance?loadBalance?=?LoadBalanceFactory.getInstance(properties.getLoadBalance(),?appName,?version);
    ????????ServiceInstance?serviceInstance?=?loadBalance.chooseOne(instances);
    ????????return?serviceInstance;
    ????}


    ????private?String?matchAppVersion(String?appName,?ServerHttpRequest?request)?{
    ????????List?rules?=?RouteRuleCache.getRules(appName);
    ????????rules.sort(Comparator.comparing(AppRuleDTO::getPriority).reversed());
    ????????for?(AppRuleDTO?rule?:?rules)?{
    ????????????if?(match(rule,?request))?{
    ????????????????return?rule.getVersion();
    ????????????}
    ????????}
    ????????return?null;
    ????}


    ????private?boolean?match(AppRuleDTO?rule,?ServerHttpRequest?request)?{
    ????????String?matchObject?=?rule.getMatchObject();
    ????????String?matchKey?=?rule.getMatchKey();
    ????????String?matchRule?=?rule.getMatchRule();
    ????????Byte?matchMethod?=?rule.getMatchMethod();
    ????????if?(MatchObjectEnum.DEFAULT.getCode().equals(matchObject))?{
    ????????????return?true;
    ????????}?else?if?(MatchObjectEnum.QUERY.getCode().equals(matchObject))?{
    ????????????String?param?=?request.getQueryParams().getFirst(matchKey);
    ????????????if?(!StringUtils.isEmpty(param))?{
    ????????????????return?StringTools.match(param,?matchMethod,?matchRule);
    ????????????}
    ????????}?else?if?(MatchObjectEnum.HEADER.getCode().equals(matchObject))?{
    ????????????HttpHeaders?headers?=?request.getHeaders();
    ????????????String?headerValue?=?headers.getFirst(matchKey);
    ????????????if?(!StringUtils.isEmpty(headerValue))?{
    ????????????????return?StringTools.match(headerValue,?matchMethod,?matchRule);
    ????????????}
    ????????}
    ????????return?false;
    ????}

    }

    3.3 數據同步

    app數據同步

    后臺服務(如訂單服務)啟動時,只將服務名,版本,ip地址和端口號注冊到了Nacos,并沒有實例的權重和啟用的插件信息怎么辦?

    一般在線的實例權重和插件列表都是在管理界面配置,然后動態(tài)生效的,所以需要ship-admin定時更新實例的權重和插件信息到注冊中心。

    對應代碼ship-admin的NacosSyncListener

    /**
    ?*?@Author:?Ship
    ?*?@Description:
    ?*?@Date:?Created?in?2020/12/30
    ?*/

    @Configuration
    public?class?NacosSyncListener?implements?ApplicationListener<ContextRefreshedEvent>?{

    ????private?static?final?Logger?LOGGER?=?LoggerFactory.getLogger(NacosSyncListener.class);

    ????private?static?ScheduledThreadPoolExecutor?scheduledPool?=?new?ScheduledThreadPoolExecutor(1,
    ????????????new?ShipThreadFactory("nacos-sync",?true).create());

    ????@NacosInjected
    ????private?NamingService?namingService;

    ????@Value("${nacos.discovery.server-addr}")
    ????private?String?baseUrl;

    ????@Resource
    ????private?AppService?appService;

    ????@Override
    ????public?void?onApplicationEvent(ContextRefreshedEvent?event)?{
    ????????if?(event.getApplicationContext().getParent()?!=?null)?{
    ????????????return;
    ????????}
    ????????String?url?=?"http://"?+?baseUrl?+?NacosConstants.INSTANCE_UPDATE_PATH;
    ????????scheduledPool.scheduleWithFixedDelay(new?NacosSyncTask(namingService,?url,?appService),?0,?30L,?TimeUnit.SECONDS);
    ????}

    ????class?NacosSyncTask?implements?Runnable?{

    ????????private?NamingService?namingService;

    ????????private?String?url;

    ????????private?AppService?appService;

    ????????private?Gson?gson?=?new?GsonBuilder().create();

    ????????public?NacosSyncTask(NamingService?namingService,?String?url,?AppService?appService)?{
    ????????????this.namingService?=?namingService;
    ????????????this.url?=?url;
    ????????????this.appService?=?appService;
    ????????}

    ????????/**
    ?????????*?Regular?update?weight,enabled?plugins?to?nacos?instance
    ?????????*/

    ????????@Override
    ????????public?void?run()?{
    ????????????try?{
    ????????????????//?get?all?app?names
    ????????????????ListView?services?=?namingService.getServicesOfServer(1,?Integer.MAX_VALUE,?NacosConstants.APP_GROUP_NAME);
    ????????????????if?(CollectionUtils.isEmpty(services.getData()))?{
    ????????????????????return;
    ????????????????}
    ????????????????List?appNames?=?services.getData();
    ????????????????List?appInfos?=?appService.getAppInfos(appNames);
    ????????????????for?(AppInfoDTO?appInfo?:?appInfos)?{
    ????????????????????if?(CollectionUtils.isEmpty(appInfo.getInstances()))?{
    ????????????????????????continue;
    ????????????????????}
    ????????????????????for?(ServiceInstance?instance?:?appInfo.getInstances())?{
    ????????????????????????Map?queryMap?=?buildQueryMap(appInfo,?instance);
    ????????????????????????String?resp?=?OkhttpTool.doPut(url,?queryMap,?"");
    ????????????????????????LOGGER.debug("response?:{}",?resp);
    ????????????????????}
    ????????????????}

    ????????????}?catch?(Exception?e)?{
    ????????????????LOGGER.error("nacos?sync?task?error",?e);
    ????????????}
    ????????}

    ????????private?Map?buildQueryMap(AppInfoDTO?appInfo,?ServiceInstance?instance)?{
    ????????????Map?map?=?new?HashMap<>();
    ????????????map.put("serviceName",?appInfo.getAppName());
    ????????????map.put("groupName",?NacosConstants.APP_GROUP_NAME);
    ????????????map.put("ip",?instance.getIp());
    ????????????map.put("port",?instance.getPort());
    ????????????map.put("weight",?instance.getWeight().doubleValue());
    ????????????NacosMetadata?metadata?=?new?NacosMetadata();
    ????????????metadata.setAppName(appInfo.getAppName());
    ????????????metadata.setVersion(instance.getVersion());
    ????????????metadata.setPlugins(String.join(",",?appInfo.getEnabledPlugins()));
    ????????????map.put("metadata",?StringTools.urlEncode(gson.toJson(metadata)));
    ????????????map.put("ephemeral",?true);
    ????????????return?map;
    ????????}
    ????}
    }

    ship-server再定時從Nacos拉取app數據更新到本地Map緩存。

    /**
    ?*?@Author:?Ship
    ?*?@Description:?sync?data?to?local?cache
    ?*?@Date:?Created?in?2020/12/25
    ?*/

    @Configuration
    public?class?DataSyncTaskListener?implements?ApplicationListener<ContextRefreshedEvent>?{

    ????private?static?ScheduledThreadPoolExecutor?scheduledPool?=?new?ScheduledThreadPoolExecutor(1,
    ????????????new?ShipThreadFactory("service-sync",?true).create());

    ????@NacosInjected
    ????private?NamingService?namingService;

    ????@Autowired
    ????private?ServerConfigProperties?properties;

    ????@Override
    ????public?void?onApplicationEvent(ContextRefreshedEvent?event)?{
    ????????if?(event.getApplicationContext().getParent()?!=?null)?{
    ????????????return;
    ????????}
    ????????scheduledPool.scheduleWithFixedDelay(new?DataSyncTask(namingService)
    ????????????????,?0L,?properties.getCacheRefreshInterval(),?TimeUnit.SECONDS);
    ????????WebsocketSyncCacheServer?websocketSyncCacheServer?=?new?WebsocketSyncCacheServer(properties.getWebSocketPort());
    ????????websocketSyncCacheServer.start();
    ????}


    ????class?DataSyncTask?implements?Runnable?{

    ????????private?NamingService?namingService;

    ????????public?DataSyncTask(NamingService?namingService)?{
    ????????????this.namingService?=?namingService;
    ????????}

    ????????@Override
    ????????public?void?run()?{
    ????????????try?{
    ????????????????//?get?all?app?names
    ????????????????ListView?services?=?namingService.getServicesOfServer(1,?Integer.MAX_VALUE,?NacosConstants.APP_GROUP_NAME);
    ????????????????if?(CollectionUtils.isEmpty(services.getData()))?{
    ????????????????????return;
    ????????????????}
    ????????????????List?appNames?=?services.getData();
    ????????????????//?get?all?instances
    ????????????????for?(String?appName?:?appNames)?{
    ????????????????????List?instanceList?=?namingService.getAllInstances(appName,?NacosConstants.APP_GROUP_NAME);
    ????????????????????if?(CollectionUtils.isEmpty(instanceList))?{
    ????????????????????????continue;
    ????????????????????}
    ????????????????????ServiceCache.add(appName,?buildServiceInstances(instanceList));
    ????????????????????List?pluginNames?=?getEnabledPlugins(instanceList);
    ????????????????????PluginCache.add(appName,?pluginNames);
    ????????????????}
    ????????????????ServiceCache.removeExpired(appNames);
    ????????????????PluginCache.removeExpired(appNames);

    ????????????}?catch?(NacosException?e)?{
    ????????????????e.printStackTrace();
    ????????????}
    ????????}

    ????????private?List?getEnabledPlugins(List?instanceList)?{
    ????????????Instance?instance?=?instanceList.get(0);
    ????????????Map?metadata?=?instance.getMetadata();
    ????????????//?plugins:?DynamicRoute,Auth
    ????????????String?plugins?=?metadata.getOrDefault("plugins",?ShipPluginEnum.DYNAMIC_ROUTE.getName());
    ????????????return?Arrays.stream(plugins.split(",")).collect(Collectors.toList());
    ????????}

    ????????private?List?buildServiceInstances(List?instanceList)?{
    ????????????List?list?=?new?LinkedList<>();
    ????????????instanceList.forEach(instance?->?{
    ????????????????Map?metadata?=?instance.getMetadata();
    ????????????????ServiceInstance?serviceInstance?=?new?ServiceInstance();
    ????????????????serviceInstance.setAppName(metadata.get("appName"));
    ????????????????serviceInstance.setIp(instance.getIp());
    ????????????????serviceInstance.setPort(instance.getPort());
    ????????????????serviceInstance.setVersion(metadata.get("version"));
    ????????????????serviceInstance.setWeight((int)?instance.getWeight());
    ????????????????list.add(serviceInstance);
    ????????????});
    ????????????return?list;
    ????????}
    ????}
    }

    路由規(guī)則數據同步

    同時,如果用戶在管理后臺更新了路由規(guī)則,ship-admin需要推送規(guī)則數據到ship-server,這里參考了soul網關的做法利用websocket在第一次建立連接后進行全量同步,此后路由規(guī)則發(fā)生變更就只作增量同步。

    服務端WebsocketSyncCacheServer:

    /**
    ?*?@Author:?Ship
    ?*?@Description:
    ?*?@Date:?Created?in?2020/12/28
    ?*/

    public?class?WebsocketSyncCacheServer?extends?WebSocketServer?{

    ????private?final?static?Logger?LOGGER?=?LoggerFactory.getLogger(WebsocketSyncCacheServer.class);

    ????private?Gson?gson?=?new?GsonBuilder().create();

    ????private?MessageHandler?messageHandler;

    ????public?WebsocketSyncCacheServer(Integer?port)?{
    ????????super(new?InetSocketAddress(port));
    ????????this.messageHandler?=?new?MessageHandler();
    ????}


    ????@Override
    ????public?void?onOpen(WebSocket?webSocket,?ClientHandshake?clientHandshake)?{
    ????????LOGGER.info("server?is?open");
    ????}

    ????@Override
    ????public?void?onClose(WebSocket?webSocket,?int?i,?String?s,?boolean?b)?{
    ????????LOGGER.info("websocket?server?close...");
    ????}

    ????@Override
    ????public?void?onMessage(WebSocket?webSocket,?String?message)?{
    ????????LOGGER.info("websocket?server?receive?message:\n[{}]",?message);
    ????????this.messageHandler.handler(message);
    ????}

    ????@Override
    ????public?void?onError(WebSocket?webSocket,?Exception?e)?{

    ????}

    ????@Override
    ????public?void?onStart()?{
    ????????LOGGER.info("websocket?server?start...");
    ????}


    ????class?MessageHandler?{

    ????????public?void?handler(String?message)?{
    ????????????RouteRuleOperationDTO?operationDTO?=?gson.fromJson(message,?RouteRuleOperationDTO.class);
    ????????????if?(CollectionUtils.isEmpty(operationDTO.getRuleList()))?{
    ????????????????return;
    ????????????}
    ????????????Map>?map?=?operationDTO.getRuleList()
    ????????????????????.stream().collect(Collectors.groupingBy(AppRuleDTO::getAppName));
    ????????????if?(OperationTypeEnum.INSERT.getCode().equals(operationDTO.getOperationType())
    ????????????????????||?OperationTypeEnum.UPDATE.getCode().equals(operationDTO.getOperationType()))?{
    ????????????????RouteRuleCache.add(map);
    ????????????}?else?if?(OperationTypeEnum.DELETE.getCode().equals(operationDTO.getOperationType()))?{
    ????????????????RouteRuleCache.remove(map);
    ????????????}
    ????????}
    ????}
    }

    客戶端WebsocketSyncCacheClient:

    /**
    ?*?@Author:?Ship
    ?*?@Description:
    ?*?@Date:?Created?in?2020/12/28
    ?*/

    @Component
    public?class?WebsocketSyncCacheClient?{

    ????private?final?static?Logger?LOGGER?=?LoggerFactory.getLogger(WebsocketSyncCacheClient.class);

    ????private?WebSocketClient?client;

    ????private?RuleService?ruleService;

    ????private?Gson?gson?=?new?GsonBuilder().create();

    ????public?WebsocketSyncCacheClient(@Value("${ship.server-web-socket-url}")?String?serverWebSocketUrl,
    ????????????????????????????????????RuleService?ruleService)?
    {
    ????????if?(StringUtils.isEmpty(serverWebSocketUrl))?{
    ????????????throw?new?ShipException(ShipExceptionEnum.CONFIG_ERROR);
    ????????}
    ????????this.ruleService?=?ruleService;
    ????????ScheduledThreadPoolExecutor?executor?=?new?ScheduledThreadPoolExecutor(1,
    ????????????????new?ShipThreadFactory("websocket-connect",?true).create());
    ????????try?{
    ????????????client?=?new?WebSocketClient(new?URI(serverWebSocketUrl))?{
    ????????????????@Override
    ????????????????public?void?onOpen(ServerHandshake?serverHandshake)?{
    ????????????????????LOGGER.info("client?is?open");
    ????????????????????List?list?=?ruleService.getEnabledRule();
    ????????????????????String?msg?=?gson.toJson(new?RouteRuleOperationDTO(OperationTypeEnum.INSERT,?list));
    ????????????????????send(msg);
    ????????????????}

    ????????????????@Override
    ????????????????public?void?onMessage(String?s)?{
    ????????????????}

    ????????????????@Override
    ????????????????public?void?onClose(int?i,?String?s,?boolean?b)?{
    ????????????????}

    ????????????????@Override
    ????????????????public?void?onError(Exception?e)?{
    ????????????????????LOGGER.error("websocket?client?error",?e);
    ????????????????}
    ????????????};

    ????????????client.connectBlocking();
    ????????????//使用調度線程池進行斷線重連,30秒進行一次
    ????????????executor.scheduleAtFixedRate(()?->?{
    ????????????????if?(client?!=?null?&&?client.isClosed())?{
    ????????????????????try?{
    ????????????????????????client.reconnectBlocking();
    ????????????????????}?catch?(InterruptedException?e)?{
    ????????????????????????LOGGER.error("reconnect?server?fail",?e);
    ????????????????????}
    ????????????????}
    ????????????},?10,?30,?TimeUnit.SECONDS);

    ????????}?catch?(Exception?e)?{
    ????????????LOGGER.error("websocket?sync?cache?exception",?e);
    ????????????throw?new?ShipException(e.getMessage());
    ????????}
    ????}

    ????public??void?send(T?t)?{
    ????????while?(!client.getReadyState().equals(ReadyState.OPEN))?{
    ????????????LOGGER.debug("connecting?...please?wait");
    ????????}
    ????????client.send(gson.toJson(t));
    ????}
    }

    四、測試

    4.1動態(tài)路由測試

    1. 本地啟動nacos ,sh startup.sh -m standalone

    2. 啟動ship-admin

    3. 本地啟動兩個ship-example實例。

      實例1配置:

      ship:
      ??http:
      ????app-name:?order
      ????version:?gray_1.0
      ????context-path:?/order
      ????port:?8081
      ????admin-url:?127.0.0.1:9001

      server:
      ??port:?8081

      nacos:
      ??discovery:
      ????server-addr:?127.0.0.1:8848

      實例2配置:

      ship:
      ??http:
      ????app-name:?order
      ????version:?prod_1.0
      ????context-path:?/order
      ????port:?8082
      ????admin-url:?127.0.0.1:9001

      server:
      ??port:?8082

      nacos:
      ??discovery:
      ????server-addr:?127.0.0.1:8848
    4. 在數據庫添加路由規(guī)則配置,該規(guī)則表示當http header 中的name=ship時請求路由到gray_1.0版本的節(jié)點。

    圖片
    1. 啟動ship-server,看到以下日志時則可以進行測試了。

      2021-01-02?19:57:09.159??INFO?30413?---?[SocketWorker-29]?cn.sp.sync.WebsocketSyncCacheServer??????:?websocket?server?receive?message:
      [{"operationType":"INSERT","ruleList":[{"id":1,"appId":5,"appName":"order","version":"gray_1.0","matchObject":"HEADER","matchKey":"name","matchMethod":1,"matchRule":"ship","priority":50}]}]
    2. 用Postman請求http://localhost:9000/order/user/add,POST方式,header設置name=ship,可以看到只有實例1有日志顯示。

      ==========add?user,version:gray_1.0

    4.2性能壓測

    壓測環(huán)境:

    MacBook Pro 13英寸

    處理器 2.3 GHz 四核Intel Core i7

    內存 16 GB 3733 MHz LPDDR4X

    后端節(jié)點個數一個

    壓測工具:wrk

    壓測結果:20個線程,500個連接數,吞吐量大概每秒9400個請求。

    壓測結果

    五、總結

    千里之行始于足下,開始以為寫一個網關會很難,但當你實際開始行動時就會發(fā)現(xiàn)其實沒那么難,所以邁出第一步很重要。過程中也遇到了很多問題,還在github上給soul和nacos這兩個開源項目提了兩個issue,后來發(fā)現(xiàn)是自己的問題,尷尬??。

    END


    有熱門推薦??

    1.?再見,MySQL!性能被 MariaDB 吊打 ?

    2.?淘寶開源代碼質量檢測工具!

    3.?多線程場景下使用 ArrayList,這幾點一定要注意!

    4.?MyBatis 的執(zhí)行流程,寫得太好了叭!

    最近面試BAT,整理一份面試資料Java面試BATJ通關手冊,覆蓋了Java核心技術、JVM、Java并發(fā)、SSM、微服務、數據庫、數據結構等等。

    獲取方式:點“在看”,關注公眾號并回復?Java?領取,更多內容陸續(xù)奉上。

    文章有幫助的話,在看,轉發(fā)吧。

    謝謝支持喲 (*^__^*)

    瀏覽 46
    點贊
    評論
    收藏
    分享

    手機掃一掃分享

    分享
    舉報
    評論
    圖片
    表情
    推薦
    點贊
    評論
    收藏
    分享

    手機掃一掃分享

    分享
    舉報

    <kbd id="5sdj3"></kbd>
    <th id="5sdj3"></th>

  • <dd id="5sdj3"><form id="5sdj3"></form></dd>
    <td id="5sdj3"><form id="5sdj3"><big id="5sdj3"></big></form></td><del id="5sdj3"></del>

  • <dd id="5sdj3"></dd>
    <dfn id="5sdj3"></dfn>
  • <th id="5sdj3"></th>
    <tfoot id="5sdj3"><menuitem id="5sdj3"></menuitem></tfoot>

  • <td id="5sdj3"><form id="5sdj3"><menu id="5sdj3"></menu></form></td>
  • <kbd id="5sdj3"><form id="5sdj3"></form></kbd>
    国语对白视频免费观看 | 亚洲日韩一级精品片在线播放 | 艹艹艹视频 | 日本黄色大片日本美女 | 久久这里只有精品99 |