K8s Api-Server流程简述

最近开发环境的k8s出现过一次读取不出来configmap,并导致api-server OOM重启的现象,供应商说是因为configmap条目太多,导致api-server大量decode etcd的数据为yaml所致。在原生k8s上反复重试过多次,均无法复现该现象,所以又看了一遍api-server大致的代码逻辑,这里简单记录一下。

核心业务逻辑

在看代码之前,基于对k8s架构的理解,我猜测:api-server应该就是对etcd的一层代理。其中需要实现了对各种资源的路由、准入控制、限速、资源的格式转换以及各种操作的接口封装等功能。具体代码逻辑又实现了些什么呢?下面来简单介绍。

其实和猜测的差不多,从入口之后,api-server就调用CreateServerChain来创建了一系列的服务,包括:

  • KubeApiServer
    也就是为k8s定义的抽象资源(比如workload,service,configmap等)提供服务;

  • ApiExtensionsServer
    主要负责CRD相关的服务;

  • AggregatorServer
    这个不太熟,但是顾名思义,应该就是服务api aggregator的。

代码框架

先上一张大图,也是为了之后看图就能够快速定位代码逻辑。

前面说的那三种server,其实就在大图中左上角部分,下面在 #总入口 中要重点讲下router这部分的逻辑。

总入口

接下来,我们找熟悉的KubeApiServer分析。函数CreateKubeAPIServer调用了kubeAPIServerConfig.Complete().New。通过该函数,基本看出了端倪,api-server是通过webserver的形势对外提供api服务的;该函数首先准备了一堆的资源组,然后将这些RESTStorageProvider安装到了Master,这里可以理解为准备router信息(较抽象)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
// 准备所有的资源组
restStorageProviders := []RESTStorageProvider{
auditregistrationrest.RESTStorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
coordinationrest.RESTStorageProvider{},
extensionsrest.RESTStorageProvider{},
networkingrest.RESTStorageProvider{},
noderest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
settingsrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
appsrest.RESTStorageProvider{},
admissionregistrationrest.RESTStorageProvider{},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
}

// 第二个重点, 安装资源handler了
m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...)
}

以上只是粗枝大叶的入门,如果要对资源router的挂载搞透彻,我们还需要理解进一步理解这里的资源,以及InstallAPIs的流程。

资源组织

k8s api的定义中,对rest-api操作的对象都按照资源组,版本的形势做了组织。要查看所有支持的资源组和版本信息可以执行以下操作:

1
2
3
4
5
6
7
~ $ kc get apiservices
NAME SERVICE AVAILABLE AGE
v1. Local True 3d11h
v1.apps Local True 3d11h
v1beta1.apps Local True 3d11h
v1beta2.apps Local True 3d11h
...

下面会涉及到两个概念:资源组资源

资源组

这里,我们就使用 appsrest.RESTStorageProvider{} 这个最成熟、大家最熟悉的资源组(apps)来分析。

首先看下图目录结构,位于k8s项目pkg/registry/目录下的这些子目录都是k8s支持的资源组类型。这里的apps就是其中之一,apps下面又包含了常见的deployment等workloads。

每一个资源组目录下都包含了一个rest目录,里面放的不是子资源,而是该资源组的RESTStorageProvider接口实现。该接口的NewRESTStorage方法将资源组下的所有资源的storage都以版本信息作key填充到apiGroupInfo.VersionedResourcesStorageMap字典中。前面提到的m.InstallAPIs就是遍历字典,将所有资源组下的资源逐一安装到master router中。

顶层storage

到此,我们已经清楚,每添加一个资源组,在api-server里面都需要把它加入到apiGroupInfo.VersionedResourcesStorageMap中,这样才会被安装到router里面。顶层的storage就是干这个事情的。

来看rest目录中storage_apps.go里面资源组的版本和子资源是如何组织的代码。下面可以看到,对于apps这个资源组,针对v1beta1这个版本,添加了deployments这个资源,而这个组织好的结构就是一个storage。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 该函数将各个版本的storage填充到字典中
func (p RESTStorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool) {
...
if apiResourceConfigSource.VersionEnabled(appsapiv1beta1.SchemeGroupVersion) {
apiGroupInfo.VersionedResourcesStorageMap[appsapiv1beta1.SchemeGroupVersion.Version] = p.v1beta1Storage(apiResourceConfigSource, restOptionsGetter)
}
...
return apiGroupInfo, true
}

// 该函数在准备单个版本所有的子资源
func (p RESTStorageProvider) v1beta1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) map[string]rest.Storage {
storage := map[string]rest.Storage{}

//这里要再看NewStorage的代码,注意它的参数,这在后面会提到
deploymentStorage := deploymentstore.NewStorage(restOptionsGetter)
storage["deployments"] = deploymentStorage.Deployment
...

return storage
}

接下来,我们将进入到deployment资源相关的子目录分析。

资源

通过目录结构图可以知道,每一个资源下都包含了storage和strategy目录。

storage

其中storage中定义了NewREST,还通过REST对外统一提供操作资源的一些操作接口(如New/Get/Update等);这里将NewStorage和DeploymentStorage都一并附上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type DeploymentStorage struct {
Deployment *REST // 这些类型都是包含了 *genericregistry.Store
Status *StatusREST // 这些类型都是包含了 *genericregistry.Store
Scale *ScaleREST // 这些类型都是包含了 *genericregistry.Store
Rollback *RollbackREST // 这些类型都是包含了 *genericregistry.Store
}

func NewStorage(optsGetter generic.RESTOptionsGetter) DeploymentStorage {
// 这里调用了 NewREST,参数还是前面 NewStorage 的参数一如既往的往下传
deploymentRest, deploymentStatusRest, deploymentRollbackRest := NewREST(optsGetter)

return DeploymentStorage{
Deployment: deploymentRest,
Status: deploymentStatusRest,
Scale: &ScaleREST{store: deploymentRest.Store},
Rollback: deploymentRollbackRest,
}
}

在NewREST返回了三个REST,每一个REST里面都包含了 *genericregistry.Store, 该store类型重点实现下面三个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, *RollbackREST) {
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &apps.Deployment{} },
NewListFunc: func() runtime.Object { return &apps.DeploymentList{} },
DefaultQualifiedResource: apps.Resource("deployments"),

CreateStrategy: deployment.Strategy, // 重点
UpdateStrategy: deployment.Strategy, // 重点
DeleteStrategy: deployment.Strategy, // 重点
...
}
...
}

strategy

接下来就将注意力转移到deployment.Stategy上来了。deploymentStrategy里面主要实现了对资源做增删改前后的各种校验和准备工作,这里就不再详细的讲述。

etcd/cache

前面讲NewREST的时候,提到了其参数,其实该参数的最终值是这样赋值得到的:

1
genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions}

获取该值的时候,通过调用该factory的GetRESTOptions方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
...
// 重要
if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
if err != nil {
return generic.RESTOptions{}, err
}
cacheSize, ok := sizes[resource]
if !ok {
cacheSize = f.Options.DefaultWatchCacheSize
}
// 重要
ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
}
return ret, nil
}

这里第一个要点是使用了一个api-server的参数,确定是否需要使用watchCache,这里一并把api-server与cache有关的两个参数都列出来。

–watch-cache Default: true
Enable watch caching in the apiserver

–watch-cache-sizes stringSlice
Watch cache size settings for some resources (pods, nodes, etc.), comma separated. The individual setting format: resource[.group]#size, where resource is lowercase plural (no version), group is omitted for resources of apiVersion v1 (the legacy core API) and included for others, and size is a number. It takes effect when watch-cache is enabled. Some resources (replicationcontrollers, endpoints, nodes, pods, services, apiservices.apiregistration.k8s.io) have system defaults set by heuristics, others default to default-watch-cache-size

第二个要点是真正的初始化一个storageWithCacher的decorator,也就是在这个函数里面,初始化了backstorage之上的一层cache。该函数显示调用generic.NewRawStorage来new出一个backupstore,然后再基于此封装一层cache。

1
2
3
4
5
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) {
s, d, err := factory.Create(*config)
...
return s, d
}

这里的factory其实就是在backstore里面对etcd2和etcd3的一层抽象,默认是使用etcd3。另外,对于etcd3,除了封装kv操作之外,这里还引入了自动定时compact etcd已删除version数据的操作,当前默认是5分钟一次。这里就不再一一粘贴,可以去查一下代码的实现。

1
2
3
4
5
6
7
8
9
10
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case "etcd2":
return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Storage(c)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}

安装路由

讲完资源组和资源的内容,我们再来看安装router的时候都做了什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (m *Master) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) {
apiGroupsInfo := []*genericapiserver.APIGroupInfo{}

for _, restStorageBuilder := range restStorageProviders {
groupName := restStorageBuilder.GroupName()
...
// 注意这里调用了 NewRESTStorage
apiGroupInfo, enabled := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
if !enabled {
klog.Warningf("Problem initializing API group %q, skipping.", groupName)
continue
}
...
apiGroupsInfo = append(apiGroupsInfo, &apiGroupInfo)
}

// 这里又进一层函数,到里层去安装router
if err := m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...); err != nil {
klog.Fatalf("Error in registering group versions: %v", err)
}
}

该函数里面调用的 NewRESTStorage 有没有似曾相识?对,就是我们在资源组的RESTStorageProvider里面实现的方法。看到这里,才将各资源组的实现与安装router结合起来。绕了一大圈,其实是为了干这事!

真正将api资源安装到router的代码,看m.GenericAPIServer.InstallAPIGroups

1
2
3
4
5
6
7
8
9
10
11
12
13
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
...
for _, apiGroupInfo := range apiGroupInfos {
// 看这里
if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
return fmt.Errorf("unable to install api resources: %v", err)
}
...
// 看这里
s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
}
return nil
}

到此,api-server的基本代码流程就简单过了一遍。当然,这只是其中一个面,带着不同的问题去看代码,梳理的线索不一样,应该有不同的收获。先就写到这里吧,有时间再看看梳理下其他方面。

0%