【K8S原理理解】小白级理解 k8s Operator 及 Informer 您所在的位置:网站首页 mediate和reconcile的区别 【K8S原理理解】小白级理解 k8s Operator 及 Informer

【K8S原理理解】小白级理解 k8s Operator 及 Informer

2023-08-20 05:00| 来源: 网络整理| 查看: 265

参考 K8s 系列(四) - 浅谈 Informer Controller 与 Operator

Controller 和 Operator 没有太大区别,一般称对 k8s 原生资源的控制叫为 Controller,对于自定义资源(CRD)的控制较为 Operator

首先我们通过一个简单的例子了解以下几个概念(类型,API 对象,资源对象)

首先把 k8s 想象成一个支持任意美食制作的生产线,但目前 k8s 只提供一些做基础美食的「厨师」(即为 Pod),若要做高级美食,则需要我们进行「亲自指导」(Operator、Controller ,这两个概念可以理解为一样,都是指导)

现在我想让 k8s 做一种高级的美食 —— 「戚风蛋糕」(一种新的资源)

为了支持「戚风蛋糕」(一种新的资源)制作,那我们要告诉 k8s 该蛋糕配料及做法的**「戚风蛋糕制作表单」(CRD「用户自定义资源定义」)**,为了满足不同用户的需求,因此设置了「多种自定义选项,如甜度、形状等」(资源的参数),总结如下

Apiversion: 甜品组/测试版本 Kind:戚风蛋糕 Metadata: name:此蛋糕的备注(戚风蛋糕-1,或生日快乐蛋糕等等,注意名字不能重复) Spec: 形状:XXX # 用户可自定义 甜度:XXX # 用户可自定义 做法:XXX # 用户可自定义、可以视为 image 的启动命令 配料:XXX # 用户可自定义,可以视为 image

当用户按照「上述格式」构建「蛋糕制作表单」(yaml文件),提交给 k8s 时,k8s 会将其记录到他们的**「专用记录本上的某一页」( 该页可以称之为 API 对象 或 CR「用户自定义资源」)**

Apiversion: 甜品组/测试版本 Kind:戚风蛋糕 Metadata: name:戚风蛋糕编号1 Spec: 形状:圆形 甜度:50% 做法:第一步XX,第二步XX 等等 配料:动物奶油、糖、炼乳

若用户未按照「上述格式」构建「某种蛋糕制作表单」(yaml文件),k8s 便不会理解,也不会进行记录

当记录到「专用记录本上的某一页」后,k8s 的**「主厨的助手在实时监控着」(Operator、Controller 的 Informer),之后告诉了「主厨」(Operator、Controller 的 Control Loop 控制循环)**,主厨发现「专用记录本上的这一页」后,便指导 k8s 提供的「基础厨师」(Pod),进行蛋糕的制作,其中还可以添加用户的「自定义做法,如裱花等」(就是 Pod 种镜像的启动命令之类的)

**删除:**在此过程中,客户可能不想要了,那么 k8s 会「撕毁」(delete)「专用记录本上对应的那一页」,k8s 的「主厨的助手在实时监控着」(Operator、Controller 的 Informer)发现了,通知「主厨」不用做了

更新:在此过程中,客户可能想修改蛋糕的「甜度」(API对象的某一属性),那么 k8s 会「修改」(delete)「专用记录本上对应的那一页」,k8s 的「主厨的助手在实时监控着」(Operator、Controller 的 Informer)发现了,通知「主厨」进行修改,最后形成一个「名称为戚风蛋糕编号1的戚风蛋糕」(资源对象)

Apiversion: 甜品组/测试版本 Kind:蛋糕 Metadata: name:戚风蛋糕编号1 Spec: 形状:圆形 甜度:70% # 修改了此处 做法:第一步XX,第二步XX 等等 配料:动物奶油、糖、炼乳

新增:在此过程中,客户可能想再要个「戚风蛋糕」(新增一个 API 对象),那么 k8s 会在「专用记录本上新增一页」(Add),k8s 的「主厨的助手在实时监控着」(Operator、Controller 的 Informer)发现了,通知「主厨」再做一个,最后又形成一个「名称为戚风蛋糕编号2的戚风蛋糕」(资源对象)

Apiversion: 甜品组/测试版本 Kind:蛋糕 Metadata: name:戚风蛋糕编号2 Spec: 形状:圆形 甜度:70% # 修改了此处 做法:第一步XX,第二步XX 等等 配料:动物奶油、糖、炼乳

总结

戚风蛋糕制作表单 —— CRD,Pod 的定义(API 的类型,资源的类型),也可以成为编程语言中的 类型(如 struct 的定义)

用户提交的制作表单 —— 用户提交的 yaml —— 在 k8s 形成 API 对象 (对 struct 的填充)

最后制作出的蛋糕 —— 资源对象 —— 可以理解为 实例 (最后形成的 struct 对象)

制作的过程 —— Controller、Operator

Pod 也是遵循上述流程,一般对于原生资源的控制 成为 Controller

Operator 有很多脚手架(kubuilder、Operator-sdk),就是对 Controller 进行封装,便于编写

Informer 架构介绍

(三)Kubernetes 源码剖析之学习Informer机制

Kubernetes Informer 与 Lister 详解

kubernetes shared Informer 源码解析

client-go系列之5—Informer

25 | 深入解析声明式API(二):编写自定义控制器

kubernetes 中 informer 的使用

如何高效掌控K8s资源变化?K8s Informer实现机制浅析

在这里插入图片描述

各个组件的作用

根据流程图来解释一下 Informer 中几个组件的作用:

Reflector:称之为反射器,实现对 apiserver 指定类型对象的监控(ListAndWatch),其中反射实现的就是把监控的结果实例化成具体的对象,最终也是调用 Kubernetes 的 List/Watch API;DeltaIFIFO Queue:一个增量队列,将 Reflector 监控变化的对象形成一个 FIFO 队列,此处的 Delta 就是变化;LocalStore:就是 informer 的 cache,这里面缓存的是 apiserver 中的对象(其中有一部分可能还在DeltaFIFO 中),此时使用者再查询对象的时候就直接从 cache 中查找,减少了 apiserver 的压力,LocalStore 只会被 Lister 的 List/Get 方法访问。WorkQueue:DeltaIFIFO 收到时间后会先将时间存储在自己的数据结构中,然后直接操作 Store 中存储的数据,更新完 store 后 DeltaIFIFO 会将该事件 pop 到 WorkQueue 中,Controller 收到 WorkQueue 中的事件会根据对应的类型触发对应的回调函数。 Informer 的工作流程(图中左侧)

只看上图左侧部分即可

Informer 主要有两个职责

同步资源状态到本地缓存,保证本地缓存的有效性 —— 对应图中 Reflector 包的 List & Watch 机制监听事件,并调用 ResourceEventHandler 中对应的处理函数(AddFunc、UpdateFunc、DeleteFunc)进行处理( 如执行某些自定义逻辑逻辑,如与 Prometheus 对接;或者放入到 workqueue 中,等待 Controller 处理)

注意:

ResourceEventHandler 的三个回调函数 (AddFunc、UpdateFunc、DeleteFunc) 处理后,放入到 workqueue 中的内容只是资源的 namespace 和 name 的组合(也就是 Local Store 中的 key),之后 Controller 处理时,时间上是利用此 key 从 Local Store 中取出具体的资源对象 Informer 首先会 list/watch apiserver,Informer 所使用的 Reflector 包负责与 apiserver 建立连接,Reflector 使用 ListAndWatch 的方法,会先从 apiserver 中 list 该资源的所有实例,list 会拿到该对象最新的 resourceVersion,然后使用 watch 方法监听该 resourceVersion 之后的所有变化,若中途出现异常,reflector 则会从断开的 resourceVersion 处重现尝试监听所有变化,一旦该对象的实例有创建、删除、更新动作,Reflector 都会收到”事件通知”,这时,该事件及它对应的 API 对象这个组合,被称为增量(Delta),它会被放进 DeltaFIFO 中。Informer 会不断地从这个 DeltaFIFO 中读取增量,每拿出一个对象,Informer 就会判断这个增量的事件类型,然后创建或更新本地的缓存,也就是 store。如果事件类型是 Added(添加对象),那么 Informer 会通过 Indexer 的库把这个增量里的 API 对象保存到本地的缓存中,并为它创建索引,若为删除操作,则在本地缓存中删除该对象。DeltaFIFO 再 pop 这个事件到 controller 中,controller 会调用事先注册的 ResourceEventHandler 回调函数进行处理。在 ResourceEventHandler 回调函数中,其实只是做了一些很简单的过滤,然后将关心变更的 Object 放到 workqueue 里面。Controller 从 workqueue 里面取出 Object,启动一个 worker 来执行自己的业务逻辑,业务逻辑通常是计算目前集群的状态和用户希望达到的状态有多大的区别,然后孜孜不倦地让 apiserver 将状态演化到用户希望达到的状态,比如为 deployment 创建新的 pods,或者是扩容/缩容 deployment。在worker中就可以使用 lister 来获取 resource,而不用频繁的访问 apiserver,因为 apiserver 中 resource 的变更都会反映到本地的 cache 中。此外,在这个过程中,每经过 resyncPeriod 指定的时间,Informer 维护的本地缓存,都会使用最近一次 LIST 返回的结果强制更新一次,从而保证缓存的有效性。在 Kubernetes 中,这个缓存强制更新的操作就叫作:resync。 Controller 的工作流程 https://time.geekbang.org/column/article/42076

见图中右侧部分

此处以一个封装 Neutron API 的 CRD 资源 Network 为例

Controller 从 Informer 获取的只是 API 对象,可以理解为一个表单 —— 就是「期望的状态」实际的状态 —— 要从集群中或调用 api 进行获取调谐 —— 之后进行操作,使「实际的状态」达到「期望的状态」

举个很简单的例子

首先我创建一个 deployment ,设置副本数为 2deployment Controller 的 Informer 监听到了 dployment 这个 API 对象,得知了「期望状态」要 2 个 poddeployment Controller 接下来调用 api 查看「当前的实际状态」,发现有 0 个 pod因此 deployment Controller 调用 pod 的 client,发送创建请求给 apiserver,进行 pod 的创建,创建 2 个 pod,满足要求 —— 调谐

上面过程只是简化 —— 其中还涉及到 replicaset 这种资源及对应的 Controller

每种资源都会有一个对应的控制器

这个执行周期里(processNextWorkItem),首先从工作队列里出队(workqueue.Get)了一个成员,也就是一个 Key(Network 对象的:namespace/name)。然后,在 syncHandler 方法中,使用这个 Key,尝试从 Informer 维护的缓存中拿到了它所对应的 Network 对象。 使用了 networksLister 来尝试获取这个 Key 对应的 Network 对象。这个操作,其实就是在访问本地缓存的索引 Indexer实际上,在 Kubernetes 的源码中,你会经常看到控制器从各种 Lister 里获取对象,比如:podLister、nodeLister 等等,它们使用的都是 Informer 和缓存机制。 如果控制循环从缓存中拿不到这个对象(即:networkLister 返回了 IsNotFound 错误),那就意味着这个 Network 对象的 Key 是通过前面的“删除”事件添加进工作队列的。所以,尽管队列里有这个 Key,但是对应的 Network 对象已经被删除了。 这时候,我就需要调用 Neutron 的 API,把这个 Key 对应的 Neutron 网络从真实的集群里删除掉。 而如果能够获取到对应的 Network 对象,我就可以执行控制器模式里的对比“期望状态”和“实际状态”的逻辑了。 自定义控制器“千辛万苦”拿到的这个 Network 对象,正是 APIServer 里保存的“期望状态”,即:用户通过 YAML 文件提交到 APIServer 里的信息。当然,在我们的例子里,它已经被 Informer 缓存在了本地。那么,“实际状态”又从哪里来呢? —— 当然是来自于实际的集群了。所以,我们的控制循环需要通过 Neutron API 来查询实际的网络情况。比如,我可以先通过 Neutron 来查询这个 Network 对象对应的真实网络是否存在 如果不存在,这就是一个典型的“期望状态”与“实际状态”不一致的情形。这时,我就需要使用这个 Network 对象里的信息(比如:CIDR 和 Gateway),调用 Neutron API 来创建真实的网络。如果存在,那么,我就要读取这个真实网络的信息,判断它是否跟 Network 对象里的信息一致,从而决定我是否要通过 Neutron 来更新这个已经存在的真实网络。 这样,我就通过对比“期望状态”和“实际状态”的差异,完成了一次调协(Reconcile)的过程。 结合代码理解 Informer 与 控制器的结合 k8s自定义controller三部曲之三:编写controller代码 Infomer 部分 首先初始化 Informer接下来注册 AddEventHandler 的回调函数 (AddFunc、UpdateFunc、DeleteFunc) 其作用就是,对 API 对象进行一些过滤处理,然后将关心变更的 object key(namespace 和 name)放入到 workqueue 中其实其对应的函数,不用对 cache 进行同步,Informer 内部具有 cache 同步机制(直接从 Delta FIFO 通过 Indexer 给 Local Store) // NewController returns a new student controller func NewController( kubeclientset kubernetes.Interface, studentclientset clientset.Interface, studentInformer informers.StudentInformer) *Controller { utilruntime.Must(studentscheme.AddToScheme(scheme.Scheme)) glog.V(4).Info("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) controller := &Controller{ kubeclientset: kubeclientset, studentclientset: studentclientset, studentsLister: studentInformer.Lister(), studentsSynced: studentInformer.Informer().HasSynced, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Students"), recorder: recorder, } glog.Info("Setting up event handlers") // Set up an event handler for when Student resources change // 正常来说,注册的 `AddEventHandler` 函数监听到资源的变化,会将其更新到本地的 cache,并传送到 workqueue 中 studentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueStudent, // 下面有这个函数解释,就是先将对象同步到缓存中,再放入 workqueue 队列 UpdateFunc: func(old, new interface{}) { oldStudent := old.(*bolingcavalryv1.Student) newStudent := new.(*bolingcavalryv1.Student) if oldStudent.ResourceVersion == newStudent.ResourceVersion { //版本一致,就表示没有实际更新的操作,立即返回 return } controller.enqueueStudent(new) }, DeleteFunc: controller.enqueueStudentForDelete, }) return controller } // 数据先放入缓存,再入队列 func (c *Controller) enqueueStudent(obj interface{}) { var key string var err error // 将对象放入缓存 // 其实这一步骤可以省略 informer 会自动同步缓存 cache 的 if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { runtime.HandleError(err) return } // 将key放入队列 c.workqueue.AddRateLimited(key) } // 删除操作 func (c *Controller) enqueueStudentForDelete(obj interface{}) { var key string var err error // 从缓存中删除指定对象 key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { runtime.HandleError(err) return } //再将key放入队列 c.workqueue.AddRateLimited(key) } Controller 部分 processNextWorkItem 是从 workqueue中取数据 keysyncHandler 是利用 key从cache获取期望状态,之后再获取实际状态(此代码没有实际编写),之后进行调谐处理,满足期望的状态

现在采用 Operator 脚手架进行编写时,只需要编写 Reconcile 函数

其实就是 syncHandler 函数(从缓存获取资源的「期望状态」,从环境中通过 client 获取资源的「实际状态」,之后利用 client 进行资源的调整「增删改」,完成「实际状态」到「期望状态」的转换)

脚手架相当于封装了

Informer 的 初始化Informer 的 AddEventHandler 的回调函数 (AddFunc、UpdateFunc、DeleteFunc)workqueue 的 逐个取出 key func (c *Controller) runWorker() { for c.processNextWorkItem() { } } // 取数据处理 func (c *Controller) processNextWorkItem() bool { obj, shutdown := c.workqueue.Get() if shutdown { return false } // We wrap this block in a func so we can defer c.workqueue.Done. err := func(obj interface{}) error { defer c.workqueue.Done(obj) var key string var ok bool if key, ok = obj.(string); !ok { c.workqueue.Forget(obj) runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) return nil } // 在syncHandler中处理业务 if err := c.syncHandler(key); err != nil { return fmt.Errorf("error syncing '%s': %s", key, err.Error()) } c.workqueue.Forget(obj) glog.Infof("Successfully synced '%s'", key) return nil }(obj) if err != nil { runtime.HandleError(err) return true } return true } // 处理 func (c *Controller) syncHandler(key string) error { // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) return nil } // 从缓存中取对象 student, err := c.studentsLister.Students(namespace).Get(name) if err != nil { // 如果Student对象被删除了,就会走到这里,所以应该在这里加入执行 if errors.IsNotFound(err) { glog.Infof("Student对象被删除,请在这里执行实际的删除业务: %s/%s ...", namespace, name) return nil } runtime.HandleError(fmt.Errorf("failed to list student by: %s/%s", namespace, name)) return err } glog.Infof("这里是student对象的期望状态: %#v ...", student) glog.Infof("实际状态是从业务层面得到的,此处应该去的实际状态,与期望状态做对比,并根据差异做出响应(新增或者删除)") c.recorder.Event(student, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced) return nil }


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有