cosumer启动的时候,从nacos server上读取指定服务名称的实例列表,缓存到本地内存中。
开启一个定时任务,每隔10s去nacos server上拉取服务列表
nacos的push机制:
通过心跳检测发现服务提供者出现心态超时的时候,推送一个push消息到consumer,更新本地的缓存数据。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.2.8.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
基于以上版本源码去学习。
我们自己的项目在配置了nacos作为注册中心后,至少要配置这么一个属性
spring.cloud.nacos.discovery.server-addr=10.130.16.110:8848
# 从逻辑上看,这个是通过grpc去注册还是通过http去注册。false-http1.x注册 true-gRPC注册,默认是true,也就是通过gRPC去注册,毕竟gRPC的性能上要比http1.x高很多
spring.cloud.nacos.discovery.ephemeral=false
那么这个属性会让应用找到nacos的server地址去注册。如果不配置的话,会一直报错
springboot的@EnableAutoConfiguration这里就不再讲解了。都到nacos的源码了,springboot默认是熟悉的。
我们再去打开NacosServiceRegistryAutoConfiguration这个类。
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
AutoServiceRegistrationAutoConfiguration.class,
NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosServiceManager, nacosDiscoveryProperties);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(registrationCustomizers.getIfAvailable(),
nacosDiscoveryProperties, context);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
}
其中第三个类NacosAutoServiceRegistration实现了一个抽象类AbstractAutoServiceRegistration.
public abstract class AbstractAutoServiceRegistration<R extends Registration>
implements AutoServiceRegistration, ApplicationContextAware,
ApplicationListener<WebServerInitializedEvent> {
@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
@Deprecated
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace())) {
return;
}
}
this.port.compareAndSet(0, event.getWebServer().getPort());
this.start();
}
public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get()) {
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration()));
register();
if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
}
}
看到这里有实现一个ApplicationListener<WebServerInitializedEvent>的类,这个类是spring的一个监听事件(观察者模式),而这个事件就是webserver初始化的时候去触发的。onApplicationEvent方法调用了bind()方法。而bind()中又调用了start().
start()中有一个register()。而这个register就是NacosServiceRegistry中的register()。
public class NacosServiceRegistry implements ServiceRegistry<Registration> {
@Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
NamingService namingService = namingService();
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
Instance instance = getNacosInstanceFromRegistration(registration);
try {
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
if (nacosDiscoveryProperties.isFailFast()) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
rethrowRuntimeException(e);
}
else {
log.warn("Failfast is false. {} register failed...{},", serviceId,
registration.toString(), e);
}
}
}
}
private Instance getNacosInstanceFromRegistration(Registration registration) {
Instance instance = new Instance();
instance.setIp(registration.getHost());
instance.setPort(registration.getPort());
instance.setWeight(nacosDiscoveryProperties.getWeight());
instance.setClusterName(nacosDiscoveryProperties.getClusterName());
instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
instance.setMetadata(registration.getMetadata());
instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
return instance;
}
这个clientProxy有3个实现类,NamingClientProxyDelegate、NamingGrpcClientProxy、NamingHttpClientProxy。
这个类的构造方法中有个init(properties)方法,这个方法中给clientProxy赋值了。走的是NamingClientProxyDelegate方法。一般情况下,带有delegate的方法都是委派模式。
public NacosNamingService(String serverList) throws NacosException {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
init(properties);
}
public NacosNamingService(Properties properties) throws NacosException {
init(properties);
}
private void init(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
this.namespace = InitUtils.initNamespaceForNaming(properties);
InitUtils.initSerialization();
InitUtils.initWebRootContext(properties);
initLogName(properties);
this.changeNotifier = new InstancesChangeNotifier();
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
NotifyCenter.registerSubscriber(changeNotifier);
this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
}
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
clientProxy.registerService(serviceName, groupName, instance);
}
NamingClientProxyDelegate.registerService
委派这里做了一个可执行的判断
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}
NamingClientProxyDelegate.getExecuteClientProxy
做了一个判断,配置ephemeral=false就走http,否则grpc。这里请注意,如果nacos-server还是用的1.x.x版本的话,会报错的。因为2.x.x增加一个grpc的支持,会额外的多增加一个端口,默认对外提供端口为8848和9848
private NamingClientProxy getExecuteClientProxy(Instance instance) {
return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}
NamingHttpClientProxy.registerService
这里的clientProxy=NamingHttpClientProxy
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
final Map<String, String> params = new HashMap<String, String>(32);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, groupedServiceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put(IP_PARAM, instance.getIp());
params.put(PORT_PARAM, String.valueOf(instance.getPort()));
params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight()));
params.put(REGISTER_ENABLE_PARAM, String.valueOf(instance.isEnabled()));
params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy()));
params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral()));
params.put(META_PARAM, JacksonUtils.toJson(instance.getMetadata()));
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
NamingHttpClientProxy.reqApi
public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
return reqApi(api, params, Collections.EMPTY_MAP, method);
}
public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method)
throws NacosException {
return reqApi(api, params, body, serverListManager.getServerList(), method);
}
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
String method) throws NacosException {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && !serverListManager.isDomain()) {
throw new NacosException(NacosException.INVALID_PARAM, "no server available");
}
NacosException exception = new NacosException();
if (serverListManager.isDomain()) {
String nacosDomain = serverListManager.getNacosDomain();
for (int i = 0; i < maxRetry; i++) {
try {
return callServer(api, params, body, nacosDomain, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
}
}
}
} else {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
return callServer(api, params, body, server, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", server, e);
}
}
index = (index + 1) % servers.size();
}
}
NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(),
exception.getErrMsg());
throw new NacosException(exception.getErrCode(),
"failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
}
serverListManager.isDomain()这个判断是配置了几个nacos server的值,如果是一个的话,走if逻辑,如果多余1个的话,走else逻辑。
else中的servers就是nacos server服务列表,通过Ramdom拿到一个随机数,然后去callServer(),如果发现其中的一个失败,那么index+1 获取下一个服务节点再去callServer。如果所有的都失败的话,则抛出错误。
NamingHttpClientProxy.callServer
前边的判断支线省略,拼接url,拼好了后,进入try逻辑块中,这里封装了一个nacosRestTemplate类。请求完成后,返回一个restResult,拿到了请求结果后,把请求结果code放入了一个交MetricsMonitor的类中了,从代码上看很明显是监控相关的类,点击进去果然发现是prometheus相关的。这里我们不扩展了,继续回到主线。
如果返回结果是200的话,把result.content返回去。
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
String method) throws NacosException {
long start = System.currentTimeMillis();
long end = 0;
String namespace = params.get(CommonParams.NAMESPACE_ID);
String group = params.get(CommonParams.GROUP_NAME);
String serviceName = params.get(CommonParams.SERVICE_NAME);
params.putAll(getSecurityHeaders(namespace, group, serviceName));
Header header = NamingHttpUtil.builderHeader();
String url;
if (curServer.startsWith(HTTPS_PREFIX) || curServer.startsWith(HTTP_PREFIX)) {
url = curServer + api;
} else {
if (!InternetAddressUtil.containsPort(curServer)) {
curServer = curServer + InternetAddressUtil.IP_PORT_SPLITER + serverPort;
}
url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
}
try {
HttpRestResult<String> restResult = nacosRestTemplate
.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
.observe(end - start);
if (restResult.ok()) {
return restResult.getData();
}
if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
return StringUtils.EMPTY;
}
throw new NacosException(restResult.getCode(), restResult.getMessage());
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to request", e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}
NacosRestTemplate.exchangeForm
关键方法:this.requestClient().execute()
public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues,
String httpMethod, Type responseType) throws Exception {
RequestHttpEntity requestHttpEntity = new RequestHttpEntity(
header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues);
return execute(url, httpMethod, requestHttpEntity, responseType);
}
private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity,
Type responseType) throws Exception {
URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
if (logger.isDebugEnabled()) {
logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, uri, requestEntity.getBody());
}
ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
HttpClientResponse response = null;
try {
response = this.requestClient().execute(uri, httpMethod, requestEntity);
return responseHandler.handle(response);
} finally {
if (response != null) {
response.close();
}
}
}
private final HttpClientRequest requestClient;
private final List<HttpClientRequestInterceptor> interceptors = new ArrayList<HttpClientRequestInterceptor>();
public NacosRestTemplate(Logger logger, HttpClientRequest requestClient) {
super(logger);
this.requestClient = requestClient;
}
private HttpClientRequest requestClient() {
if (CollectionUtils.isNotEmpty(interceptors)) {
if (logger.isDebugEnabled()) {
logger.debug("Execute via interceptors :{}", interceptors);
}
return new InterceptingHttpClientRequest(requestClient, interceptors.iterator());
}
return requestClient;
}
这里的requestClient是构造方法传入进来的。那么我们需要找到初始化的类
public class NamingHttpClientProxy extends AbstractNamingClientProxy {
private final NacosRestTemplate nacosRestTemplate = NamingHttpClientManager.getInstance().getNacosRestTemplate();
}
public NacosRestTemplate getNacosRestTemplate() {
return HttpClientBeanHolder.getNacosRestTemplate(HTTP_CLIENT_FACTORY);
}
HttpClientBeanHolder.getNacosRestTemplate
典型的双重检查锁。
public static NacosRestTemplate getNacosRestTemplate(HttpClientFactory httpClientFactory) {
if (httpClientFactory == null) {
throw new NullPointerException("httpClientFactory is null");
}
String factoryName = httpClientFactory.getClass().getName();
NacosRestTemplate nacosRestTemplate = SINGLETON_REST.get(factoryName);
if (nacosRestTemplate == null) {
synchronized (SINGLETON_REST) {
nacosRestTemplate = SINGLETON_REST.get(factoryName);
if (nacosRestTemplate != null) {
return nacosRestTemplate;
}
nacosRestTemplate = httpClientFactory.createNacosRestTemplate();
SINGLETON_REST.put(factoryName, nacosRestTemplate);
}
}
return nacosRestTemplate;
}
而这里的httpClientFactory是什么呢?
而NamingHttpClientFactory是一个AbstractHttpClientFactory的实现类,由于NamingHttpClientProxy没有重写createNacosRestTemplate方法。所以最终引用的也就是AbstractHttpClientFactory的createNacosRestTemplate方法。[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mKfKOjzC-1676020921987)(E:\books\img\HttpClientFactory.png)]
private static final HttpClientFactory HTTP_CLIENT_FACTORY = new NamingHttpClientFactory();
public NacosRestTemplate getNacosRestTemplate() {
return HttpClientBeanHolder.getNacosRestTemplate(HTTP_CLIENT_FACTORY);
}
private static class NamingHttpClientFactory extends AbstractHttpClientFactory {
@Override
protected HttpClientConfig buildHttpClientConfig() {
return HttpClientConfig.builder().setConTimeOutMillis(CON_TIME_OUT_MILLIS)
.setReadTimeOutMillis(READ_TIME_OUT_MILLIS).setMaxRedirects(MAX_REDIRECTS).build();
}
@Override
protected Logger assignLogger() {
return NAMING_LOGGER;
}
}
AbstractHttpClientFactory.createNacosRestTemplate
@Override
public NacosRestTemplate createNacosRestTemplate() {
HttpClientConfig httpClientConfig = buildHttpClientConfig();
final JdkHttpClientRequest clientRequest = new JdkHttpClientRequest(httpClientConfig);
// enable ssl
initTls(new BiConsumer<SSLContext, HostnameVerifier>() {
@Override
public void accept(SSLContext sslContext, HostnameVerifier hostnameVerifier) {
clientRequest.setSSLContext(loadSSLContext());
clientRequest.replaceSSLHostnameVerifier(hostnameVerifier);
}
}, new TlsFileWatcher.FileChangeListener() {
@Override
public void onChanged(String filePath) {
clientRequest.setSSLContext(loadSSLContext());
}
});
return new NacosRestTemplate(assignLogger(), clientRequest);
}
JdkHttpClientRequest clientRequest = new JdkHttpClientRequest(httpClientConfig);
可以看到这里定义了一个JdkHttpClientRequest 。
再往下跟就到java.net.HttpURLConnection的调用,去请求nacos-server的地址,再往下的就不做分析了,进入了http的通讯层了。
最终返回了一个结果,如果是200的话,就注册成功了。失败了就会抛出异常。
这里同样的从gRPC和http的委派来进行分析
NamingClientProxyDelegate.registerService
这里的代码上边已经分析过,我们直接进入gRPC的实现。
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}
NamingGrpcClientProxy.registerService
redoService.cacheInstanceForRedo 这个从名称上看应该是重试机制,
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
instance);
redoService.cacheInstanceForRedo(serviceName, groupName, instance);
doRegisterService(serviceName, groupName, instance);
}
NamingGrpcRedoService.cacheInstanceForRedo
这里看起来只是给ConcurrentMap中存放一个redoData,并没有其他的逻辑,后续可能会用到这个。回到主线,继续往下走。
private final ConcurrentMap<String, InstanceRedoData> registeredInstances = new ConcurrentHashMap<>();
public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) {
String key = NamingUtils.getGroupedName(serviceName, groupName);
InstanceRedoData redoData = InstanceRedoData.build(serviceName, groupName, instance);
synchronized (registeredInstances) {
registeredInstances.put(key, redoData);
}
}
NamingGrpcClientProxy.doRegisterService
request是根据构造函数封装的一个实例,requestToServer去进行注册。
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
requestToServer(request, Response.class);
redoService.instanceRegistered(serviceName, groupName);
}
NamingGrpcClientProxy.requestToServer
request.putAllHeader推测是跟权限校验相关的,我搭建的没有设置鉴权,所以都是空的。
然后根据rpcClient去调用request方法。根据超时时间判断的,这2个分支最终都会进入一个方法,默认是3s的超时时间。
最终返回一个response结果。
private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
throws NacosException {
try {
request.putAllHeader(
getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
Response response =
requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
throw new NacosException(response.getErrorCode(), response.getMessage());
}
if (responseClass.isAssignableFrom(response.getClass())) {
return (T) response;
}
NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
response.getClass().getName(), responseClass.getName());
} catch (Exception e) {
throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
}
throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}
这里的校验暂且不看,直切主线, response = this.currentConnection.request(request, timeoutMills);
再进入到request方法。
public Response request(Request request, long timeoutMills) throws NacosException {
int retryTimes = 0;
Response response;
Exception exceptionThrow = null;
long start = System.currentTimeMillis();
while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
boolean waitReconnect = false;
try {
if (this.currentConnection == null || !isRunning()) {
waitReconnect = true;
throw new NacosException(NacosException.CLIENT_DISCONNECT,
"Client not connected, current status:" + rpcClientStatus.get());
}
response = this.currentConnection.request(request, timeoutMills);
if (response == null) {
throw new NacosException(SERVER_ERROR, "Unknown Exception.");
}
if (response instanceof ErrorResponse) {
if (response.getErrorCode() == NacosException.UN_REGISTER) {
synchronized (this) {
waitReconnect = true;
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
LoggerUtils.printIfErrorEnabled(LOGGER,
"Connection is unregistered, switch server, connectionId = {}, request = {}",
currentConnection.getConnectionId(), request.getClass().getSimpleName());
switchServerAsync();
}
}
}
throw new NacosException(response.getErrorCode(), response.getMessage());
}
// return response.
lastActiveTimeStamp = System.currentTimeMillis();
return response;
} catch (Exception e) {
if (waitReconnect) {
try {
// wait client to reconnect.
Thread.sleep(Math.min(100, timeoutMills / 3));
} catch (Exception exception) {
// Do nothing.
}
}
LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
request, retryTimes, e.getMessage());
exceptionThrow = e;
}
retryTimes++;
}
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsyncOnRequestFail();
}
if (exceptionThrow != null) {
throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow
: new NacosException(SERVER_ERROR, exceptionThrow);
} else {
throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");
}
}
这里的就是封装的rpc请求,和服务端进行交互的逻辑。在这里封装了一个PayLoad类
@Override
public Response request(Request request, long timeouts) throws NacosException {
Payload grpcRequest = GrpcUtils.convert(request);
ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
Payload grpcResponse;
try {
grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new NacosException(NacosException.SERVER_ERROR, e);
}
return (Response) GrpcUtils.parse(grpcResponse);
}
以上代码是client端源码
客户端和服务端之间进行交互的话,一定需要建立一个网络连接。这里的grpc的源码相对来说比较复杂,就简单分析nacos相关的。
工程名称是nacos-console。
BaseGrpcServer在启动的时候会绑定很多的Handler。

而基于grpc的通信,会进入server端的InstanceRequestHandler
InstanceRequestHandler.handle
从handle方法中可以根据type走到registerInstance中。
最终进入到EphemeralClientOperationServiceImpl.registerInstance
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
private final EphemeralClientOperationServiceImpl clientOperationService;
public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
this.clientOperationService = clientOperationService;
}
@Override
@Secured(action = ActionTypes.WRITE)
public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
Service service = Service
.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
switch (request.getType()) {
case NamingRemoteConstants.REGISTER_INSTANCE:
// 注册
return registerInstance(service, request, meta);
case NamingRemoteConstants.DE_REGISTER_INSTANCE:
// 取消注册
return deregisterInstance(service, request, meta);
default:
throw new NacosException(NacosException.INVALID_PARAM,
String.format("Unsupported request type %s", request.getType()));
}
}
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
throws NacosException {
// 注册实例
clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
request.getInstance().getIp(), request.getInstance().getPort()));
return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
}
private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), true, DeregisterInstanceReason.REQUEST, service.getNamespace(),
service.getGroup(), service.getName(), request.getInstance().getIp(), request.getInstance().getPort()));
return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
}
}
EphemeralClientOperationServiceImpl.registerInstance
这里的clientManager.getClient(client)说明跳转到下边的建立长连接
@Override
public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
// 获取一个单例的Service,也就是注册的实例
Service singleton = ServiceManager.getInstance().getSingleton(service);
if (!singleton.isEphemeral()) {
throw new NacosRuntimeException(NacosException.INVALID_PARAM,
String.format("Current service %s is persistent service, can't register ephemeral instance.",
singleton.getGroupedServiceName()));
}
// 这里的Client是客户端的长连接,会进入到ClientManagerDelegate的一个委托,最终进入到connectionBasedClientManager中
Client client = clientManager.getClient(clientId);
if (!clientIsLegal(client, clientId)) {
return;
}
InstancePublishInfo instanceInfo = getPublishInfo(instance);
// 对这个实例进行注册
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
client.recalculateRevision();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter
.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
AbstractClient.addServiceInstance
// 这个ConcurrentHashMap就是保存实例和发布信息关系的
protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
if (null == publishers.put(service, instancePublishInfo)) {
if (instancePublishInfo instanceof BatchInstancePublishInfo) {
MetricsMonitor.incrementIpCountWithBatchRegister(instancePublishInfo);
} else {
MetricsMonitor.incrementInstanceCount();
}
}
// 这里有一个事件,ClientChangeEvent
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
return true;
}
ClientServiceIndexesManager
// 应用Service和clientId的映射,一个应用Service有多个服务,所以会建立多个长连接,用Set来保存clientId
private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
// 应用Service和订阅者clientId的关系
private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
@Override
public void onEvent(Event event) {
if (event instanceof ClientEvent.ClientDisconnectEvent) {
handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
} else if (event instanceof ClientOperationEvent) {
handleClientOperation((ClientOperationEvent) event);
}
}
private void handleClientOperation(ClientOperationEvent event) {
Service service = event.getService();
String clientId = event.getClientId();
if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
// 注册
addPublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
// 取消注册
removePublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
// 订阅
addSubscriberIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
// 取消订阅
removeSubscriberIndexes(service, clientId);
}
}
GrpcBiStreamRequestAcceptor这个类是建立连接的。
每一个grpc请求过来后,都会进入到GrpcBiStreamRequestAcceptor.requestBiStream的方法中。
而会话的长连接id就是这里的ConnectionId。
@Service
public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestStreamImplBase {
@Autowired
ConnectionManager connectionManager;
private void traceDetailIfNecessary(Payload grpcRequest) {
String clientIp = grpcRequest.getMetadata().getClientIp();
String connectionId = CONTEXT_KEY_CONN_ID.get();
try {
if (connectionManager.traced(clientIp)) {
Loggers.REMOTE_DIGEST.info("[{}]Bi stream request receive, meta={},body={}", connectionId,
grpcRequest.getMetadata().toByteString().toStringUtf8(),
grpcRequest.getBody().toByteString().toStringUtf8());
}
} catch (Throwable throwable) {
Loggers.REMOTE_DIGEST.error("[{}]Bi stream request error,payload={},error={}", connectionId,
grpcRequest.toByteString().toStringUtf8(), throwable);
}
}
@Override
public StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) {
StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() {
final String connectionId = CONTEXT_KEY_CONN_ID.get();
final Integer localPort = CONTEXT_KEY_CONN_LOCAL_PORT.get();
final int remotePort = CONTEXT_KEY_CONN_REMOTE_PORT.get();
String remoteIp = CONTEXT_KEY_CONN_REMOTE_IP.get();
String clientIp = "";
@Override
public void onNext(Payload payload) {
clientIp = payload.getMetadata().getClientIp();
traceDetailIfNecessary(payload);
Object parseObj;
try {
parseObj = GrpcUtils.parse(payload);
} catch (Throwable throwable) {
Loggers.REMOTE_DIGEST
.warn("[{}]Grpc request bi stream,payload parse error={}", connectionId, throwable);
return;
}
if (parseObj == null) {
Loggers.REMOTE_DIGEST
.warn("[{}]Grpc request bi stream,payload parse null ,body={},meta={}", connectionId,
payload.getBody().getValue().toStringUtf8(), payload.getMetadata());
return;
}
if (parseObj instanceof ConnectionSetupRequest) {
ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
Map<String, String> labels = setUpRequest.getLabels();
String appName = "-";
if (labels != null && labels.containsKey(Constants.APPNAME)) {
appName = labels.get(Constants.APPNAME);
}
ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
metaInfo.setTenant(setUpRequest.getTenant());
Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
connection.setAbilities(setUpRequest.getAbilities());
boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
// 这里会有一个connectionManager.register
if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
//Not register to the connection manager if current server is over limit or server is starting.
try {
Loggers.REMOTE_DIGEST.warn("[{}]Connection register fail,reason:{}", connectionId,
rejectSdkOnStarting ? " server is not started" : " server is over limited.");
connection.request(new ConnectResetRequest(), 3000L);
connection.close();
} catch (Exception e) {
//Do nothing.
if (connectionManager.traced(clientIp)) {
Loggers.REMOTE_DIGEST
.warn("[{}]Send connect reset request error,error={}", connectionId, e);
}
}
}
} else if (parseObj instanceof Response) {
Response response = (Response) parseObj;
if (connectionManager.traced(clientIp)) {
Loggers.REMOTE_DIGEST
.warn("[{}]Receive response of server request ,response={}", connectionId, response);
}
RpcAckCallbackSynchronizer.ackNotify(connectionId, response);
connectionManager.refreshActiveTime(connectionId);
} else {
Loggers.REMOTE_DIGEST
.warn("[{}]Grpc request bi stream,unknown payload receive ,parseObj={}", connectionId,
parseObj);
}
}
@Override
public void onError(Throwable t) {
if (connectionManager.traced(clientIp)) {
Loggers.REMOTE_DIGEST.warn("[{}]Bi stream on error,error={}", connectionId, t);
}
if (responseObserver instanceof ServerCallStreamObserver) {
ServerCallStreamObserver serverCallStreamObserver = ((ServerCallStreamObserver) responseObserver);
if (serverCallStreamObserver.isCancelled()) {
//client close the stream.
} else {
try {
serverCallStreamObserver.onCompleted();
} catch (Throwable throwable) {
//ignore
}
}
}
}
@Override
public void onCompleted() {
if (connectionManager.traced(clientIp)) {
Loggers.REMOTE_DIGEST.warn("[{}]Bi stream on completed", connectionId);
}
if (responseObserver instanceof ServerCallStreamObserver) {
ServerCallStreamObserver serverCallStreamObserver = ((ServerCallStreamObserver) responseObserver);
if (serverCallStreamObserver.isCancelled()) {
//client close the stream.
} else {
try {
serverCallStreamObserver.onCompleted();
} catch (Throwable throwable) {
//ignore
}
}
}
}
};
return streamObserver;
}
}
ConnectionManager.register
这里的connections是用来管理所有的长连接的
Map<String, Connection> connections = new ConcurrentHashMap<>();
public synchronized boolean register(String connectionId, Connection connection) {
if (connection.isConnected()) {
String clientIp = connection.getMetaInfo().clientIp;
if (connections.containsKey(connectionId)) {
return true;
}
if (checkLimit(connection)) {
return false;
}
if (traced(clientIp)) {
connection.setTraced(true);
}
connections.put(connectionId, connection);
if (!connectionForClientIp.containsKey(clientIp)) {
connectionForClientIp.put(clientIp, new AtomicInteger(0));
}
connectionForClientIp.get(clientIp).getAndIncrement();
clientConnectionEventListenerRegistry.notifyClientConnected(connection);
LOGGER.info("new connection registered successfully, connectionId = {},connection={} ", connectionId,
connection);
return true;
}
return false;
}
是的,我知道最好使用webmock,但我想知道如何在RSpec中模拟此方法:defmethod_to_testurl=URI.parseurireq=Net::HTTP::Post.newurl.pathres=Net::HTTP.start(url.host,url.port)do|http|http.requestreq,foo:1endresend这是RSpec:let(:uri){'http://example.com'}specify'HTTPcall'dohttp=mock:httpNet::HTTP.stub!(:start).and_yieldhttphttp.shou
我目前正在使用以下方法获取页面的源代码:Net::HTTP.get(URI.parse(page.url))我还想获取HTTP状态,而无需发出第二个请求。有没有办法用另一种方法做到这一点?我一直在查看文档,但似乎找不到我要找的东西。 最佳答案 在我看来,除非您需要一些真正的低级访问或控制,否则最好使用Ruby的内置Open::URI模块:require'open-uri'io=open('http://www.example.org/')#=>#body=io.read[0,50]#=>"["200","OK"]io.base_ur
导读:随着叮咚买菜业务的发展,不同的业务场景对数据分析提出了不同的需求,他们希望引入一款实时OLAP数据库,构建一个灵活的多维实时查询和分析的平台,统一数据的接入和查询方案,解决各业务线对数据高效实时查询和精细化运营的需求。经过调研选型,最终引入ApacheDoris作为最终的OLAP分析引擎,Doris作为核心的OLAP引擎支持复杂地分析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。作者|叮咚买菜资深数据工程师韩青叮咚买菜创立于2017年5月,是一家专注美好食物的创业公司。叮咚买菜专注吃的事业,为满足更多人“想吃什么”而努力,通过美好食材的供应、美好滋味的开发以及美食品牌的孵
一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame
C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.
1.错误信息:Errorresponsefromdaemon:Gethttps://registry-1.docker.io/v2/:net/http:requestcanceledwhilewaitingforconnection(Client.Timeoutexceededwhileawaitingheaders)或者:Errorresponsefromdaemon:Gethttps://registry-1.docker.io/v2/:net/http:TLShandshaketimeout2.报错原因:docker使用的镜像网址默认为国外,下载容易超时,需要修改成国内镜像地址(首先阿里
需求:要创建虚拟机,就需要给他提供一个虚拟的磁盘,我们就在/opt目录下创建一个10G大小的raw格式的虚拟磁盘CentOS-7-x86_64.raw命令格式:qemu-imgcreate-f磁盘格式磁盘名称磁盘大小qemu-imgcreate-f磁盘格式-o?1.创建磁盘qemu-imgcreate-fraw/opt/CentOS-7-x86_64.raw10G执行效果#ls/opt/CentOS-7-x86_64.raw2.安装虚拟机使用virt-install命令,基于我们提供的系统镜像和虚拟磁盘来创建一个虚拟机,另外在创建虚拟机之前,提前打开vnc客户端,在创建虚拟机的时候,通过vnc
作为新的阿里云用户,您可以50免费试用多种优惠,价值高达1,700美元(或8,500美元)。这将让您了解和体验阿里云平台上提供的一系列产品和服务。如果您以个人身份注册免费试用,您将获得价值1,700美元的优惠。但是,如果您是注册公司,您可以选择企业免费试用,提交基本信息通过企业实名注册验证,即可开始价值$8,500的免费试用!本教程介绍了如何设置您的帐户并使用您的免费试用版。关于免费试用在我们开始此试用之前,您还必须遵守以下条款和条件才能访问您的免费试用:只有在一年内创建的账户才有资格获得阿里云免费试用。通过此免费试用优惠,用户可以免费试用免费试用活动页面上列出的每种产品一次。如果您有多个帐
Rails中有没有一种方法可以提取与路由关联的HTTP动词?例如,给定这样的路线:将“users”匹配到:“users#show”,通过:[:get,:post]我能实现这样的目标吗?users_path.respond_to?(:get)(显然#respond_to不是正确的方法)我最接近的是通过执行以下操作,但它似乎并不令人满意。Rails.application.routes.routes.named_routes["users"].constraints[:request_method]#=>/^GET$/对于上下文,我有一个设置cookie然后执行redirect_to:ba
我正在使用Heroku(heroku.com)来部署我的Rails应用程序,并且正在构建一个iPhone客户端来与之交互。我的目的是将手机的唯一设备标识符作为HTTPheader传递给应用程序以进行身份验证。当我在本地测试时,我的header通过得很好,但在Heroku上它似乎去掉了我的自定义header。我用ruby脚本验证:url=URI.parse('http://#{myapp}.heroku.com/')#url=URI.parse('http://localhost:3000/')req=Net::HTTP::Post.new(url.path)#boguspara