博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
(二十二)java版spring cloud+spring boot+redis多租户社交电子商务平台-Ribbon设计原理...
阅读量:7023 次
发布时间:2019-06-28

本文共 11408 字,大约阅读时间需要 38 分钟。

电子商务平台源码请加企鹅求求:一零三八七七四六二六。Spring Cloud集成模式下的Ribbon有以下几个特征:

1.Ribbon 服务配置方式

每一个服务配置都有一个Spring ApplicationContext上下文,用于加载各自服务的实例。

  1. 和Feign的集成模式

在使用Feign作为客户端时,最终请求会转发成 http://<服务名称>/的格式,通过LoadBalancerFeignClient, 提取出服务标识<服务名称>,然后根据服务名称在上下文中查找对应服务的负载均衡器FeignLoadBalancer,负载均衡器负责根据既有的服务实例的统计信息,挑选出最合适的服务实例。

二、Spring Cloud模式下和Feign的集成实现方式

和Feign结合的场景下,Feign的调用会被包装成调用请求LoadBalancerCommand,然后底层通过Rxjava基于事件的编码风格,发送请求;Spring Cloud框架通过 Feigin 请求的URL,提取出服务名称,然后在上下文中找到对应服务的的负载均衡器实现FeignLoadBalancer,然后通过负载均衡器中挑选一个合适的Server实例,然后将调用请求转发到该Server实例上,完成调用,在此过程中,记录对应Server实例的调用统计信息。

/**     * Create an {@link Observable} that once subscribed execute network call asynchronously with a server chosen by load balancer.     * If there are any errors that are indicated as retriable by the {@link RetryHandler}, they will be consumed internally by the     * function and will not be observed by the {@link Observer} subscribed to the returned {@link Observable}. If number of retries has     * exceeds the maximal allowed, a final error will be emitted by the returned {@link Observable}. Otherwise, the first successful     * result during execution and retries will be emitted.     */    public Observable
submit(final ServerOperation
operation) { final ExecutionInfoContext context = new ExecutionInfoContext(); if (listenerInvoker != null) { try { listenerInvoker.onExecutionStart(); } catch (AbortExecutionException e) { return Observable.error(e); } } // 同一Server最大尝试次数 final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer(); //下一Server最大尝试次数 final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer(); // Use the load balancer // 使用负载均衡器,挑选出合适的Server,然后执行Server请求,将请求的数据和行为整合到ServerStats中 Observable
o = (server == null ? selectServer() : Observable.just(server)) .concatMap(new Func1
>() { @Override // Called for each server being selected public Observable
call(Server server) { // 获取Server的统计值 context.setServer(server); final ServerStats stats = loadBalancerContext.getServerStats(server); // Called for each attempt and retry 服务调用 Observable
o = Observable .just(server) .concatMap(new Func1
>() { @Override public Observable
call(final Server server) { context.incAttemptCount();//重试计数 loadBalancerContext.noteOpenConnection(stats);//链接统计 if (listenerInvoker != null) { try { listenerInvoker.onStartWithServer(context.toExecutionInfo()); } catch (AbortExecutionException e) { return Observable.error(e); } } //执行监控器,记录执行时间 final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start(); //找到合适的server后,开始执行请求 //底层调用有结果后,做消息处理 return operation.call(server).doOnEach(new Observer
() { private T entity; @Override public void onCompleted() { recordStats(tracer, stats, entity, null); // 记录统计信息 } @Override public void onError(Throwable e) { recordStats(tracer, stats, null, e);//记录异常信息 logger.debug("Got error {} when executed on server {}", e, server); if (listenerInvoker != null) { listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo()); } } @Override public void onNext(T entity) { this.entity = entity;//返回结果值 if (listenerInvoker != null) { listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo()); } } private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) { tracer.stop();//结束计时 //标记请求结束,更新统计信息 loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler); } }); } }); //如果失败,根据重试策略触发重试逻辑 // 使用observable 做重试逻辑,根据predicate 做逻辑判断,这里做 if (maxRetrysSame > 0) o = o.retry(retryPolicy(maxRetrysSame, true)); return o; } }); // next请求处理,基于重试器操作 if (maxRetrysNext > 0 && server == null) o = o.retry(retryPolicy(maxRetrysNext, false)); return o.onErrorResumeNext(new Func1
>() { @Override public Observable
call(Throwable e) { if (context.getAttemptCount() > 0) { if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) { e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED, "Number of retries on next server exceeded max " + maxRetrysNext + " retries, while making a call for: " + context.getServer(), e); } else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) { e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED, "Number of retries exceeded max " + maxRetrysSame + " retries, while making a call for: " + context.getServer(), e); } } if (listenerInvoker != null) { listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo()); } return Observable.error(e); } }); }复制代码

从一组ServerList 列表中挑选合适的Server

/**     * Compute the final URI from a partial URI in the request. The following steps are performed:     * 
    *
  • 如果host尚未指定,则从负载均衡器中选定 host/port *
  • 如果host 尚未指定并且尚未找到负载均衡器,则尝试从 虚拟地址中确定host/port *
  • 如果指定了HOST,并且URI的授权部分通过虚拟地址设置,并且存在负载均衡器,则通过负载就均衡器中确定host/port(指定的HOST将会被忽略) *
  • 如果host已指定,但是尚未指定负载均衡器和虚拟地址配置,则使用真实地址作为host *
  • if host is missing but none of the above applies, throws ClientException *
* * @param original Original URI passed from caller */ public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException { String host = null; int port = -1; if (original != null) { host = original.getHost(); } if (original != null) { Pair
schemeAndPort = deriveSchemeAndPortFromPartialUri(original); port = schemeAndPort.second(); } // Various Supported Cases // The loadbalancer to use and the instances it has is based on how it was registered // In each of these cases, the client might come in using Full Url or Partial URL ILoadBalancer lb = getLoadBalancer(); if (host == null) { // 提供部分URI,缺少HOST情况下 // well we have to just get the right instances from lb - or we fall back if (lb != null){ Server svc = lb.chooseServer(loadBalancerKey);// 使用负载均衡器选择Server if (svc == null){ throw new ClientException(ClientException.ErrorType.GENERAL, "Load balancer does not have available server for client: " + clientName); } //通过负载均衡器选择的结果中选择host host = svc.getHost(); if (host == null){ throw new ClientException(ClientException.ErrorType.GENERAL, "Invalid Server for :" + svc); } logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original}); return svc; } else { // No Full URL - and we dont have a LoadBalancer registered to // obtain a server // if we have a vipAddress that came with the registration, we // can use that else we // bail out // 通过虚拟地址配置解析出host配置返回 if (vipAddresses != null && vipAddresses.contains(",")) { throw new ClientException( ClientException.ErrorType.GENERAL, "Method is invoked for client " + clientName + " with partial URI of (" + original + ") with no load balancer configured." + " Also, there are multiple vipAddresses and hence no vip address can be chosen" + " to complete this partial uri"); } else if (vipAddresses != null) { try { Pair
hostAndPort = deriveHostAndPortFromVipAddress(vipAddresses); host = hostAndPort.first(); port = hostAndPort.second(); } catch (URISyntaxException e) { throw new ClientException( ClientException.ErrorType.GENERAL, "Method is invoked for client " + clientName + " with partial URI of (" + original + ") with no load balancer configured. " + " Also, the configured/registered vipAddress is unparseable (to determine host and port)"); } } else { throw new ClientException( ClientException.ErrorType.GENERAL, this.clientName + " has no LoadBalancer registered and passed in a partial URL request (with no host:port)." + " Also has no vipAddress registered"); } } } else { // Full URL Case URL中指定了全地址,可能是虚拟地址或者是hostAndPort // This could either be a vipAddress or a hostAndPort or a real DNS // if vipAddress or hostAndPort, we just have to consult the loadbalancer // but if it does not return a server, we should just proceed anyways // and assume its a DNS // For restClients registered using a vipAddress AND executing a request // by passing in the full URL (including host and port), we should only // consult lb IFF the URL passed is registered as vipAddress in Discovery boolean shouldInterpretAsVip = false; if (lb != null) { shouldInterpretAsVip = isVipRecognized(original.getAuthority()); } if (shouldInterpretAsVip) { Server svc = lb.chooseServer(loadBalancerKey); if (svc != null){ host = svc.getHost(); if (host == null){ throw new ClientException(ClientException.ErrorType.GENERAL, "Invalid Server for :" + svc); } logger.debug("using LB returned Server: {} for request: {}", svc, original); return svc; } else { // just fall back as real DNS logger.debug("{}:{} assumed to be a valid VIP address or exists in the DNS", host, port); } } else { // consult LB to obtain vipAddress backed instance given full URL //Full URL execute request - where url!=vipAddress logger.debug("Using full URL passed in by caller (not using load balancer): {}", original); } } // end of creating final URL if (host == null){ throw new ClientException(ClientException.ErrorType.GENERAL,"Request contains no HOST to talk to"); } // just verify that at this point we have a full URL return new Server(host, port); }复制代码

转载于:https://juejin.im/post/5cef404a518825554e1f1feb

你可能感兴趣的文章
JS删除数组条目中重复的条目
查看>>
jQuery数组处理详解(转)
查看>>
hdu1412
查看>>
后仿真笔记 - ise 联合 modelsim
查看>>
python @property
查看>>
XCOJ 1168 (搜索+期望+高斯消元法)
查看>>
紫书 例题11-9 UVa 1658 (拆点+最小费用流)
查看>>
【天池大数据赛题解析】资金流入流出预测(附Top4答辩ppt)
查看>>
广告点击率预测 [离线部分]
查看>>
CodeForces 659F Polycarp and Hay
查看>>
Servlet客户请求的处理:HTTP请求报头HttpServletRequest接口应用
查看>>
ORACLE同义词使用
查看>>
无法将类型“XXX”隐式转换为“XXX[]”(Cannot implicitly convert type 'XXX' to 'XXX[]')
查看>>
react-navigation
查看>>
pat 1014 1017 排队类问题
查看>>
设计循环队列
查看>>
ecshop 批量生成订单信息
查看>>
Java常用系统变量收集
查看>>
基于Hadoop2.7.3集群数据仓库Hive1.2.2的部署及使用
查看>>
常见负载均衡的优点和缺点对比(Nginx、HAProxy、LVS)
查看>>