
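First, let's look at how to develop a custom resource on top of the framework. The gateway article mentioned that metrics-server, deployed in AA (API Aggregation) mode, exposes the NodeMetrics resource, which users can work with directly through kubectl. Study the key interfaces below carefully. As mentioned before, resources served in AA mode are described the same way as built-in API resources, so we may as well go straight to how the built-in resources are implemented.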
K8S built-in resources are exposed through REST-style interfaces, and every resource served through the REST API must implement the rest.Storage interface.
// Single item interfaces:
// (Method: Current -> Proposed)
//    GET: Getter -> NamedGetter
//    WATCH: (n/a) -> NamedWatcher
//    CREATE: (n/a) -> NamedCreater
//    DELETE: Deleter -> NamedDeleter
//    UPDATE: Update -> NamedUpdater
// Storage is a generic interface for RESTful storage services.
// Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected
// that objects may implement any of the below interfaces.
type Storage interface {
	// New returns an empty object that can be used with Create and Update after request data has been put into it.
	// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
	New() runtime.Object
	// Destroy cleans up its resources on shutdown.
	// Destroy has to be implemented in thread-safe way and be prepared
	// for being called more than once.
	Destroy()
}
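Github Dir: kubernetes/pkg/registry[3] contains the storage implementations of all built-in resources, organized by API group; open any group you are interested in to see how its resources are implemented.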
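To manage a custom resource with kubectl (get, list, create, patch, update, watch, etc.), operations that map to verbs in the REST API, you also need to implement the relevant parts of the Github Interface: StandardStorage[4] interface as needed: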
// StandardStorage is an interface covering the common verbs. Provided for testing whether a
// resource satisfies the normal storage methods. Use Storage when passing opaque storage objects.
type StandardStorage interface {
	Getter
	Lister
	CreaterUpdater
	GracefulDeleter
	CollectionDeleter
	Watcher
}
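Each interface corresponds to a different REST HTTP verb, for example:
- Getter: GET on a single named object
- Lister: GET on a collection (list)
- Creater: POST
- Updater: PUT (the Patcher interface, which combines Getter and Updater, serves PATCH)
- GracefulDeleter: DELETE on a single object
- CollectionDeleter: DELETE on a collection (deletecollection)
- Watcher: GET with ?watch=true (watch)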
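You may wonder how, once these interfaces are implemented, k8s.io/apiserver maps HTTP verbs to interface methods. For the implementation details, see Github Method: registerResourceHandlers[5].
The general idea is to type-assert the storage object against each interface and register the handler for the corresponding HTTP verb: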
// what verbs are supported by the storage, used to know what verbs we support per path
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
updater, isUpdater := storage.(rest.Updater)
patcher, isPatcher := storage.(rest.Patcher)
watcher, isWatcher := storage.(rest.Watcher)
connecter, isConnecter := storage.(rest.Connecter)
storageMeta, isMetadata := storage.(rest.StorageMetadata)
storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)
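To make the type-assertion idea concrete, here is a minimal, self-contained sketch. It does not import k8s.io/apiserver: Getter, Lister, Creater and Watcher are simplified local stand-ins (an assumption; the real rest.* methods take a context, options and runtime.Object values), and readOnlyStore is a hypothetical storage that only implements reads.

```go
package main

import (
	"fmt"
	"sort"
)

// Hypothetical, simplified stand-ins for the rest.* interfaces.
type Getter interface {
	Get(name string) (string, error)
}

type Lister interface {
	List() ([]string, error)
}

type Creater interface {
	Create(name, obj string) error
}

type Watcher interface {
	Watch() (<-chan string, error)
}

// supportedVerbs follows the idea of registerResourceHandlers: type-assert
// the storage against each interface and record the verb it enables.
func supportedVerbs(storage interface{}) []string {
	var verbs []string
	if _, ok := storage.(Getter); ok {
		verbs = append(verbs, "get")
	}
	if _, ok := storage.(Lister); ok {
		verbs = append(verbs, "list")
	}
	if _, ok := storage.(Creater); ok {
		verbs = append(verbs, "create")
	}
	if _, ok := storage.(Watcher); ok {
		verbs = append(verbs, "watch")
	}
	return verbs
}

// readOnlyStore is a hypothetical in-memory storage that only serves reads,
// so only get and list handlers would be registered for it.
type readOnlyStore struct {
	items map[string]string
}

func (s *readOnlyStore) Get(name string) (string, error) {
	obj, ok := s.items[name]
	if !ok {
		return "", fmt.Errorf("%q not found", name)
	}
	return obj, nil
}

func (s *readOnlyStore) List() ([]string, error) {
	names := make([]string, 0, len(s.items))
	for name := range s.items {
		names = append(names, name)
	}
	sort.Strings(names)
	return names, nil
}
```

For a readOnlyStore, supportedVerbs yields only get and list; adding a Create method would make create appear without touching the registration logic, which is why driving the installer by interface assertions works so well.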
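In practice, however, built-in resources get all of these interfaces for free by embedding the Github Object: genericregistry.Store[6] object, whose main job is persisting resources to etcd.
Github File: kubernetes/pkg/registry/core/rest/storage_core.go[7] registers all resources in the core group; let's take the Github File: Pod[8] object in core as an example.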
// NewStorage returns a RESTStorage object that will work against pods.
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) {
	store := &genericregistry.Store{
		NewFunc: func() runtime.Object { return &api.Pod{} },
		NewListFunc: func() runtime.Object { return &api.PodList{} },
		PredicateFunc: registrypod.MatchPod,
		DefaultQualifiedResource: api.Resource("pods"),
		CreateStrategy: registrypod.Strategy,
		UpdateStrategy: registrypod.Strategy,
		DeleteStrategy: registrypod.Strategy,
		ReturnDeletedObject: true,
		TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
	}
	...
	return PodStorage{
		Pod: &REST{store, proxyTransport},
		Binding: &BindingREST{store: store},
		LegacyBinding: &LegacyBindingREST{bindingREST},
		Eviction: newEvictionStorage(store, podDisruptionBudgetClient),
		Status: &StatusREST{store: &statusStore},
		EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
		Log: &podrest.LogREST{Store: store, KubeletConn: k},
		Proxy: &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
		Exec: &podrest.ExecREST{Store: store, KubeletConn: k},
		Attach: &podrest.AttachREST{Store: store, KubeletConn: k},
		PortForward: &podrest.PortForwardREST{Store: store, KubeletConn: k},
	}, nil
}
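Why does embedding genericregistry.Store hand a resource all the REST interfaces? Because Go promotes the methods of an embedded field. The sketch below assumes a hypothetical, tiny Store with only Get and Create over a map (the real genericregistry.Store is far richer and talks to etcd); PodREST and ValidatedREST are likewise illustrative names, not the upstream types.

```go
package main

import "fmt"

// Store is a hypothetical, much smaller stand-in for genericregistry.Store:
// Get and Create are written once, over an in-memory map.
type Store struct {
	objects map[string]string
}

func NewStore() *Store {
	return &Store{objects: map[string]string{}}
}

func (s *Store) Get(name string) (string, error) {
	obj, ok := s.objects[name]
	if !ok {
		return "", fmt.Errorf("%q not found", name)
	}
	return obj, nil
}

func (s *Store) Create(name, obj string) error {
	if _, exists := s.objects[name]; exists {
		return fmt.Errorf("%q already exists", name)
	}
	s.objects[name] = obj
	return nil
}

// Simplified versions of rest.Getter and rest.Creater.
type Getter interface {
	Get(name string) (string, error)
}

type Creater interface {
	Create(name, obj string) error
}

// PodREST embeds *Store, so Store's methods are promoted and PodREST
// satisfies both interfaces without defining either method itself.
type PodREST struct {
	*Store
}

// ValidatedREST shadows the promoted Create to add a check, then delegates
// to the embedded Store for the actual write.
type ValidatedREST struct {
	*Store
}

func (r *ValidatedREST) Create(name, obj string) error {
	if name == "" {
		return fmt.Errorf("name is required")
	}
	return r.Store.Create(name, obj)
}

// Compile-time assertions, in the same style as `var _ rest.StandardStorage = &Store{}`.
var (
	_ Getter  = &PodREST{}
	_ Creater = &PodREST{}
	_ Creater = &ValidatedREST{}
)
```

This is the same shape as Pod: &REST{store, proxyTransport} above: the wrapper inherits the generic behavior and overrides only what it needs.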
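Pod storage also contains the storage implementations of several subresources we use all the time. Take Github File: Binding[9] as an example: creating a Pod's binding subresource actually binds the Pod to a Node, and the net effect is that the Pod's spec.nodeName field gets set. Similarly, look at how the Pod's Github File Exec[10] subresource is implemented: it implements the Connecter interface, and when the exec subresource is requested, the request is proxied to the server that the kubelet runs on the Pod's Node. This works the same way as the proxy subresource discussed in the previous article; the details are left for you to explore in the code.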
// Create ensures a pod is bound to a specific host.
func (r *BindingREST) Create(ctx context.Context, name string, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (out runtime.Object, err error) {
	binding, ok := obj.(*api.Binding)
	if !ok {
		return nil, errors.NewBadRequest(fmt.Sprintf("not a Binding object: %#v", obj))
	}
	if name != binding.Name {
		return nil, errors.NewBadRequest("name in URL does not match name in Binding object")
	}
	// TODO: move me to a binding strategy
	if errs := validation.ValidatePodBinding(binding); len(errs) != 0 {
		return nil, errs.ToAggregate()
	}
	if createValidation != nil {
		if err := createValidation(ctx, binding.DeepCopyObject()); err != nil {
			return nil, err
		}
	}
	err = r.assignPod(ctx, binding.UID, binding.ResourceVersion, binding.Name, binding.Target.Name, binding.Annotations, dryrun.IsDryRun(options.DryRun))
	out = &metav1.Status{Status: metav1.StatusSuccess}
	return
}
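The end effect of the binding subresource can be modeled in a few lines. This is a hypothetical in-memory sketch, not the upstream assignPod: Pod, Binding and podRegistry are illustrative types, and the only behavior modeled is the URL-name check from Create above, a refusal to rebind a pod that already has a node (our understanding is that the upstream code rejects this too), and finally setting NodeName.

```go
package main

import "fmt"

// Pod is a hypothetical, minimal pod with only the field a binding touches.
type Pod struct {
	Name     string
	NodeName string
}

// Binding mirrors the shape of api.Binding: the pod name plus a target node.
type Binding struct {
	Name       string
	TargetNode string
}

type podRegistry struct {
	pods map[string]*Pod
}

// bind validates the request, refuses to rebind an already-scheduled pod,
// and otherwise records the target node on the pod.
func (r *podRegistry) bind(urlName string, b Binding) error {
	if urlName != b.Name {
		return fmt.Errorf("name in URL does not match name in Binding object")
	}
	if b.TargetNode == "" {
		return fmt.Errorf("target node is required")
	}
	pod, ok := r.pods[b.Name]
	if !ok {
		return fmt.Errorf("pod %q not found", b.Name)
	}
	if pod.NodeName != "" {
		return fmt.Errorf("pod %q is already assigned to node %q", pod.Name, pod.NodeName)
	}
	pod.NodeName = b.TargetNode
	return nil
}
```

This is essentially what the scheduler relies on: it never edits the pod directly, it just creates a Binding, and the binding storage turns that into a NodeName assignment.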
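Before we get to authentication and authorization, look at the figure above. In the Handler Chain on its right, each handler implements a familiar feature: authentication, authorization, rate limiting, auditing, and so on. Read the implementation of Github Func: DefaultBuildHandlerChain[11] alongside the figure and you will smile knowingly: so that's all there is to it.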
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
	handler := apiHandler
	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
	handler = filterlatency.TrackStarted(handler, c.TracerProvider, "authorization")
	// APF (API Priority and Fairness) server-side flow control
	if c.FlowControl != nil {
		workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig()
		requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(
			c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg, c.FlowControl.GetMaxSeats)
		handler = filterlatency.TrackCompleted(handler)
		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator, c.RequestTimeout/4)
		handler = filterlatency.TrackStarted(handler, c.TracerProvider, "priorityandfairness")
	} else {
		handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
	}
	handler = filterlatency.TrackCompleted(handler)
	// Impersonation
	handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
	handler = filterlatency.TrackStarted(handler, c.TracerProvider, "impersonation")
	handler = filterlatency.TrackCompleted(handler)
	// Audit logging
	handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
	handler = filterlatency.TrackStarted(handler, c.TracerProvider, "audit")
	// authn/authz
	failedHandler := genericapifilters.Unauthorized(c.Serializer)
	failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
	...
}
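This function provides HTTP middleware: every user request passes through the handlers layer by layer, and authentication and authorization are performed right here. Auditing and flow control are likewise implemented by nesting handlers as HTTP middleware. Because each call wraps the handler built so far, the filter added last is the first one a request meets. Admission control, by contrast, is not part of this chain; it runs later, inside the resource handlers.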
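Want to customize authentication, for example to plug into your company's internal authentication system? No problem at all.
K8S ships several authenticators, such as BasicAuth, ClientCA, TokenAuth, and ServiceAccountAuth. The authenticator interface is defined in Github Interface: authenticator.Request[12]: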
// Request attempts to extract authentication information from a request and
// returns a Response or an error if the request could not be checked.
type Request interface {
	AuthenticateRequest(req *http.Request) (*Response, bool, error)
}
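When RecommendedOptions.Authentication[13] is initialized, these authenticators are registered, and a request is authenticated as soon as any one of them succeeds. So this is where you register your own authenticator; all it has to do is implement the Request interface.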
The authenticator we meet most often is the x509 one (it verifies the client certificate in your kubeconfig): Github Type: x509.Authenticator[14].
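CRUD operations on some custom resources may need to consult your company's internal authorization system to decide whether the user is allowed to perform them; a custom authorizer lets you plug into your company's access controls.
K8S has several authorizers, such as AlwaysAllow, ABAC, Webhook, Node, and RBAC. An authorizer is defined by two interfaces, Github Interface: authorizer.Authorizer[15] and Github Interface: authorizer.RuleResolver[16]:
Github Interface: authorizer.Authorizer[17] makes the authorization decision from the attributes of a request (user, verb, resource, and so on); if the decision is not Allow, the request is rejected.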
// Authorizer makes an authorization decision based on information gained by making
// zero or more calls to methods of the Attributes interface. It returns nil when an action is
// authorized, otherwise it returns an error.
type Authorizer interface {
	Authorize(ctx context.Context, a Attributes) (authorized Decision, reason string, err error)
}
Github Interface: authorizer.RuleResolver[18] resolves the rules that apply to a user, that is, which operations the user may perform on which resources:
// RuleResolver provides a mechanism for resolving the list of rules that apply to a given user within a namespace.
type RuleResolver interface {
	// RulesFor get the list of cluster wide rules, the list of rules in the specific namespace, incomplete status and errors.
	RulesFor(user user.Info, namespace string) ([]ResourceRuleInfo, []NonResourceRuleInfo, bool, error)
}
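Register your custom authorizer when RecommendedOptions.Authorization[19] is initialized. If you are curious about RBAC, look at the implementation of Github Type: rbac.RBACAuthorizer[20]; it takes surprisingly little code to give your APIServer RBAC support.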
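Here is a sketch of a custom authorizer chained with others. The Decision constant names and their order mirror authorizer.Decision, but Attributes is a reduced, hypothetical struct rather than the real authorizer.Attributes interface, and companyAuthorizer (with its grant table and change-frozen namespaces) is invented for illustration. The union loop follows the usual rule: the first Allow or Deny decides, and NoOpinion falls through to the next authorizer.

```go
package main

import "fmt"

type Decision int

const (
	DecisionDeny Decision = iota
	DecisionAllow
	DecisionNoOpinion
)

// Attributes is a hypothetical, reduced version of authorizer.Attributes.
type Attributes struct {
	User, Verb, Resource, Namespace string
}

type Authorizer interface {
	Authorize(a Attributes) (Decision, string, error)
}

// companyAuthorizer is hypothetical: an internal grant table plus a set of
// namespaces where writes are frozen.
type companyAuthorizer struct {
	grants map[string]map[string]bool // user -> "verb/resource" -> granted
	frozen map[string]bool            // namespace -> writes frozen
}

func (c *companyAuthorizer) Authorize(a Attributes) (Decision, string, error) {
	readOnly := a.Verb == "get" || a.Verb == "list" || a.Verb == "watch"
	if c.frozen[a.Namespace] && !readOnly {
		return DecisionDeny, fmt.Sprintf("namespace %q is frozen", a.Namespace), nil
	}
	if c.grants[a.User][a.Verb+"/"+a.Resource] {
		return DecisionAllow, "granted by company policy", nil
	}
	return DecisionNoOpinion, "", nil
}

// unionAuthorizer returns the first Allow or Deny; if every authorizer has
// no opinion, the result is NoOpinion and the server treats it as forbidden.
type unionAuthorizer []Authorizer

func (u unionAuthorizer) Authorize(a Attributes) (Decision, string, error) {
	var firstErr error
	for _, authz := range u {
		d, reason, err := authz.Authorize(a)
		if err != nil && firstErr == nil {
			firstErr = err
		}
		if d == DecisionAllow || d == DecisionDeny {
			return d, reason, err
		}
	}
	return DecisionNoOpinion, "", firstErr
}
```

Returning NoOpinion instead of Deny for unknown users is the design choice that lets your authorizer sit in front of RBAC without shadowing it.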
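Admission control essentially corresponds to validating the request body: rejecting non-compliant parameters, or filling in fields in the request body. You can even integrate it with your company's change-management system, blocking changes to resources during a change freeze and allowing operations only after approval.
Implement the Github Snippet: mutating and validating admission interfaces[21] and you have custom admission checks: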
type Interface interface {
	// Handles returns true if this admission controller can handle the given operation
	// where operation can be one of CREATE, UPDATE, DELETE, or CONNECT
	Handles(operation Operation) bool
}
type MutationInterface interface {
	Interface
	// Admit makes an admission decision based on the request attributes.
	// Context is used only for timeout/deadline/cancellation and tracing information.
	Admit(ctx context.Context, a Attributes, o ObjectInterfaces) (err error)
}
// ValidationInterface is an abstract, pluggable interface for Admission Control decisions.
type ValidationInterface interface {
	Interface
	// Validate makes an admission decision based on the request attributes. It is NOT allowed to mutate
	// Context is used only for timeout/deadline/cancellation and tracing information.
	Validate(ctx context.Context, a Attributes, o ObjectInterfaces) (err error)
}
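All admission controllers enabled in kube-apiserver are managed by the Github Variable: chainAdmissionHandler[22] data structure. When a client sends a request to kube-apiserver, the handler iterates over the admission controllers enabled in Github Variable: chainAdmissionHandler[23] and runs their mutation and validation steps. To add custom admission control, register your own controller there.
The Github Dir: kubernetes/plugin/pkg/admission[24] directory holds the built-in K8S admission controllers; at startup, kube-apiserver calls Github CodeLine: kubeoptions.NewAdmissionOptions()[25] to pass the list of built-in admission controllers into ServerRunOptions.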
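To tie this back to the change-freeze idea above, here is a minimal admission chain. It assumes simplified local versions of the three interfaces (no context or ObjectInterfaces, and an object reduced to a label map); defaultOwner and changeFreeze are hypothetical plugins. Unlike the real chainAdmissionHandler, which exposes separate Admit and Validate phases, Run here simply performs all mutations first and then all validations.

```go
package main

import "fmt"

type Operation string

const (
	Create Operation = "CREATE"
	Update Operation = "UPDATE"
	Delete Operation = "DELETE"
)

// Attributes is a hypothetical, reduced stand-in for admission.Attributes.
type Attributes struct {
	Operation Operation
	Namespace string
	Labels    map[string]string
}

// Simplified versions of admission.Interface and its two extensions.
type Interface interface {
	Handles(op Operation) bool
}

type MutationInterface interface {
	Interface
	Admit(a *Attributes) error
}

type ValidationInterface interface {
	Interface
	Validate(a *Attributes) error
}

// chain runs every mutating plugin that handles the operation, then every
// validating one; the first error rejects the request.
type chain []Interface

func (c chain) Run(a *Attributes) error {
	for _, h := range c {
		if m, ok := h.(MutationInterface); ok && m.Handles(a.Operation) {
			if err := m.Admit(a); err != nil {
				return err
			}
		}
	}
	for _, h := range c {
		if v, ok := h.(ValidationInterface); ok && v.Handles(a.Operation) {
			if err := v.Validate(a); err != nil {
				return err
			}
		}
	}
	return nil
}

// defaultOwner is a hypothetical mutating plugin that fills in a label.
type defaultOwner struct{}

func (defaultOwner) Handles(op Operation) bool { return op == Create }

func (defaultOwner) Admit(a *Attributes) error {
	if a.Labels == nil {
		a.Labels = map[string]string{}
	}
	if _, ok := a.Labels["owner"]; !ok {
		a.Labels["owner"] = "unknown"
	}
	return nil
}

// changeFreeze is a hypothetical validating plugin that blocks all writes
// to frozen namespaces, e.g. during a company-wide change freeze.
type changeFreeze struct {
	frozen map[string]bool
}

func (f changeFreeze) Handles(op Operation) bool {
	return op == Create || op == Update || op == Delete
}

func (f changeFreeze) Validate(a *Attributes) error {
	if f.frozen[a.Namespace] {
		return fmt.Errorf("namespace %q is under a change freeze; %s denied", a.Namespace, a.Operation)
	}
	return nil
}
```

Running mutations before validations matters: a validator should see the object in its final shape, including fields that mutating plugins filled in.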
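Data persistence can be customized as well. By default the apiserver uses etcd as its backend storage, but don't worry, the backend is pluggable too: at our company we use MySQL as the backend storage for the necessary data backups.
Github Type: Store[26] is the etcd-backed store. Most K8S resources embed this struct, which implements the interfaces listed below; by embedding Store, a resource automatically implements them and thus naturally supports the standard REST operations.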
// Note: the rest.StandardStorage interface aggregates the common REST verbs
var _ rest.StandardStorage = &Store{}
var _ rest.Exporter = &Store{}
var _ rest.TableConvertor = &Store{}
var _ GenericStore = &Store{}
But you can just as well write your own storage; for reference, see Github Repo: mink[27], which implements a storage object backed by MySQL.
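Whatever the backend, a replacement store has to preserve the semantics clients depend on, above all optimistic concurrency through resourceVersion. The Backend interface below is a hypothetical, heavily reduced seam (it is neither the real storage.Interface in k8s.io/apiserver/pkg/storage nor mink's API), and memoryBackend is an in-memory stand-in for a MySQL table.

```go
package main

import (
	"errors"
	"strconv"
	"sync"
)

// Backend is a hypothetical, heavily reduced storage seam.
type Backend interface {
	Create(key, value string) (rv string, err error)
	Get(key string) (value, rv string, err error)
	Update(key, value, expectedRV string) (rv string, err error)
}

var (
	ErrNotFound = errors.New("not found")
	ErrExists   = errors.New("already exists")
	ErrConflict = errors.New("resourceVersion conflict")
)

type row struct {
	value string
	rv    uint64
}

// memoryBackend stands in for a SQL table keyed by object key. A MySQL
// version would typically do the version check atomically, for example
// with an UPDATE ... WHERE resource_version = ? statement.
type memoryBackend struct {
	mu   sync.Mutex
	rev  uint64 // global revision counter, similar in spirit to etcd's revision
	rows map[string]row
}

func newMemoryBackend() *memoryBackend {
	return &memoryBackend{rows: map[string]row{}}
}

func (m *memoryBackend) Create(key, value string) (string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.rows[key]; ok {
		return "", ErrExists
	}
	m.rev++
	m.rows[key] = row{value: value, rv: m.rev}
	return strconv.FormatUint(m.rev, 10), nil
}

func (m *memoryBackend) Get(key string) (string, string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	r, ok := m.rows[key]
	if !ok {
		return "", "", ErrNotFound
	}
	return r.value, strconv.FormatUint(r.rv, 10), nil
}

// Update only succeeds if the caller saw the latest version, so concurrent
// edits fail with a conflict instead of silently overwriting each other.
func (m *memoryBackend) Update(key, value, expectedRV string) (string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	r, ok := m.rows[key]
	if !ok {
		return "", ErrNotFound
	}
	if strconv.FormatUint(r.rv, 10) != expectedRV {
		return "", ErrConflict
	}
	m.rev++
	m.rows[key] = row{value: value, rv: m.rev}
	return strconv.FormatUint(m.rev, 10), nil
}

var _ Backend = (*memoryBackend)(nil)
```

A real replacement also has to support watch (a change feed ordered by revision), which is usually the hardest part to build on a relational database.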
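Auditing is an indispensable APIServer feature, the one you will need whenever there is an argument about who changed what.
k8s.io/apiserver already ships with auditing built in: Github Func: WithAudit[28].
You can configure audit logging by following the Policy examples in https://kubernetes.io/zh-cn/docs/tasks/debug/debug-cluster/audit/: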
apiVersion: audit.k8s.io/v1 # This is required.
kind: Policy
# Don't generate audit events for any request in the RequestReceived stage.
omitStages:
  - "RequestReceived"
rules:
  # Log pod changes at the RequestResponse level.
  - level: RequestResponse
    resources:
    # Replace with the API group and resource name of your custom resource.
    - group: ""
      resources: ["pods"]
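Just pass the following flags at startup and your server gets the same auditing capability as K8S. And that, my friend, is what you earn by building your service on k8s.io/apiserver.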
- --audit-policy-file=/etc/kubernetes/audit-policy.yaml
- --audit-log-path=/var/log/kubernetes/audit/audit.log
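The policy above is evaluated rule by rule: the first matching rule decides the level, and a request that matches no rule is not audited. A tiny sketch of that matching, assuming reduced, hypothetical Rule and GroupResources types that only model API groups and resources (no users, verbs, namespaces or non-resource URLs):

```go
package main

type Level string

const (
	LevelNone            Level = "None"
	LevelMetadata        Level = "Metadata"
	LevelRequest         Level = "Request"
	LevelRequestResponse Level = "RequestResponse"
)

// GroupResources: an empty Resources list means every resource in the group.
type GroupResources struct {
	Group     string
	Resources []string
}

// Rule: an empty Resources list means the rule matches every request.
type Rule struct {
	Level     Level
	Resources []GroupResources
}

// levelFor returns the level of the first matching rule, or None.
func levelFor(rules []Rule, group, resource string) Level {
	for _, r := range rules {
		if matches(r, group, resource) {
			return r.Level
		}
	}
	return LevelNone
}

func matches(r Rule, group, resource string) bool {
	if len(r.Resources) == 0 {
		return true
	}
	for _, gr := range r.Resources {
		if gr.Group != group {
			continue
		}
		if len(gr.Resources) == 0 {
			return true
		}
		for _, res := range gr.Resources {
			if res == resource {
				return true
			}
		}
	}
	return false
}
```

Because order matters, put the specific rules (such as the pods rule above) before any catch-all rule; a catch-all placed first would swallow everything.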
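K8S has a very powerful server-side flow-control mechanism, APF (API Priority and Fairness), but it cannot be used out of the box yet, because it depends on etcd.
For how APF works, see https://alexstocks.github.io/html/k8s-apf.html. Because kube-apiserver runs a dedicated post-start hook that processes FlowSchema and PriorityLevelConfiguration objects to adjust traffic weights dynamically, using APF directly requires some compatibility work.
That said, KubeGateway, mentioned in the previous article, offers another form of server-side rate limiting. It is not as fine-grained as APF, but it is good enough for those who need it; interested readers can browse its code.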
[1] Alibaba Cloud SAE: https://www.aliyun.com/product/sae
[2] Github Repo: k8s.io/apiserver: https://github.com/kubernetes/apiserver
[3] Github Dir: kubernetes/pkg/registry: https://github.com/kubernetes/kubernetes/tree/8b98305858b107369f2c9b9fd8ef1c5b0da078c0/pkg/registry
[4] Github Interface: StandardStorage: https://github.com/kubernetes/kubernetes/blob/4d33d837c8be778044d50755de83f8738e957c13/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go#L276
[5] Github Method: registerResourceHandlers: https://github.com/kubernetes/kubernetes/blob/1c2d446648662529282a3bb1528a6dbb50700fdb/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go#L190
[6] Github Object: genericregistry.Store: https://github.com/kubernetes/kubernetes/blob/1c2d446648662529282a3bb1528a6dbb50700fdb/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go#L81
[7] Github File: kubernetes/pkg/registry/core/rest/storage_core.go: https://github.com/kubernetes/kubernetes/blob/d1c296431e0ff2363131707054c4c75ad59cd2c0/pkg/registry/core/rest/storage_core.go#L104
[8] Github File: Pod: https://github.com/kubernetes/kubernetes/blob/d1c296431e0ff2363131707054c4c75ad59cd2c0/pkg/registry/core/rest/storage_core.go#L173
[9] Github File: Binding: https://github.com/kubernetes/kubernetes/blob/d1c296431e0ff2363131707054c4c75ad59cd2c0/pkg/registry/core/pod/storage/storage.go#L159
[10] Github File Exec: https://github.com/kubernetes/kubernetes/blob/2acdbae664bbc5ff9cd5d1ec07f93a14f444cef5/pkg/registry/core/pod/rest/subresources.go#L168
[11] Github Func: DefaultBuildHandlerChain: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/server/config.go#L978
[12] Github Interface: authenticator.Request: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/authentication/authenticator/interfaces.go#L34
[13] RecommendedOptions.Authentication: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/server/options/recommended.go#L67
[14] Github Type: x509.Authenticator: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/authentication/request/x509/x509.go#L133
[15] Github Interface: authorizer.Authorizer: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/authorization/authorizer/interfaces.go#L70
[16] Github Interface: authorizer.RuleResolver: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/authorization/authorizer/interfaces.go#L81
[17] Github Interface: authorizer.Authorizer: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/authorization/authorizer/interfaces.go#L70
[18] Github Interface: authorizer.RuleResolver: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/authorization/authorizer/interfaces.go#L81
[19] RecommendedOptions.Authorization: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/server/options/recommended.go#L68
[20] Github Type: rbac.RBACAuthorizer: https://github.com/kubernetes/kubernetes/blob/4a89df5617b8e1e26abb16150502d04e6c180533/plugin/pkg/auth/authorizer/rbac/rbac.go#L50
[21] Github Snippet: mutating and validating admission interfaces: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/admission/interfaces.go#L123-L144
[22] Github Variable: chainAdmissionHandler: https://github.com/kubernetes/apiserver/blob/master/pkg/admission/chain.go#L23
[23] Github Variable: chainAdmissionHandler: https://github.com/kubernetes/apiserver/blob/master/pkg/admission/chain.go#L23
[24] Github Dir: kubernetes/plugin/pkg/admission: https://github.com/kubernetes/kubernetes/tree/release-1.20/plugin/pkg/admission
[25] Github CodeLine: kubeoptions.NewAdmissionOptions(): https://github.com/kubernetes/kubernetes/blob/4a89df5617b8e1e26abb16150502d04e6c180533/cmd/kube-apiserver/app/options/options.go#L105
[26] Github Type: Store: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/registry/generic/registry/store.go#L97
[27] Github Repo: mink: https://github.com/acorn-io/mink
[28] Github Func: WithAudit: https://github.com/kubernetes/apiserver/blob/9dc08c72a8d36aad9e4508497417d5c6231610fa/pkg/endpoints/filters/audit.go#L42
Reposted from the CNCF WeChat official account (@CNCF).