自定義資源對象

首先我們先看下如果想基于框架實現一個自定義的資源,應該如何開發,在 網關篇 提到 使用 AA 模式部署的 metrics-server,對外提供了 nodeMetrics 資源對象,用戶可以直接使用 kubectl 去管理,下面的幾個關鍵接口(Interface)你需要仔細看一下,之前也說到過 AA 模式提供的資源描述和內建的資源API定義模式一樣,那我們索性就直接看內置資源的實現。

rest.Storage

K8S的內置資源對象對外提供 REST 風格的交互接口,通過 REST API對外提供服務都需要實現 rest.Storage 接口。

// Single item interfaces:
// (Method: Current -> Proposed)
//    GET: Getter -> NamedGetter
//    WATCH: (n/a) -> NamedWatcher
//    CREATE: (n/a) -> NamedCreater
//    DELETE: Deleter -> NamedDeleter
//    UPDATE: Update -> NamedUpdater

// Storage is a generic interface for RESTful storage services.
// Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected
// that objects may implement any of the below interfaces.
type Storage interface {
    // New returns an empty object that can be used with Create and Update after request data has been put into it.
    // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
    New() runtime.Object

    // Destroy cleans up its resources on shutdown.
    // Destroy has to be implemented in thread-safe way and be prepared
    // for being called more than once.
    Destroy()
}

在 Github Dir: kubernetes/pkg/registry[3] 中包括所有內置資源的 storage 實現,根據 group 做了區分,你可以點開自己感興趣的 group 查看具體資源的實現。

要想要實現通過 kubectl 去管理自定義資源(get、list、create、patch、update、watch ect),其實也是對應著 REST API中的一些 Verb 行為 還需要按需實現 Github Interface: StandardStorage[4] 接口

// StandardStorage is an interface covering the common verbs. Provided for testing whether a
// resource satisfies the normal storage methods. Use Storage when passing opaque storage objects.
type StandardStorage interface {
    Getter
    Lister
    CreaterUpdater
    GracefulDeleter
    CollectionDeleter
    Watcher
}

每個接口都對應著不同的 REST HTTP Verb,比如:

  1. GET 方法 ->Getter、Lister、Watcher
  2. POST/PUT 方法 -> CreaterUpdater
  3. DELETE 方法 -> GracefulDeleter

你可能想了解,我實現了這些接口之后,k8s.io/apiserver 是如何幫我實現 HTTP Verb 和 接口方法的映射,那么實現細節請看 Github Method: registerResourceHandlers[5]

大致思路是通過斷言接口類型,來為不同的 HTTP Verb 注冊對應的 Handler

// what verbs are supported by the storage, used to know what verbs we support per path
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
updater, isUpdater := storage.(rest.Updater)
patcher, isPatcher := storage.(rest.Patcher)
watcher, isWatcher := storage.(rest.Watcher)
connecter, isConnecter := storage.(rest.Connecter)
storageMeta, isMetadata := storage.(rest.StorageMetadata)
storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)

不過一般內置資源通過內嵌 Github Object: genericregiser.Store[6] 對象就可以自動實現所有的接口,對象 genericregiser.Store 主要是實現了資源存儲到 ETCD。

其中Github File: kubernetes/pkg/registry/core/rest/storage_core.go[7] 注冊所有 core 組資源,我們以 core 下的 Github File: Pod[8] 對象為例.

// NewStorage returns a RESTStorage object that will work against pods.
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) {
    store := &genericregistry.Store{
        NewFunc:                  func() runtime.Object { return &api.Pod{} },
        NewListFunc:              func() runtime.Object { return &api.PodList{} },
        PredicateFunc:            registrypod.MatchPod,
        DefaultQualifiedResource: api.Resource("pods"),
        CreateStrategy:      registrypod.Strategy,
        UpdateStrategy:      registrypod.Strategy,
        DeleteStrategy:      registrypod.Strategy,
        ReturnDeletedObject: true,
        TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
    }
    ...
    return PodStorage{
        Pod:                 &REST{store, proxyTransport},
        Binding:             &BindingREST{store: store},
        LegacyBinding:       &LegacyBindingREST{bindingREST},
        Eviction:            newEvictionStorage(store, podDisruptionBudgetClient),
        Status:              &StatusREST{store: &statusStore},
        EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
        Log:                 &podrest.LogREST{Store: store, KubeletConn: k},
        Proxy:               &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
        Exec:                &podrest.ExecREST{Store: store, KubeletConn: k},
        Attach:              &podrest.AttachREST{Store: store, KubeletConn: k},
        PortForward:         &podrest.PortForwardREST{Store: store, KubeletConn: k},
    }, nil
}

在 Pod Storage 描述中還有一些我們常見的資源的存儲實現,以 Github File: Binding[9] 為例,當我們創建 Pod 的Binding子資源的時候,其實是給Pod綁定一個Node,最終效果看起來是給 Pod 的NodeName字段設定一個值。同樣的,你可以參考下 Pod 的 Github File Exec[10] 子資源的實現,實現了 Connecter 接口,當請求 Exec 子資源的時候,請求會被代理到 Pod 所在 Node的 kubelet 啟動的服務上,這種實現方式和我們上一篇問題提到的 Proxy 子資源實現方式一樣,細節還需各位自己看代碼研究。

// Create ensures a pod is bound to a specific host.
func (r *BindingREST) Create(ctx context.Context, name string, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (out runtime.Object, err error) {
    binding, ok := obj.(*api.Binding)
    if !ok {
        return nil, errors.NewBadRequest(fmt.Sprintf("not a Binding object: %#v", obj))
    }
    if name != binding.Name {
        return nil, errors.NewBadRequest("name in URL does not match name in Binding object")
    }
    // TODO: move me to a binding strategy
    if errs := validation.ValidatePodBinding(binding); len(errs) != 0 {
        return nil, errs.ToAggregate()
    }
    if createValidation != nil {
        if err := createValidation(ctx, binding.DeepCopyObject()); err != nil {
            return nil, err
        }
    }
    err = r.assignPod(ctx, binding.UID, binding.ResourceVersion, binding.Name, binding.Target.Name, binding.Annotations, dryrun.IsDryRun(options.DryRun))
    out = &metav1.Status{Status: metav1.StatusSuccess}
    return
}

認證/授權(authn/authz)

在講解認證/授權前,我們先看上面的這張圖,這張圖的右側的 Hanlder Chain 里我們可以看到每個 Handler 都實現了我們熟悉的功能,認證授權限流審計等功能。配合著圖閱讀 Github Func: DefaultBuildHandlerChain[11] 函數的實現,你會微微一笑(原來就是這么個回事)

func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
    handler := apiHandler

    handler = filterlatency.TrackCompleted(handler)
    handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
    handler = filterlatency.TrackStarted(handler, c.TracerProvider, "authorization")
    // APF 服務端限流
    if c.FlowControl != nil {
        workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig()
        requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(
            c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg, c.FlowControl.GetMaxSeats)
        handler = filterlatency.TrackCompleted(handler)
        handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator, c.RequestTimeout/4)
        handler = filterlatency.TrackStarted(handler, c.TracerProvider, "priorityandfairness")
    } else {
        handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
    }

    handler = filterlatency.TrackCompleted(handler)
    // 角色扮演
    handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
    handler = filterlatency.TrackStarted(handler, c.TracerProvider, "impersonation")

    handler = filterlatency.TrackCompleted(handler)
    // 審計日志
    handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
    handler = filterlatency.TrackStarted(handler, c.TracerProvider, "audit")

   // authn/authz
    failedHandler := genericapifilters.Unauthorized(c.Serializer)
    failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
...
}

這個函數實現了 HTTP Middleware 的能力,用戶請求需要經過層層Handler的執行,鑒權、授權的操作就放在這里實現,不僅僅是鑒權授權,審計、準入控制、流量管理都是通過HTTP Middleware嵌套Handler來完成。

如何認證

如果想自定義認證能力,比如接入公司內部的認證系統,ok沒關系完全可以

K8S 自己有多種認證器 BasicAuth、ClientCA、TokenAuth、ServiceAccountAuth等, Github Interface: authenticator.Request[12]中 定義了認證器的接口,

// Request attempts to extract authentication information from a request and
// returns a Response or an error if the request could not be checked.
type Request interface {
    AuthenticateRequest(req *http.Request) (*Response, bool, error)
}

在初始化 RecommendedOptions.Authentication[13] 的時候會把這些認證器注冊進去,只有有一個認證器認證通過就通過,所以你可以在這里把你自定義認證器注冊進去,只要實現 Request 接口即可。

其實我們最常見的是 x509 的認證器(就是對你的kubeconfig中的客戶端證書的認證) Github Type: x509.Authenticator[14]

授權

對于一些自定義資源的 CURD 操作可能需要連接公司內部的授權系統來判斷用戶是否有權限進行操作,你完全可以自定義授權器來接入公司管控

K8S 有多種授權器 AlwaysAllow、ABAC、Webhook、Node、RBAC等,Github Interface: authorizer.Authorizer[15] 和 Github Interface: authorizer.RuleResolver[16] 兩個接口定義了授權器,其中:

Github Interface: authorizer.Authorizer[17] 用于從請求中獲取授權信息,如果這一步沒成功那么決策為失敗

// Authorizer makes an authorization decision based on information gained by making
// zero or more calls to methods of the Attributes interface. It returns nil when an action is
// authorized, otherwise it returns an error.
type Authorizer interface {
Authorize(ctx context.Context, a Attributes) (authorized Decision, reason string, err error)
}

Github Interface: authorizer.RuleResolver[18] 用于解析規則,看是否可以對資源進行操作

// RuleResolver provides a mechanism for resolving the list of rules that apply to a given user within a namespace.
type RuleResolver interface {
// RulesFor get the list of cluster wide rules, the list of rules in the specific namespace, incomplete status and errors.
RulesFor(user user.Info, namespace string) ([]ResourceRuleInfo, []NonResourceRuleInfo, bool, error)
}

在初始化 RecommendedOptions.Authorization[19] 的時候把自定義授權器注冊進來,如果對RBAC機制感興趣你可以查看 Github Type: rbac.RBACAuthorizer[20] 的實現,不需要太多代碼就給你的APIServer 支持RBAC的能力

準入控制(admission)

其實準入控制可以對應為請求體的參數校驗,以及校驗一些不合規的參數,或者給請求體中填充一些字段,?? 甚至你可以接入公司的變更系統,在封網期間禁止對資源修改,審批通過后才允許執行操作

實現 Github Snippet: 變更準入控制和驗證準入控制接口[21] 接口就可以完成自定義準入校驗

type Interface interface {
    // Handles returns true if this admission controller can handle the given operation
    // where operation can be one of CREATE, UPDATE, DELETE, or CONNECT
    Handles(operation Operation) bool
}

type MutationInterface interface {
    Interface

    // Admit makes an admission decision based on the request attributes.
    // Context is used only for timeout/deadline/cancellation and tracing information.
    Admit(ctx context.Context, a Attributes, o ObjectInterfaces) (err error)
}

// ValidationInterface is an abstract, pluggable interface for Admission Control decisions.
type ValidationInterface interface {
    Interface

    // Validate makes an admission decision based on the request attributes.  It is NOT allowed to mutate
    // Context is used only for timeout/deadline/cancellation and tracing information.
    Validate(ctx context.Context, a Attributes, o ObjectInterfaces) (err error)
}

kube-apiserver 中所有已啟用的準入控制器由 Github Variable: chainAdmissionHandler[22] 數據結構管理,當客戶端發送請求給 kube-apiserver 時,Handler 會遍歷 Github Variable: chainAdmissionHandler[23] 中啟用的準入控制器并執行變更和驗證操作,我們想要自定義準入控制就需要把自己的準入控制注冊進來。

Github Dir: kubernetes/plugin/pkg/admission[24] 目錄下為 K8S 內置的準入控制器, kube-apiserver 在啟動時候會調用 Github CodeLine: kubeoptions.NewAdmissionOptions()[25] 把內置的準入控制器列表傳到 ServerRunOptions上。

持久化(storage)

數據的持久化也可以自定義,apiserver 默認是使用 Etcd 來作為后端存儲,不過沒關系!,后端存儲也可以自定義,我司就使用 MYSQL 作為后端存儲來做必要的數據備份

Github Type: Store[26] 用于etcd的存儲,一般k8s資源都嵌入了這個結構體,他實現了下面列出的這些接口,這樣k8s資源把 Store 作為內嵌資源,就自然可以實現下面這些接口,也就天然的可以支持一些 REST 的操作。


// Note: the rest.StandardStorage interface aggregates the common REST verbs
var _ rest.StandardStorage = &Store{}
var _ rest.Exporter = &Store{}
var _ rest.TableConvertor = &Store{}
var _ GenericStore = &Store{}

但是我們完全可以自定義一個自己的存儲,這里給大家一個參考 Github Repo: mink[27] 來實現一個對接MySQL的存儲對象

審計(aduit)

審計是一個 APIServer 不可或缺需要用來扯皮的功能

k8s.io/apiserver 框架已經自帶了審計能力 Github Func: WithAudit[28]

你完全可以參考 https://kubernetes.io/zh-cn/docs/tasks/debug/debug-cluster/audit/ 文檔里的 Policy 策略完成審計日志的配置

apiVersion: audit.k8s.io/v1 # 這是必填項。
kind: Policy
# 不要在 RequestReceived 階段為任何請求生成審計事件。
omitStages:
  - "RequestReceived"
rules:
  # 在日志中用 RequestResponse 級別記錄 Pod 變化。
  - level: RequestResponse
    resources:
    # 可以替換為你自定義資源的資源組和資源名稱
    - group: ""
      resources: ["pods"]

只需要在服務的啟動參數里指定下面的參數就可以擁有和K8S一樣的審計能力,而你我的朋友,這是你用 k8s.io/apiserver 構建你服務所應得的。  

- --audit-policy-file=/etc/kubernetes/audit-policy.yaml

- --audit-log-path=/var/log/kubernetes/audit/audit.log

流量控制(flow control)

k8s有非常強的服務端限流能力APF,不過目前還不能直接使用,因為需要依賴一個etcd

APF的原理可以參考 https://alexstocks.github.io/html/k8s-apf.html,因為kube-apiserver在啟動后的PostHook里會有專門處理FlowSchema和PriorityLevelConfiguration動態控制流量權重,如果想直接使用,需要做一些兼容改造。

不過 上一篇文章中提到的 KubeGateway 提供一另一種服務端限流能力,雖然沒有APF那么能的細化限流能力,不過對于需要的小伙伴也足夠了,感興趣的同學可以自行翻閱代碼。

More

  1. 如果大家想有手把手的保姆教程,大家可以查看 https://blog.gmem.cc/kubernetes-style-apiserver 這篇博客,寫的非常不錯詳細
  2. 另外本人也提供了一個簡單的 demo: https://github.com/yangsoon/apiserver 可以直接執行 make local-run 就可以運行,并且會把數據存儲到內存,不依賴etcd,感興趣的同學可以看相關的實現
  3. 注意本文只是一個索引文章,更多的細節還是需要大家自己去挖掘,下篇 多租k8s 會基于這篇文章的背景知識來看下字節和紅帽們是如何基于k8s.io/apiserver 玩轉多租k8s

引用鏈接

[1] 阿里云云產品 SAE-https://www.aliyun.com/product/sae: https://www.aliyun.com/product/sae
[2] Github Repo: k8s.io/apiserver : https://github.com/kubernetes/apiserver
[3] Github Dir: kubernetes/pkg/registry: https://github.com/kubernetes/kubernetes/tree/8b98305858b107369f2c9b9fd8ef1c5b0da078c0/pkg/registry
[4] Github Interface: StandardStorage: https://github.com/kubernetes/kubernetes/blob/4d33d837c8be778044d50755de83f8738e957c13/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go#L276
[5] Github Method: registerResourceHandlers: https://github.com/kubernetes/kubernetes/blob/1c2d446648662529282a3bb1528a6dbb50700fdb/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go#L190
[6] Github Object: genericregiser.Store: https://github.com/kubernetes/kubernetes/blob/1c2d446648662529282a3bb1528a6dbb50700fdb/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go#L81
[7] Github File: kubernetes/pkg/registry/core/rest/storage_core.go: https://github.com/kubernetes/kubernetes/blob/d1c296431e0ff2363131707054c4c75ad59cd2c0/pkg/registry/core/rest/storage_core.go#L104
[8] Github File: Pod: https://github.com/kubernetes/kubernetes/blob/d1c296431e0ff2363131707054c4c75ad59cd2c0/pkg/registry/core/rest/storage_core.go#L173
[9] Github File: Binding: https://github.com/kubernetes/kubernetes/blob/d1c296431e0ff2363131707054c4c75ad59cd2c0/pkg/registry/core/pod/storage/storage.go#L159
[10] Github File Exec: https://github.com/kubernetes/kubernetes/blob/2acdbae664bbc5ff9cd5d1ec07f93a14f444cef5/pkg/registry/core/pod/rest/subresources.go#L168
[11] Github Func: DefaultBuildHandlerChain: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/server/config.go#L978
[12] Github Interface: authenticator.Request: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/authentication/authenticator/interfaces.go#L34
[13] RecommendedOptions.Authentication: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/server/options/recommended.go#L67
[14] Github Type: x509.Authenticator: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/authentication/request/x509/x509.go#L133
[15] Github Interface: authorizer.Authorizer: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/authorization/authorizer/interfaces.go#L70
[16] Github Interface: authorizer.RuleResolver: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/authorization/authorizer/interfaces.go#L81
[17] Github Interface: authorizer.Authorizer: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/authorization/authorizer/interfaces.go#L70
[18] Github Interface: authorizer.RuleResolver: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/authorization/authorizer/interfaces.go#L81
[19] RecommendedOptions.Authorization: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/server/options/recommended.go#L68
[20] Github Type: rbac.RBACAuthorizer: https://github.com/kubernetes/kubernetes/blob/4a89df5617b8e1e26abb16150502d04e6c180533/plugin/pkg/auth/authorizer/rbac/rbac.go#L50
[21] Github Snippet: 變更準入控制和驗證準入控制接口: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/admission/interfaces.go#L123-L144
[22] Github Variable: chainAdmissionHandler: https://github.com/kubernetes/apiserver/blob/master/pkg/admission/chain.go#L23
[23] Github Variable: chainAdmissionHandler: https://github.com/kubernetes/apiserver/blob/master/pkg/admission/chain.go#L23
[24] Github Dir: kubernetes/plugin/pkg/admission: https://github.com/kubernetes/kubernetes/tree/release-1.20/plugin/pkg/admission
[25] Github CodeLine: kubeoptions.NewAdmissionOptions(): https://github.com/kubernetes/kubernetes/blob/4a89df5617b8e1e26abb16150502d04e6c180533/cmd/kube-apiserver/app/options/options.go#L105
[26] Github Type: Store: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/registry/generic/registry/store.go#L97
[27] Github Repo: mink: https://github.com/acorn-io/mink
[28] Github Func: WithAudit: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/endpoints/filters/audit.go#L42

文章轉自微信公眾號@CNCF

上一篇:

有哪些新聞媒體提供Open API?

下一篇:

python機器人Agent編程——實現一個本地大模型和爬蟲結合的手機號歸屬地天氣查詢Agent
#你可能也喜歡這些API文章!

我們有何不同?

API服務商零注冊

多API并行試用

數據驅動選型,提升決策效率

查看全部API→
??

熱門場景實測,選對API

#AI文本生成大模型API

對比大模型API的內容創意新穎性、情感共鳴力、商業轉化潛力

25個渠道
一鍵對比試用API 限時免費

#AI深度推理大模型API

對比大模型API的邏輯推理準確性、分析深度、可視化建議合理性

10個渠道
一鍵對比試用API 限時免費