首页 > grpc 传递上下文_grpc 源码笔记 02:ClientConn

grpc 传递上下文_grpc 源码笔记 02:ClientConn

上篇笔记中梳理了一把 resolver 和 balancer,这里顺着前面的流程走一遍入口的 ClientConn 对象。

ClientConn

// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {ctx    context.Contextcancel context.CancelFunctarget       stringparsedTarget resolver.Targetauthority    stringdopts        dialOptionscsMgr        *connectivityStateManagerbalancerBuildOpts balancer.BuildOptionsblockingpicker    *pickerWrappermu              sync.RWMutexresolverWrapper *ccResolverWrappersc              *ServiceConfigconns           map[*addrConn]struct{}// Keepalive parameter can be updated if a GoAway is received.mkp             keepalive.ClientParameterscurBalancerName stringbalancerWrapper *ccBalancerWrapperretryThrottler  atomic.ValuefirstResolveEvent *grpcsync.EventchannelzID int64 // channelz unique identification numberczData     *channelzData
}

首先是 ctx 和 cancel 两个字段,之前好像有看到什么最佳实战说不要把 context 字段放在 struct 里传递而要放在 func 里传递,但是这里确实属于一个非常合理的场景:管理连接的生命周期,这个 ctx 和 cancel 都是来自建立连接时的 DialContext,标准库的 net.Conn 的结构体中也有同样的两个字段,这样请求上下文中建立的连接,可以在请求结束时安全释放掉。ClientConn 中派生出的 goroutine,也能通过 cancel 函数安全地关闭掉。

target、parsedTarget、authority、dopts 似乎都属于比较原始的参数。

csMgr 用于管理 ClientConn 总体的连接状态,先放一下,后面详细看。

resolverWrapper、conns、curBalancerName、balancerWrapper、firstResolveEvent 跟名字解析、负载均衡相关,上一篇笔记中简单看过一点。retryThrottler 大约是重试的退避策略,还没有了解过。

sc *ServiceConfig 是服务端给出的服务参数信息,大约是 maxRequestMessageBytes、timeout 之类的控制信息,可以具体到接口级别。mkp keepalive.ClientParameters 也是参数信息,与 keepalive 相关。

channelzID 和 czData 与 channelz 的信息相关,channelz 是 grpc 内部的一些埋点监控性质的信息,大体上是一个异步的 AddTraceEvent 然后汇聚数值,看代码的时候应该可以忽略这部分。

ClientConn 与 resolverWrapper / balancerWrapper 的交互

clientConn 与 resolver / balancer 之间的交互在上一篇笔记中简单梳理过,好处是接口比较明确,所以交互比较清晰。clientConn 与 resolverWrapper / balancerWrapper 之间的交互都是具体的方法,手工梳理一下。

resolverWrapper 对 clientConn 的调用有 updateResolverState。

clientConn 对 resolverWrapper 的调用有 resolveNow。

clientConn 对 balancerWrapper 的调用有:

  • resolveError:调用来自 clientConn 的 updateResolverState 方法,该方法是被 resolverWrapper 所调用的。
  • handleSubConnStateChange,调用来自 clientConn 的 handleSubConnStateChange 方法,该方法又是被 addrConn 的 updateConnectivityState 调用的。
  • updateClientConnState,调用来自 clientConn 的 updateResolverState,用于传递名字解析的更新。

balancerWrapper 对 clientConn 的调用有:

  • newAddrConn、removeAddrConn:大体上与 NewSubConn 和 RemoveSubConn 相映射,addrConn 是具体的 SubConn 的实现。
  • blockingPicker.updatePicker、csMgr.updateState:皆在 UpdateBalancerState 时调用,将 balancer.State 中的 picker 与总连接状态设置给 clientConn。
  • resolveNow:来自 ResolveNow,向 clientConn 发起 resolver 的解析。

画一张图:

2828ecd2e023d216dfc7ad1e98ec7de2.png

交互的过程感觉有点像 k8s 那种侦听结构体的字段变动做收敛逻辑的意思,比如 resolver 给出后端地址、ServiceConfig、附加元信息的 State 结构体,ClientConn 跟 balancer 都拿这一个结构体中自己关心的字段做自己的逻辑,整个流程都异步做。

这张图里只有 handleSubConnStateChange 的来源没标注。它是来自 addrConn 的回调,后面再展开梳理。

ClientConn 的初始化

名字解析与负载均衡都是持续动态刷新的过程,那么整个流程是怎样启动的?裁剪一下 DialContext 函数:

// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// .
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {cc := &ClientConn{target:            target,csMgr:             &connectivityStateManager{},conns:             make(map[*addrConn]struct{}),dopts:             defaultDialOptions(),blockingpicker:    newPickerWrapper(),czData:            new(channelzData),firstResolveEvent: grpcsync.NewEvent(),}cc.retryThrottler.Store((*retryThrottler)(nil))cc.ctx, cc.cancel = context.WithCancel(context.Background())for _, opt := range opts {opt.apply(&cc.dopts)}// 好像是初始化什么钩子chainUnaryClientInterceptors(cc)chainStreamClientInterceptors(cc)defer func() {if err != nil {cc.Close()}}()if channelz.IsOn() {// ... 初始化 channelz}if !cc.dopts.insecure {// ... tlz 相关参数检查}if cc.dopts.defaultServiceConfigRawJSON != nil {// ... 解析参数指定的默认 ServiceConfig 的 JSON}cc.mkp = cc.dopts.copts.KeepaliveParamsif cc.dopts.copts.Dialer == nil {// ... 默认 Dialer 函数}if cc.dopts.copts.UserAgent != "" {cc.dopts.copts.UserAgent += " " + grpcUA} else {cc.dopts.copts.UserAgent = grpcUA}// 配置 Dial 的超时if cc.dopts.timeout > 0 {var cancel context.CancelFuncctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)defer cancel()}// 退出函数时,如果 DialContext 的 ctx 如果中途撤销或者超时了,则返回 ctx.Err()defer func() {select {case <-ctx.Done():conn, err = nil, ctx.Err()default:}}()// 从 scChan 中侦听接收 serviceConfig 信息scSet := falseif cc.dopts.scChan != nil {// Try to get an initial service config.select {case sc, ok := <-cc.dopts.scChan:if ok {cc.sc = &scscSet = true}default:}}// 默认取指数退避if cc.dopts.bs == nil {cc.dopts.bs = backoff.DefaultExponential}// 根据名字的 Scheme 选择 resolverBuilder// Determine the resolver to use.cc.parsedTarget = parseTarget(cc.target)grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)if resolverBuilder == nil {// .. 如果没有找到则按默认的 resolverBuilder}creds := cc.dopts.copts.TransportCredentials// ..  初始化 cc.authority// 阻塞等待 scChanif cc.dopts.scChan != nil && !scSet {// Blocking wait for the initial service config.select {case sc, ok := <-cc.dopts.scChan:if ok {cc.sc = &sc}case <-ctx.Done():return nil, ctx.Err()}}if cc.dopts.scChan != nil {go cc.scWatcher()}// 初始化 balancervar credsClone credentials.TransportCredentialsif creds := cc.dopts.copts.TransportCredentials; creds != nil {credsClone = creds.Clone()}cc.balancerBuildOpts = balancer.BuildOptions{DialCreds:        credsClone,CredsBundle:      cc.dopts.copts.CredsBundle,Dialer:           cc.dopts.copts.Dialer,ChannelzParentID: cc.channelzID,Target:           cc.parsedTarget,}// Build the resolver.rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)if err != nil {return nil, fmt.Errorf("failed to build resolver: %v", err)}cc.mu.Lock()cc.resolverWrapper = rWrappercc.mu.Unlock()// A blocking dial blocks until the clientConn is ready.if cc.dopts.block {for {s := cc.GetState()if s == connectivity.Ready {break} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {if err = cc.blockingpicker.connectionError(); err != nil {terr, ok := err.(interface {Temporary() bool})if ok && !terr.Temporary() {return nil, err}}}if !cc.WaitForStateChange(ctx, s) {// ctx got timeout or canceled.return nil, ctx.Err()}}}return cc, nil
}

cc.dopts.scChan 这里有一些逻辑,再就是在 dopts.block 时,有主动等连接的逻辑。

顺着 cc.dopts.scChan 找过去,发现参数定义的 dialoptions 里面有这一段:

// WithServiceConfig returns a DialOption which has a channel to read the
// service configuration.
//
// Deprecated: service config should be received through name resolver or via
// WithDefaultServiceConfig, as specified at
// .  Will be
// removed in a future 1.x release.
func WithServiceConfig(c <-chan ServiceConfig) DialOption {return newFuncDialOption(func(o *dialOptions) {o.scChan = c})
}

说 scChan 这个字段要废弃了,要么换 WithDefaultServiceConfig 传一个默认的 json,要么通过 resolver 的 UpdateState 中 State 结构体里的 ServiceConfig 字段去动态拿。

ServiceConfig 比想象中更神通广大一点,ClientConn 中有个 applyServiceConfigAndBalancer 方法,甚至会根据动态下发的 ServiceConfig 来调用 switchBalancer 动态切换 balancer 策略。

csMgr 与 WaitForStateChange

回去单独看一下 cc.dopts.block 的逻辑:

// A blocking dial blocks until the clientConn is ready.if cc.dopts.block {for {s := cc.GetState()if s == connectivity.Ready {break} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {if err = cc.blockingpicker.connectionError(); err != nil {terr, ok := err.(interface {Temporary() bool})if ok && !terr.Temporary() {return nil, err}}}if !cc.WaitForStateChange(ctx, s) {// ctx got timeout or canceled.return nil, ctx.Err()}}}

大约是一个死循环连接状态直到 Ready 为止,ClientConn 的连接状态来自 cc.csMgr 做管理,而 csMgr 中的连接状态来自 balancer 对 ClientConn 的 UpdateState 的回调。balancer 的连接状态是对多个连接的连接状态的汇聚,大约是只要有一个连接 Ready,便将 balancer 的连接状态视为 Ready。之前看 balancer 做汇聚连接状态还不大清楚这个的用处,现在看应该主要是为 WaitForStateChange 这个方法服务的,而且这个方法是公共方法,是 ClientConn 的对外 API。

工程上如果开启 cc.dopts.block,似乎配合一个 cc.dopts.timeout 比较好,这样能超时退出。

csMgr 主要做的事情是辅助 ClientConn 实现 connectivity.Reporter 接口,尤其是 WaitForStateChange 方法:

// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {mu         sync.Mutexstate      connectivity.StatenotifyChan chan struct{}channelzID int64
}// ...// updateState updates the connectivity.State of ClientConn.
// If there's a change it notifies goroutines waiting on state change to
// happen.
func (csm *connectivityStateManager) updateState(state connectivity.State) {csm.mu.Lock()defer csm.mu.Unlock()if csm.state == connectivity.Shutdown {return}if csm.state == state {return}csm.state = stateif channelz.IsOn() {// ...}if csm.notifyChan != nil {// There are other goroutines waiting on this channel.close(csm.notifyChan)csm.notifyChan = nil}
}func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {csm.mu.Lock()defer csm.mu.Unlock()if csm.notifyChan == nil {csm.notifyChan = make(chan struct{})}return csm.notifyChan
}// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {ch := cc.csMgr.getNotifyChan()if cc.csMgr.getState() != sourceState {return true}select {case <-ctx.Done():return falsecase <-ch:return true}
}

notifyChan 这个 channel 仅通过 close 做广播性的通知。每当 state 状态变化会惰性产生新的 notifyChan,当这个 notifyChan 被关闭时就意味着状态有变化了,起到一个类似条件变量的作用。

blockingpicker

除了 balancerWrapper、resolverWrapper,ClientConn 中还有一个 pickerWrapper 类型的 blockingPicker 字段,本体也是同样主要是并发同步为主。

// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblock when there's a picker update.
type pickerWrapper struct {mu         sync.Mutexdone       boolblockingCh chan struct{}picker     balancer.V2Picker// The latest connection error.  TODO: remove when V1 picker is deprecated;// balancer should be responsible for providing the error.*connErr
}type connErr struct {mu  sync.Mutexerr error
}

大约是初始化时生成一个 blockingCh,随后每当 updatePickerV2 改动 picker 时,则关闭旧 blockingCh 同时生成一个新的 blockingCh。

pickerWrapper 对外的主要功能入口是 pick 方法,先看它的注释:

// pick returns the transport that will be used for the RPC.
// It may block in the following cases:
// - there's no picker
// - the current picker returns ErrNoSubConnAvailable
// - the current picker returns other errors and failfast is false.
// - the subConn returned by the current picker is not READY
// When one of these situations happens, pick blocks until the picker gets updated.
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {// ...

这些阻塞唯有 balancer 生成新的 picker 对象交给 ClientConn 才能解除。实现风格上,与 WaitForStateChange 类似,每当状态变化时关闭旧 chan、生成新 chan,上锁确保状态变化与更替 chan 两步操作的原子性,对方阻塞等待 chan 的关闭。

picker.Pick() 方法本身是线程安全的,不是很清楚每个 SubConn 能否被多个 goroutine 使用,后面再确认一下这点。

先看到这里,下面是 addrConn,也就是 SubConn 的实现。

更多相关:

  • 我的实验是基于PSPNet模型实现二维图像的语义分割,下面的代码直接从得到的h5文件开始往下做。。。 也不知道是自己的检索能力出现了问题还是咋回事,搜遍全网都没有可以直接拿来用的语义分割代码,东拼西凑,算是搞成功了。 实验平台:Windows、VS2015、Tensorflow1.8 api、Python3.6 具体的流程为:...

  • Path Tracing 懒得翻译了,相信搞图形学的人都能看得懂,2333 Path Tracing is a rendering algorithm similar to ray tracing in which rays are cast from a virtual camera and traced through a s...

  • configure_file( [COPYONLY] [ESCAPE_QUOTES] [@ONLY][NEWLINE_STYLE [UNIX|DOS|WIN32|LF|CRLF] ]) 我遇到的是 configure_file(config/config.in ${CMAKE_SOURCE_DIR}/...

  •     直接复制以下代码创建一个名为settings.xml的文件,放到C:UsersAdministrator.m2下即可