如何在系统退出时优雅地停止Kubernetes服务监视



我运行了以下KOPF守护程序:

import kopf
import kubernetes
@kopf.on.daemon(group='test.example.com', version='v1', plural='myclusters')
def worker_services(namespace, name, spec, status, stopped, logger, **kwargs):
config = kubernetes.client.Configuration()
client = kubernetes.client.ApiClient(config) 
workload = kubernetes.client.CoreV1Api(client)
watch = kubernetes.watch.Watch()
while not stopped:
for e in watch.stream(workload.list_service_for_all_namespaces):
svc = e['object']
lb = helpers.get_service_loadbalancer(name, namespace, svc, logger)
if "NodePort" in svc.spec.type:
logger.info(f"Found Service of type NodePort: {svc.metadata.name}")
do_some_work(svc)
watch.stop()

当系统通过Ctrl + C或Kubernetes杀死pod的方式退出时,我得到以下警告:

INFO:kopf.reactor.running:Signal SIGINT is received. Operator is stopping.
[2020-12-11 15:07:52,107] kopf.reactor.running [INFO    ] Signal SIGINT is received. Operator is stopping.
WARNING:kopf.objects:Daemon 'worker_services' did not exit in time. Leaving it orphaned.
[2020-12-11 15:07:52,113] kopf.objects         [WARNING ] Daemon 'worker_services' did not exit in time. Leaving it orphaned.

即使我按下Ctrl + Z,这也会使进程在后台运行。

我相信for loop用流阻塞了进程,并且在系统退出时不会终止,因此它不会在这个代码段的最后一行碰到watch.stop()

到目前为止,我已经尝试了以下内容:

  • do_some_work(svc)之后添加watch.stop(),但这会使我的程序进入一个非常激进的循环,占用高达90%的CPU
  • 将整个for loop放在不同的线程上,这使得一些组件失败,例如记录器
  • 实现了yield e以使进程不阻塞,这使得守护进程在它监视的第一个服务和监视结束后完成
  • 使用signal库实现了Signal Listeners,以侦听出口函数中的SIGINTwatch.stop(),但从未调用该函数
  • 用上述一些解决方案实现了cancellation_timeout=3.0,即@kopf.on.daemon(group='test.example.com', version='v1', plural='myclusters', cancellation_timeout=3.0),但也没有成功

如有任何意见,我们将不胜感激,并提前表示感谢。

我在您的示例中看到的是,代码试图监视集群中的资源。但是,它使用的是同步的官方客户端库。在Python中,同步函数(或线程(不能被中断,这与异步(也需要使用异步i/o(不同。一旦调用了这里显示的函数,它就永远不会退出,也不会在长时间运行期间检查停止的标志。

您可以对当前代码做的是更频繁地检查stopped标志:

@kopf.on.daemon(group='test.example.com', version='v1', plural='myclusters')
def worker_services(namespace, name, spec, status, stopped, logger, **kwargs):
…
watch = kubernetes.watch.Watch()
for e in watch.stream(workload.list_service_for_all_namespaces):
if stopped:  # <<<< check inside of the for-loop
break
svc = …
………
watch.stop()

这将检查守护程序是否在每个服务的每个事件上都停止。但是,如果完全静音(发生这种情况(,它将不会检查停止标志。

为了解决这个问题,你可以按时间限制手表(请查看客户的文档,了解如何正确完成,但iirc,这样做(:

watch = kubernetes.watch.Watch()
for e in watch.stream(workload.list_service_for_all_namespaces, timeout_seconds=123):

这将把守护进程的无响应/取消时间限制为最多123秒——以防集群中没有可用的服务或它们没有更改。

在这种情况下,您不需要在For循环之外检查stopped条件,因为守护进程函数将退出并打算重新启动,stopped将由框架检查,并且不会按预期重新启动函数。


顺便说一句,我应该注意到,监视处理程序内部的资源可能不是最好的主意。观看很复杂。太复杂了,所有的边缘案件和它带来的问题。

由于该框架已经进行了监视,因此可能更容易利用它,并通过运营商的全局状态实现跨资源连接:

import queue
import kopf
SERVICE_QUEUES = {}  # {(mc_namespace, mc_name) -> queue.Queue}
KNOWN_SERVICES = {}  # {(svc_namespace, svc_name) -> svc_body}

@kopf.on.event('v1', 'services')
def service_is_seen(type, body, meta, event, **_):
for q in SERVICE_QUEUES.values():  # right, to all MyClusters known to the moment
q.put(event)
if type == 'DELETED' or meta.get('deletionTimestamp'):
if (namespace, name) in KNOWN_SERVICES:
del KNOWN_SERVICES[(namespace, name)]
else:
KNOWN_SERVICES[(namespace, name)] = body

@kopf.on.daemon(group='test.example.com', version='v1', plural='myclusters')
def worker_services(namespace, name, spec, status, stopped, logger, **kwargs):
# Start getting the updates as soon as possible, to not miss anything while handling the "known" services.
q = SERVICE_QUEUES[(namespace, name)] = queue.Queue()
try:
# Process the Services known before the daemon start/restart.
for (svc_namespace, svc_name), svc in KNOWN_SERVICES.items():
if not stopped:
lb = helpers.get_service_loadbalancer(name, namespace, svc, logger)
if "NodePort" in svc.spec['type']:
logger.info(f"Found Service of type NodePort: {svc.metadata.name}")
do_some_work(svc)
# Process the Services arriving after the daemon start/restart.
while not stopped:
try:
svc_event = q.get(timeout=1.0)
except queue.Empty:
pass
else:
svc = svc_event['object']
lb = helpers.get_service_loadbalancer(name, namespace, svc, logger)
if "NodePort" in svc.spec['type']:
logger.info(f"Found Service of type NodePort: {svc.metadata.name}")
do_some_work(svc)
finally:
del SERVICE_QUEUES[(namespace, name)]

这是一个简化的例子(但可能"照原样"工作——我没有检查(——只是展示了如何在使用框架功能的同时使资源相互交流的想法。

该解决方案取决于用例,并且该解决方案可能不适用于您想要的用例。也许我错过了为什么要这样做的一些东西。如果您将您的用例作为功能请求报告给Kopf的repo,以便稍后得到框架的支持,那将是一件好事。

最新更新