草庐IT

注册中心(二):nacos注册源码分析(基于http)

Monkey KevinWu 2024-06-17 原文

注册中心(二):nacos注册源码分析

cosumer启动的时候,从nacos server上读取指定服务名称的实例列表,缓存到本地内存中。

开启一个定时任务,每隔10s去nacos server上拉取服务列表

nacos的push机制:

通过心跳检测发现服务提供者出现心态超时的时候,推送一个push消息到consumer,更新本地的缓存数据。

Nacos注册源码分析

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.8.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-alibaba-dependencies</artifactId>
        <version>2.2.8.RELEASE</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>
</dependencies>

基于以上版本源码去学习。

客户端Client

我们自己的项目在配置了nacos作为注册中心后,至少要配置这么一个属性

spring.cloud.nacos.discovery.server-addr=10.130.16.110:8848
# 从逻辑上看,这个是通过grpc去注册还是通过http去注册。false-http1.x注册  true-gRPC注册,默认是true,也就是通过gRPC去注册,毕竟gRPC的性能上要比http1.x高很多
spring.cloud.nacos.discovery.ephemeral=false

那么这个属性会让应用找到nacos的server地址去注册。如果不配置的话,会一直报错

springboot的@EnableAutoConfiguration这里就不再讲解了。都到nacos的源码了,springboot默认是熟悉的。

我们再去打开NacosServiceRegistryAutoConfiguration这个类。

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
		matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
		AutoServiceRegistrationAutoConfiguration.class,
		NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {

	@Bean
	public NacosServiceRegistry nacosServiceRegistry(
			NacosServiceManager nacosServiceManager,
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosServiceRegistry(nacosServiceManager, nacosDiscoveryProperties);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosRegistration nacosRegistration(
			ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
			NacosDiscoveryProperties nacosDiscoveryProperties,
			ApplicationContext context) {
		return new NacosRegistration(registrationCustomizers.getIfAvailable(),
				nacosDiscoveryProperties, context);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosAutoServiceRegistration nacosAutoServiceRegistration(
			NacosServiceRegistry registry,
			AutoServiceRegistrationProperties autoServiceRegistrationProperties,
			NacosRegistration registration) {
		return new NacosAutoServiceRegistration(registry,
				autoServiceRegistrationProperties, registration);
	}

}

其中第三个类NacosAutoServiceRegistration实现了一个抽象类AbstractAutoServiceRegistration.

public abstract class AbstractAutoServiceRegistration<R extends Registration>
		implements AutoServiceRegistration, ApplicationContextAware,
		ApplicationListener<WebServerInitializedEvent> {
    
    @Override
	@SuppressWarnings("deprecation")
	public void onApplicationEvent(WebServerInitializedEvent event) {
		bind(event);
	}

    @Deprecated
	public void bind(WebServerInitializedEvent event) {
		ApplicationContext context = event.getApplicationContext();
		if (context instanceof ConfigurableWebServerApplicationContext) {
			if ("management".equals(((ConfigurableWebServerApplicationContext) context)
					.getServerNamespace())) {
				return;
			}
		}
		this.port.compareAndSet(0, event.getWebServer().getPort());
		this.start();
	}
    public void start() {
		if (!isEnabled()) {
			if (logger.isDebugEnabled()) {
				logger.debug("Discovery Lifecycle disabled. Not starting");
			}
			return;
		}

		// only initialize if nonSecurePort is greater than 0 and it isn't already running
		// because of containerPortInitializer below
		if (!this.running.get()) {
			this.context.publishEvent(
					new InstancePreRegisteredEvent(this, getRegistration()));
			register();
			if (shouldRegisterManagement()) {
				registerManagement();
			}
			this.context.publishEvent(
					new InstanceRegisteredEvent<>(this, getConfiguration()));
			this.running.compareAndSet(false, true);
		}

	}
}

看到这里有实现一个ApplicationListener<WebServerInitializedEvent>的类,这个类是spring的一个监听事件(观察者模式),而这个事件就是webserver初始化的时候去触发的。onApplicationEvent方法调用了bind()方法。而bind()中又调用了start().

start()中有一个register()。而这个register就是NacosServiceRegistry中的register()

register

public class NacosServiceRegistry implements ServiceRegistry<Registration> {

	@Override
	public void register(Registration registration) {

		if (StringUtils.isEmpty(registration.getServiceId())) {
			log.warn("No service to register for nacos client...");
			return;
		}

		NamingService namingService = namingService();
		String serviceId = registration.getServiceId();
		String group = nacosDiscoveryProperties.getGroup();

		Instance instance = getNacosInstanceFromRegistration(registration);

		try {
			namingService.registerInstance(serviceId, group, instance);
			log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
					instance.getIp(), instance.getPort());
		}
		catch (Exception e) {
			if (nacosDiscoveryProperties.isFailFast()) {
				log.error("nacos registry, {} register failed...{},", serviceId,
						registration.toString(), e);
				rethrowRuntimeException(e);
			}
			else {
				log.warn("Failfast is false. {} register failed...{},", serviceId,
						registration.toString(), e);
			}
		}
	}
}

  • getNacosInstanceFromRegistration 获取注册的实例信息。
private Instance getNacosInstanceFromRegistration(Registration registration) {
    Instance instance = new Instance();
    instance.setIp(registration.getHost());
    instance.setPort(registration.getPort());
    instance.setWeight(nacosDiscoveryProperties.getWeight());
    instance.setClusterName(nacosDiscoveryProperties.getClusterName());
    instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
    instance.setMetadata(registration.getMetadata());
    instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
    return instance;
}
  • namingService.registerInstance(serviceId, group, instance);

这个clientProxy有3个实现类,NamingClientProxyDelegate、NamingGrpcClientProxy、NamingHttpClientProxy。

这个类的构造方法中有个init(properties)方法,这个方法中给clientProxy赋值了。走的是NamingClientProxyDelegate方法。一般情况下,带有delegate的方法都是委派模式。

public NacosNamingService(String serverList) throws NacosException {
    Properties properties = new Properties();
    properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
    init(properties);
}

public NacosNamingService(Properties properties) throws NacosException {
    init(properties);
}

private void init(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    this.namespace = InitUtils.initNamespaceForNaming(properties);
    InitUtils.initSerialization();
    InitUtils.initWebRootContext(properties);
    initLogName(properties);

    this.changeNotifier = new InstancesChangeNotifier();
    NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
    NotifyCenter.registerSubscriber(changeNotifier);
    this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);
    this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
}
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    clientProxy.registerService(serviceName, groupName, instance);
}

基于http1.x协议注册

  • NamingClientProxyDelegate.registerService

    委派这里做了一个可执行的判断

@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}
  • NamingClientProxyDelegate.getExecuteClientProxy

    做了一个判断,配置ephemeral=false就走http,否则grpc。这里请注意,如果nacos-server还是用的1.x.x版本的话,会报错的。因为2.x.x增加一个grpc的支持,会额外的多增加一个端口,默认对外提供端口为8848和9848

private NamingClientProxy getExecuteClientProxy(Instance instance) {
    return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}
  • NamingHttpClientProxy.registerService

    这里的clientProxy=NamingHttpClientProxy

@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
                       instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    final Map<String, String> params = new HashMap<String, String>(32);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, groupedServiceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put(IP_PARAM, instance.getIp());
    params.put(PORT_PARAM, String.valueOf(instance.getPort()));
    params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight()));
    params.put(REGISTER_ENABLE_PARAM, String.valueOf(instance.isEnabled()));
    params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy()));
    params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral()));
    params.put(META_PARAM, JacksonUtils.toJson(instance.getMetadata()));

    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);

}
  • NamingHttpClientProxy.reqApi

    public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
        return reqApi(api, params, Collections.EMPTY_MAP, method);
    }
    
    public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method)
        throws NacosException {
        return reqApi(api, params, body, serverListManager.getServerList(), method);
    }
    
    public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
                         String method) throws NacosException {
    
        params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
    
        if (CollectionUtils.isEmpty(servers) && !serverListManager.isDomain()) {
            throw new NacosException(NacosException.INVALID_PARAM, "no server available");
        }
    
        NacosException exception = new NacosException();
    
        if (serverListManager.isDomain()) {
            String nacosDomain = serverListManager.getNacosDomain();
            for (int i = 0; i < maxRetry; i++) {
                try {
                    return callServer(api, params, body, nacosDomain, method);
                } catch (NacosException e) {
                    exception = e;
                    if (NAMING_LOGGER.isDebugEnabled()) {
                        NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                    }
                }
            }
        } else {
            Random random = new Random(System.currentTimeMillis());
            int index = random.nextInt(servers.size());
    
            for (int i = 0; i < servers.size(); i++) {
                String server = servers.get(index);
                try {
                    return callServer(api, params, body, server, method);
                } catch (NacosException e) {
                    exception = e;
                    if (NAMING_LOGGER.isDebugEnabled()) {
                        NAMING_LOGGER.debug("request {} failed.", server, e);
                    }
                }
                index = (index + 1) % servers.size();
            }
        }
    
        NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(),
                            exception.getErrMsg());
    
        throw new NacosException(exception.getErrCode(),
                                 "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
    
    }
    

    serverListManager.isDomain()这个判断是配置了几个nacos server的值,如果是一个的话,走if逻辑,如果多余1个的话,走else逻辑。

    else中的servers就是nacos server服务列表,通过Ramdom拿到一个随机数,然后去callServer(),如果发现其中的一个失败,那么index+1 获取下一个服务节点再去callServer。如果所有的都失败的话,则抛出错误。

  • NamingHttpClientProxy.callServer

    前边的判断支线省略,拼接url,拼好了后,进入try逻辑块中,这里封装了一个nacosRestTemplate类。请求完成后,返回一个restResult,拿到了请求结果后,把请求结果code放入了一个交MetricsMonitor的类中了,从代码上看很明显是监控相关的类,点击进去果然发现是prometheus相关的。这里我们不扩展了,继续回到主线。

    如果返回结果是200的话,把result.content返回去。

public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
            String method) throws NacosException {
    long start = System.currentTimeMillis();
    long end = 0;
    String namespace = params.get(CommonParams.NAMESPACE_ID);
    String group = params.get(CommonParams.GROUP_NAME);
    String serviceName = params.get(CommonParams.SERVICE_NAME);
    params.putAll(getSecurityHeaders(namespace, group, serviceName));
    Header header = NamingHttpUtil.builderHeader();

    String url;
    if (curServer.startsWith(HTTPS_PREFIX) || curServer.startsWith(HTTP_PREFIX)) {
        url = curServer + api;
    } else {
        if (!InternetAddressUtil.containsPort(curServer)) {
            curServer = curServer + InternetAddressUtil.IP_PORT_SPLITER + serverPort;
        }
        url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
    }

    try {
        HttpRestResult<String> restResult = nacosRestTemplate
            .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
        end = System.currentTimeMillis();

        MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
            .observe(end - start);

        if (restResult.ok()) {
            return restResult.getData();
        }
        if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
            return StringUtils.EMPTY;
        }
        throw new NacosException(restResult.getCode(), restResult.getMessage());
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to request", e);
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }
}
  • NacosRestTemplate.exchangeForm

    关键方法:this.requestClient().execute()

public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues,
                                          String httpMethod, Type responseType) throws Exception {
    RequestHttpEntity requestHttpEntity = new RequestHttpEntity(
        header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues);
    return execute(url, httpMethod, requestHttpEntity, responseType);
}

private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity,
                                      Type responseType) throws Exception {
    URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
    if (logger.isDebugEnabled()) {
        logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, uri, requestEntity.getBody());
    }

    ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
    HttpClientResponse response = null;
    try {
        response = this.requestClient().execute(uri, httpMethod, requestEntity);
        return responseHandler.handle(response);
    } finally {
        if (response != null) {
            response.close();
        }
    }
}
private final HttpClientRequest requestClient;
    
private final List<HttpClientRequestInterceptor> interceptors = new ArrayList<HttpClientRequestInterceptor>();

public NacosRestTemplate(Logger logger, HttpClientRequest requestClient) {
    super(logger);
    this.requestClient = requestClient;
}
private HttpClientRequest requestClient() {
    if (CollectionUtils.isNotEmpty(interceptors)) {
        if (logger.isDebugEnabled()) {
            logger.debug("Execute via interceptors :{}", interceptors);
        }
        return new InterceptingHttpClientRequest(requestClient, interceptors.iterator());
    }
    return requestClient;
}

这里的requestClient是构造方法传入进来的。那么我们需要找到初始化的类

public class NamingHttpClientProxy extends AbstractNamingClientProxy {
    
    private final NacosRestTemplate nacosRestTemplate = NamingHttpClientManager.getInstance().getNacosRestTemplate();
    
}
public NacosRestTemplate getNacosRestTemplate() {
    return HttpClientBeanHolder.getNacosRestTemplate(HTTP_CLIENT_FACTORY);
}

HttpClientBeanHolder.getNacosRestTemplate

典型的双重检查锁。

public static NacosRestTemplate getNacosRestTemplate(HttpClientFactory httpClientFactory) {
    if (httpClientFactory == null) {
        throw new NullPointerException("httpClientFactory is null");
    }
    String factoryName = httpClientFactory.getClass().getName();
    NacosRestTemplate nacosRestTemplate = SINGLETON_REST.get(factoryName);
    if (nacosRestTemplate == null) {
        synchronized (SINGLETON_REST) {
            nacosRestTemplate = SINGLETON_REST.get(factoryName);
            if (nacosRestTemplate != null) {
                return nacosRestTemplate;
            }
            nacosRestTemplate = httpClientFactory.createNacosRestTemplate();
            SINGLETON_REST.put(factoryName, nacosRestTemplate);
        }
    }
    return nacosRestTemplate;
}

而这里的httpClientFactory是什么呢?

而NamingHttpClientFactory是一个AbstractHttpClientFactory的实现类,由于NamingHttpClientProxy没有重写createNacosRestTemplate方法。所以最终引用的也就是AbstractHttpClientFactory的createNacosRestTemplate方法。[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mKfKOjzC-1676020921987)(E:\books\img\HttpClientFactory.png)]

private static final HttpClientFactory HTTP_CLIENT_FACTORY = new NamingHttpClientFactory();
public NacosRestTemplate getNacosRestTemplate() {
    return HttpClientBeanHolder.getNacosRestTemplate(HTTP_CLIENT_FACTORY);
}

private static class NamingHttpClientFactory extends AbstractHttpClientFactory {
        
    @Override
    protected HttpClientConfig buildHttpClientConfig() {
        return HttpClientConfig.builder().setConTimeOutMillis(CON_TIME_OUT_MILLIS)
            .setReadTimeOutMillis(READ_TIME_OUT_MILLIS).setMaxRedirects(MAX_REDIRECTS).build();
    }

    @Override
    protected Logger assignLogger() {
        return NAMING_LOGGER;
    }
}

AbstractHttpClientFactory.createNacosRestTemplate

@Override
public NacosRestTemplate createNacosRestTemplate() {
    HttpClientConfig httpClientConfig = buildHttpClientConfig();
    final JdkHttpClientRequest clientRequest = new JdkHttpClientRequest(httpClientConfig);

    // enable ssl
    initTls(new BiConsumer<SSLContext, HostnameVerifier>() {
        @Override
        public void accept(SSLContext sslContext, HostnameVerifier hostnameVerifier) {
            clientRequest.setSSLContext(loadSSLContext());
            clientRequest.replaceSSLHostnameVerifier(hostnameVerifier);
        }
    }, new TlsFileWatcher.FileChangeListener() {
        @Override
        public void onChanged(String filePath) {
            clientRequest.setSSLContext(loadSSLContext());
        }
    });

    return new NacosRestTemplate(assignLogger(), clientRequest);
}

JdkHttpClientRequest clientRequest = new JdkHttpClientRequest(httpClientConfig);

可以看到这里定义了一个JdkHttpClientRequest 。

再往下跟就到java.net.HttpURLConnection的调用,去请求nacos-server的地址,再往下的就不做分析了,进入了http的通讯层了。

最终返回了一个结果,如果是200的话,就注册成功了。失败了就会抛出异常。

基于gRPC http2.0的注册

这里同样的从gRPC和http的委派来进行分析

  • NamingClientProxyDelegate.registerService

    这里的代码上边已经分析过,我们直接进入gRPC的实现。

@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}
  • NamingGrpcClientProxy.registerService

    redoService.cacheInstanceForRedo 这个从名称上看应该是重试机制,

@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
                       instance);
    redoService.cacheInstanceForRedo(serviceName, groupName, instance);
    doRegisterService(serviceName, groupName, instance);
}
  • NamingGrpcRedoService.cacheInstanceForRedo

    这里看起来只是给ConcurrentMap中存放一个redoData,并没有其他的逻辑,后续可能会用到这个。回到主线,继续往下走。

private final ConcurrentMap<String, InstanceRedoData> registeredInstances = new ConcurrentHashMap<>();
public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) {
    String key = NamingUtils.getGroupedName(serviceName, groupName);
    InstanceRedoData redoData = InstanceRedoData.build(serviceName, groupName, instance);
    synchronized (registeredInstances) {
        registeredInstances.put(key, redoData);
    }
}
  • NamingGrpcClientProxy.doRegisterService

    request是根据构造函数封装的一个实例,requestToServer去进行注册。

public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
    InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
                                                  NamingRemoteConstants.REGISTER_INSTANCE, instance);
    requestToServer(request, Response.class);
    redoService.instanceRegistered(serviceName, groupName);
}
  • NamingGrpcClientProxy.requestToServer

    request.putAllHeader推测是跟权限校验相关的,我搭建的没有设置鉴权,所以都是空的。

    然后根据rpcClient去调用request方法。根据超时时间判断的,这2个分支最终都会进入一个方法,默认是3s的超时时间。

    最终返回一个response结果。

private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
    throws NacosException {
    try {
        request.putAllHeader(
            getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
        Response response =
            requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
        if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
            throw new NacosException(response.getErrorCode(), response.getMessage());
        }
        if (responseClass.isAssignableFrom(response.getClass())) {
            return (T) response;
        }
        NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
                            response.getClass().getName(), responseClass.getName());
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
    }
    throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}
  • RpcClient.request

这里的校验暂且不看,直切主线, response = this.currentConnection.request(request, timeoutMills);

再进入到request方法。

public Response request(Request request, long timeoutMills) throws NacosException {
    int retryTimes = 0;
    Response response;
    Exception exceptionThrow = null;
    long start = System.currentTimeMillis();
    while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
        boolean waitReconnect = false;
        try {
            if (this.currentConnection == null || !isRunning()) {
                waitReconnect = true;
                throw new NacosException(NacosException.CLIENT_DISCONNECT,
                                         "Client not connected, current status:" + rpcClientStatus.get());
            }
            response = this.currentConnection.request(request, timeoutMills);
            if (response == null) {
                throw new NacosException(SERVER_ERROR, "Unknown Exception.");
            }
            if (response instanceof ErrorResponse) {
                if (response.getErrorCode() == NacosException.UN_REGISTER) {
                    synchronized (this) {
                        waitReconnect = true;
                        if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                            LoggerUtils.printIfErrorEnabled(LOGGER,
                                                            "Connection is unregistered, switch server, connectionId = {}, request = {}",
                                                            currentConnection.getConnectionId(), request.getClass().getSimpleName());
                            switchServerAsync();
                        }
                    }

                }
                throw new NacosException(response.getErrorCode(), response.getMessage());
            }
            // return response.
            lastActiveTimeStamp = System.currentTimeMillis();
            return response;

        } catch (Exception e) {
            if (waitReconnect) {
                try {
                    // wait client to reconnect.
                    Thread.sleep(Math.min(100, timeoutMills / 3));
                } catch (Exception exception) {
                    // Do nothing.
                }
            }

            LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
                                            request, retryTimes, e.getMessage());

            exceptionThrow = e;

        }
        retryTimes++;

    }

    if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
        switchServerAsyncOnRequestFail();
    }

    if (exceptionThrow != null) {
        throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow
            : new NacosException(SERVER_ERROR, exceptionThrow);
    } else {
        throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");
    }
}
  • GrpcConnection.request

这里的就是封装的rpc请求,和服务端进行交互的逻辑。在这里封装了一个PayLoad类

@Override
public Response request(Request request, long timeouts) throws NacosException {
    Payload grpcRequest = GrpcUtils.convert(request);
    ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
    Payload grpcResponse;
    try {
        grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }

    return (Response) GrpcUtils.parse(grpcResponse);
}

以上代码是client端源码

Server端

接收注册

客户端和服务端之间进行交互的话,一定需要建立一个网络连接。这里的grpc的源码相对来说比较复杂,就简单分析nacos相关的。

工程名称是nacos-console。

BaseGrpcServer在启动的时候会绑定很多的Handler。

而基于grpc的通信,会进入server端的InstanceRequestHandler

  • InstanceRequestHandler.handle

    从handle方法中可以根据type走到registerInstance中。

    最终进入到EphemeralClientOperationServiceImpl.registerInstance

    public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
        
        private final EphemeralClientOperationServiceImpl clientOperationService;
        
        public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
            this.clientOperationService = clientOperationService;
        }
        
        @Override
        @Secured(action = ActionTypes.WRITE)
        public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
            Service service = Service
                    .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
            switch (request.getType()) {
                case NamingRemoteConstants.REGISTER_INSTANCE:
                    // 注册
                    return registerInstance(service, request, meta);
                case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                    // 取消注册
                    return deregisterInstance(service, request, meta);
                default:
                    throw new NacosException(NacosException.INVALID_PARAM,
                            String.format("Unsupported request type %s", request.getType()));
            }
        }
        
        private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
                throws NacosException {
            // 注册实例
            clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
            NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
                    meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
                    request.getInstance().getIp(), request.getInstance().getPort()));
            return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
        }
        
        private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
            clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
            NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(System.currentTimeMillis(),
                    meta.getClientIp(), true, DeregisterInstanceReason.REQUEST, service.getNamespace(),
                    service.getGroup(), service.getName(), request.getInstance().getIp(), request.getInstance().getPort()));
            return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
        }
    }
    
  • EphemeralClientOperationServiceImpl.registerInstance

    这里的clientManager.getClient(client)说明跳转到下边的建立长连接

    @Override
    public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        // 获取一个单例的Service,也就是注册的实例
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        if (!singleton.isEphemeral()) {
            throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                                            String.format("Current service %s is persistent service, can't register ephemeral instance.",
                                                          singleton.getGroupedServiceName()));
        }
        // 这里的Client是客户端的长连接,会进入到ClientManagerDelegate的一个委托,最终进入到connectionBasedClientManager中
        Client client = clientManager.getClient(clientId);
        if (!clientIsLegal(client, clientId)) {
            return;
        }
        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        // 对这个实例进行注册
        client.addServiceInstance(singleton, instanceInfo);
        client.setLastUpdatedTime();
        client.recalculateRevision();
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        NotifyCenter
            .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }
    
  • AbstractClient.addServiceInstance

    // 这个ConcurrentHashMap就是保存实例和发布信息关系的
    protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        if (null == publishers.put(service, instancePublishInfo)) {
            if (instancePublishInfo instanceof BatchInstancePublishInfo) {
                MetricsMonitor.incrementIpCountWithBatchRegister(instancePublishInfo);
            } else {
                MetricsMonitor.incrementInstanceCount();
            }
        }
        // 这里有一个事件,ClientChangeEvent
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
        return true;
    }
    
  • ClientServiceIndexesManager

    // 应用Service和clientId的映射,一个应用Service有多个服务,所以会建立多个长连接,用Set来保存clientId
    private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    // 应用Service和订阅者clientId的关系
    private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
    @Override
    public void onEvent(Event event) {
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
        } else if (event instanceof ClientOperationEvent) {
            handleClientOperation((ClientOperationEvent) event);
        }
    }
    
    private void handleClientOperation(ClientOperationEvent event) {
        Service service = event.getService();
        String clientId = event.getClientId();
        if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
            // 注册
            addPublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
            // 取消注册
            removePublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
            // 订阅
            addSubscriberIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
            // 取消订阅
            removeSubscriberIndexes(service, clientId);
        }
    }
    

建立长连接(这里的过程比较难一些,还在持续学习中)

GrpcBiStreamRequestAcceptor这个类是建立连接的。

每一个grpc请求过来后,都会进入到GrpcBiStreamRequestAcceptor.requestBiStream的方法中。

而会话的长连接id就是这里的ConnectionId。

@Service
public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestStreamImplBase {
     @Autowired
    ConnectionManager connectionManager;
    
    private void traceDetailIfNecessary(Payload grpcRequest) {
        String clientIp = grpcRequest.getMetadata().getClientIp();
        String connectionId = CONTEXT_KEY_CONN_ID.get();
        try {
            if (connectionManager.traced(clientIp)) {
                Loggers.REMOTE_DIGEST.info("[{}]Bi stream request receive, meta={},body={}", connectionId,
                        grpcRequest.getMetadata().toByteString().toStringUtf8(),
                        grpcRequest.getBody().toByteString().toStringUtf8());
            }
        } catch (Throwable throwable) {
            Loggers.REMOTE_DIGEST.error("[{}]Bi stream request error,payload={},error={}", connectionId,
                    grpcRequest.toByteString().toStringUtf8(), throwable);
        }
        
    }
    @Override
    public StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) {
        
        StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() {
            
            final String connectionId = CONTEXT_KEY_CONN_ID.get();
            
            final Integer localPort = CONTEXT_KEY_CONN_LOCAL_PORT.get();
            
            final int remotePort = CONTEXT_KEY_CONN_REMOTE_PORT.get();
            
            String remoteIp = CONTEXT_KEY_CONN_REMOTE_IP.get();
            
            String clientIp = "";
            
            @Override
            public void onNext(Payload payload) {
                
                clientIp = payload.getMetadata().getClientIp();
                traceDetailIfNecessary(payload);
                
                Object parseObj;
                try {
                    parseObj = GrpcUtils.parse(payload);
                } catch (Throwable throwable) {
                    Loggers.REMOTE_DIGEST
                            .warn("[{}]Grpc request bi stream,payload parse error={}", connectionId, throwable);
                    return;
                }
                
                if (parseObj == null) {
                    Loggers.REMOTE_DIGEST
                            .warn("[{}]Grpc request bi stream,payload parse null ,body={},meta={}", connectionId,
                                    payload.getBody().getValue().toStringUtf8(), payload.getMetadata());
                    return;
                }
                if (parseObj instanceof ConnectionSetupRequest) {
                    ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
                    Map<String, String> labels = setUpRequest.getLabels();
                    String appName = "-";
                    if (labels != null && labels.containsKey(Constants.APPNAME)) {
                        appName = labels.get(Constants.APPNAME);
                    }
                    
                    ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
                            remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
                            setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
                    metaInfo.setTenant(setUpRequest.getTenant());
                    Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
                    connection.setAbilities(setUpRequest.getAbilities());
                    boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
                    // 这里会有一个connectionManager.register
                    if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
                        //Not register to the connection manager if current server is over limit or server is starting.
                        try {
                            Loggers.REMOTE_DIGEST.warn("[{}]Connection register fail,reason:{}", connectionId,
                                    rejectSdkOnStarting ? " server is not started" : " server is over limited.");
                            connection.request(new ConnectResetRequest(), 3000L);
                            connection.close();
                        } catch (Exception e) {
                            //Do nothing.
                            if (connectionManager.traced(clientIp)) {
                                Loggers.REMOTE_DIGEST
                                        .warn("[{}]Send connect reset request error,error={}", connectionId, e);
                            }
                        }
                    }
                    
                } else if (parseObj instanceof Response) {
                    Response response = (Response) parseObj;
                    if (connectionManager.traced(clientIp)) {
                        Loggers.REMOTE_DIGEST
                                .warn("[{}]Receive response of server request  ,response={}", connectionId, response);
                    }
                    RpcAckCallbackSynchronizer.ackNotify(connectionId, response);
                    connectionManager.refreshActiveTime(connectionId);
                } else {
                    Loggers.REMOTE_DIGEST
                            .warn("[{}]Grpc request bi stream,unknown payload receive ,parseObj={}", connectionId,
                                    parseObj);
                }
                
            }
            
            @Override
            public void onError(Throwable t) {
                if (connectionManager.traced(clientIp)) {
                    Loggers.REMOTE_DIGEST.warn("[{}]Bi stream on error,error={}", connectionId, t);
                }
                
                if (responseObserver instanceof ServerCallStreamObserver) {
                    ServerCallStreamObserver serverCallStreamObserver = ((ServerCallStreamObserver) responseObserver);
                    if (serverCallStreamObserver.isCancelled()) {
                        //client close the stream.
                    } else {
                        try {
                            serverCallStreamObserver.onCompleted();
                        } catch (Throwable throwable) {
                            //ignore
                        }
                    }
                }
                
            }
            
            @Override
            public void onCompleted() {
                if (connectionManager.traced(clientIp)) {
                    Loggers.REMOTE_DIGEST.warn("[{}]Bi stream on completed", connectionId);
                }
                if (responseObserver instanceof ServerCallStreamObserver) {
                    ServerCallStreamObserver serverCallStreamObserver = ((ServerCallStreamObserver) responseObserver);
                    if (serverCallStreamObserver.isCancelled()) {
                        //client close the stream.
                    } else {
                        try {
                            serverCallStreamObserver.onCompleted();
                        } catch (Throwable throwable) {
                            //ignore
                        }
                        
                    }
                }
            }
        };
        
        return streamObserver;
    }
}
  • ConnectionManager.register

    这里的connections是用来管理所有的长连接的

Map<String, Connection> connections = new ConcurrentHashMap<>();

public synchronized boolean register(String connectionId, Connection connection) {

    if (connection.isConnected()) {
        String clientIp = connection.getMetaInfo().clientIp;
        if (connections.containsKey(connectionId)) {
            return true;
        }
        if (checkLimit(connection)) {
            return false;
        }
        if (traced(clientIp)) {
            connection.setTraced(true);
        }
        connections.put(connectionId, connection);
        if (!connectionForClientIp.containsKey(clientIp)) {
            connectionForClientIp.put(clientIp, new AtomicInteger(0));
        }
        connectionForClientIp.get(clientIp).getAndIncrement();

        clientConnectionEventListenerRegistry.notifyClientConnected(connection);

        LOGGER.info("new connection registered successfully, connectionId = {},connection={} ", connectionId,
                    connection);
        return true;

    }
    return false;
}

有关注册中心(二):nacos注册源码分析(基于http)的更多相关文章

  1. ruby - 如何模拟 Net::HTTP::Post? - 2

    是的,我知道最好使用webmock,但我想知道如何在RSpec中模拟此方法:defmethod_to_testurl=URI.parseurireq=Net::HTTP::Post.newurl.pathres=Net::HTTP.start(url.host,url.port)do|http|http.requestreq,foo:1endresend这是RSpec:let(:uri){'http://example.com'}specify'HTTPcall'dohttp=mock:httpNet::HTTP.stub!(:start).and_yieldhttphttp.shou

  2. ruby - Net::HTTP 获取源代码和状态 - 2

    我目前正在使用以下方法获取页面的源代码:Net::HTTP.get(URI.parse(page.url))我还想获取HTTP状态,而无需发出第二个请求。有没有办法用另一种方法做到这一点?我一直在查看文档,但似乎找不到我要找的东西。 最佳答案 在我看来,除非您需要一些真正的低级访问或控制,否则最好使用Ruby的内置Open::URI模块:require'open-uri'io=open('http://www.example.org/')#=>#body=io.read[0,50]#=>"["200","OK"]io.base_ur

  3. 叮咚买菜基于 Apache Doris 统一 OLAP 引擎的应用实践 - 2

    导读:随着叮咚买菜业务的发展,不同的业务场景对数据分析提出了不同的需求,他们希望引入一款实时OLAP数据库,构建一个灵活的多维实时查询和分析的平台,统一数据的接入和查询方案,解决各业务线对数据高效实时查询和精细化运营的需求。经过调研选型,最终引入ApacheDoris作为最终的OLAP分析引擎,Doris作为核心的OLAP引擎支持复杂地分析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。作者|叮咚买菜资深数据工程师韩青叮咚买菜创立于2017年5月,是一家专注美好食物的创业公司。叮咚买菜专注吃的事业,为满足更多人“想吃什么”而努力,通过美好食材的供应、美好滋味的开发以及美食品牌的孵

  4. UE4 源码阅读:从引擎启动到Receive Begin Play - 2

    一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame

  5. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  6. Get https://registry-1.docker.io/v2/: net/http: request canceled while waiting - 2

    1.错误信息:Errorresponsefromdaemon:Gethttps://registry-1.docker.io/v2/:net/http:requestcanceledwhilewaitingforconnection(Client.Timeoutexceededwhileawaitingheaders)或者:Errorresponsefromdaemon:Gethttps://registry-1.docker.io/v2/:net/http:TLShandshaketimeout2.报错原因:docker使用的镜像网址默认为国外,下载容易超时,需要修改成国内镜像地址(首先阿里

  7. kvm虚拟机安装centos7基于ubuntu20.04系统 - 2

    需求:要创建虚拟机,就需要给他提供一个虚拟的磁盘,我们就在/opt目录下创建一个10G大小的raw格式的虚拟磁盘CentOS-7-x86_64.raw命令格式:qemu-imgcreate-f磁盘格式磁盘名称磁盘大小qemu-imgcreate-f磁盘格式-o?1.创建磁盘qemu-imgcreate-fraw/opt/CentOS-7-x86_64.raw10G执行效果#ls/opt/CentOS-7-x86_64.raw2.安装虚拟机使用virt-install命令,基于我们提供的系统镜像和虚拟磁盘来创建一个虚拟机,另外在创建虚拟机之前,提前打开vnc客户端,在创建虚拟机的时候,通过vnc

  8. 阿里云国际版免费试用:如何注册以及注意事项 - 2

    作为新的阿里云用户,您可以50免费试用多种优惠,价值高达1,700美元(或8,500美元)。这将让您了解和体验阿里云平台上提供的一系列产品和服务。如果您以个人身份注册免费试用,您将获得价值1,700美元的优惠。但是,如果您是注册公司,您可以选择企业免费试用,提交基本信息通过企业实名注册验证,即可开始价值$8,500的免费试用!本教程介绍了如何设置您的帐户并使用您的免费试用版。​关于免费试用在我们开始此试用之前,您还必须遵守以下条款和条件才能访问您的免费试用:只有在一年内创建的账户才有资格获得阿里云免费试用。通过此免费试用优惠,用户可以免费试用免费试用活动页面上列出的每种产品一次。如果您有多个帐

  9. ruby-on-rails - Rails - 从命名路由中提取 HTTP 动词 - 2

    Rails中有没有一种方法可以提取与路由关联的HTTP动词?例如,给定这样的路线:将“users”匹配到:“users#show”,通过:[:get,:post]我能实现这样的目标吗?users_path.respond_to?(:get)(显然#respond_to不是正确的方法)我最接近的是通过执行以下操作,但它似乎并不令人满意。Rails.application.routes.routes.named_routes["users"].constraints[:request_method]#=>/^GET$/对于上下文,我有一个设置cookie然后执行redirect_to:ba

  10. ruby-on-rails - Heroku 吃掉了我的自定义 HTTP header - 2

    我正在使用Heroku(heroku.com)来部署我的Rails应用程序,并且正在构建一个iPhone客户端来与之交互。我的目的是将手机的唯一设备标识符作为HTTPheader传递给应用程序以进行身份​​验证。当我在本地测试时,我的header通过得很好,但在Heroku上它似乎去掉了我的自定义header。我用ruby​​脚本验证:url=URI.parse('http://#{myapp}.heroku.com/')#url=URI.parse('http://localhost:3000/')req=Net::HTTP::Post.new(url.path)#boguspara

随机推荐