关于Kubernetes的Python SDK,几乎只有官方项目的 examples 。 关于Job的基本增删改查操作,可以参考 job_crud.py 。 但是,这只是基本用法,缺乏一些实用细节。

本文给出Python SDK操作Kubernetes Job的更多示例代码,以及相关解释。

pip install kubernetes

初始化

from kubernetes.client import BatchV1Api
from kubernetes.config import load_kube_config
load_kube_config()
batch = BatchV1Api()

load_kube_config是从默认位置,也就是~/.kube/config加载配置。 如果在其它位置,可以通过第一个参数传入其路径。

BatchV1Api()可以当做Job的客户端来用。 命名上,Batch和Job是类似的概念,前者强调批量。

创建Job

以下来自官方样例job_crud.py

def create_job_object():
    # Configureate Pod template container
    container = client.V1Container(
        name="pi",
        image="perl",
        command=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"])
    # Create and configurate a spec section
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels={"app": "pi"}),
        spec=client.V1PodSpec(restart_policy="Never", containers=[container]))
    # Create the specification of deployment
    spec = client.V1JobSpec(
        template=template,
        backoff_limit=4)
    # Instantiate the job object
    job = client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=client.V1ObjectMeta(name=JOB_NAME),
        spec=spec)
    return job
def create_job(api_instance, job):
    api_response = api_instance.create_namespaced_job(
        body=job,
        namespace="default")
    print("Job created. status='%s'" % str(api_response.status))

虽然,根据官方教程这样的写法,也能得到可用的V1Job,拿去执行创建操作。 但还是过于陌生和偏门,不如主流、常见的YAML方便、易读写。

这里该出两种更方便的做法。

直接使用YAML

apiVersion: batch/v1 kind: Job metadata: name: hello spec: template: spec: containers: - name: echo image: alpine:3.11 args: - 'echo' - 'Hello world!'

以上是一个最精简的Job配置样例,

通过读取文件为dict,可以直接拿去使用。

from kubernetes.client import V1Job
import yaml
with open('job.yaml') as file:
    cfg = yaml.safe_load(file)
job = batch.create_namespaced_job(namespace='default', body=cfg)
assert isinstance(job, V1Job)

create_namespaced_job同样接受字典作为body输入,因此YAML配置可以读出后直接传入。

这里返回的V1Job只是创建时的状态,但是会包含更多集群中的信息。

使用dict

由于create_namespaced_job接受字典作为body输入,因此直接使用dict也是可行的。

cfg = {
    'apiVersion': 'batch/v1',
    'kind': 'Job',
    'metadata': {
        'name': 'hello'
    'spec': {
        'template': {
            'spec': {
                'restartPolicy':
                'Never',
                'containers': [{
                    'name': 'upload',
                    'image': 'alpine:3.11',
                    'args': ['echo', 'Hello world!']
batch.create_namespaced_job(namespace='default', body=cfg)

由于dict结构与YAML相同,而又没有类的束缚,所以也很灵活方便。

此外,从YAML读出为dict后,也可以通过修改部分字段,达到动态变化的效果。 这种结合YAML和dict的使用方式,是对官方用法的最佳替代。

监控Job运行

在创建Job后,通常需要监控Job的运行,做一些外围处理。 轮询当然是下下策,而Kubernetes提供了一个Watch机制,通过接收Event,实现对状态变化的掌控。 Event只有在状态变化时才会有,所以是非常理想的回调。

from kubernetes.client import V1Job
from kubernetes.watch import Watch
job_name = 'hello'
watcher = Watch()
for event in watcher.stream(
        batch.list_namespaced_job,
        namespace='default',
        label_selector=f'job-name={job_name}',
):  # yapf: disable
    assert isinstance(event, dict)
    job = event['object']
    assert isinstance(job, V1Job)
    # handle job.status

Watch().stream就是前面说的理想回调,它第一个参数是列出类的函数,这里选择list_namespaced_job。 后面的参数,都是list_namespaced_job的参数。 除了必备的namespace以外,label_selector也是一个常用参数,可以避免关注无关的Job。 每个Job在创建后,都会自动带一个f'job-name={job_name}'的Label,可以借此筛选。 job_name就是metadata里设置的name,如这里job-name=hello

event是一个dict,只有三个值。 其中event['raw_object']只是event['object']dict形式,没有太大意义。 event['type']常见三个值,对应增删改。

  • ADDED,创建时的信息,和create_namespaced_job的返回值通常没有区别。
  • MODIFIED,Job状态变化时的信息。
  • DELETED,Job删除时的信息。
  • 以上三个状态值,对其它类型的资源也是通用的,比如Pod、Deployment等。

    V1Job的使用

    对于具体的V1Job实例,其它字段都是和创建时的配置差不多的,只是多一些集群中的具体信息。 所以,常用的还是.status字段。

    >>> from kubernetes.client import V1JobStatus
    >>> isinstance(job.status, V1JobStatus)
    >>> print(job.status)
    {'active': None,
     'completion_time': datetime.datetime(2020, 8, 10, 9, 49, 38, tzinfo=tzutc()),
     'conditions': [{'last_probe_time': datetime.datetime(2020, 8, 10, 9, 49, 38, tzinfo=tzutc()),
       'last_transition_time': datetime.datetime(2020, 8, 10, 9, 49, 38, tzinfo=tzutc()),
       'message': None,
       'reason': None,
       'status': 'True',
       'type': 'Complete'}],
     'failed': None,
     'start_time': datetime.datetime(2020, 8, 10, 9, 49, 32, tzinfo=tzutc()),
     'succeeded': 1}
    

    直接使用job.status.succeeded,可以得到成功的Container数量。 下面几乎每一级都有特定的类,可以连续使用.操作符。 如果有特殊需要,也可以用job.to_dict()转换成字典来用。

    其它字段,作用基本上也和名称相关,不难推测。

    列出Job

    列出所有Job的list_job_for_all_namespaces不常用,一般只列出指定Namespace的Job。

    from kubernetes.client import V1JobList, V1Job
    job_list = batch.list_namespaced_job(namespace='default')
    assert isinstance(job_list, V1JobList)
    assert isinstance(job_list.items, list)
    for job in job_list.items:
        assert isinstance(job, V1Job)
    

    与监控的示例相比,这里去掉了label_selector,可以获取Namespace中所有的Job。 如果有需要,可以通过自定义Label把所有Job分类,并使用label_selector获取指定类型的Job。

    读取Job

    如果知道Job的name,可以直接通过read_*系列接口,获得指定的V1Job

    from kubernetes.client import V1Job
    job = batch.read_namespaced_job(name='hello', namespace='default')
    assert isinstance(job, V1Job)
    

    如果更看重状态,可以改用read_namespaced_job_status。 虽然访问的API不同,但在Python的V1Job这个结果层面,没有本质差异。

    列出一个Job的Pod

    Pod是Kubernetes调度的最小单元,也是最常用的一种资源。

    from typing import List
    from kubernetes.client import CoreV1Api, V1Pod
    def get_pods_by(job_name: str) -> List[V1Pod]:
        core = CoreV1Api()
        pods = core.list_namespaced_pod(
            namespace='default',
            label_selector=f'job-name={job_name}',
            limit=1,
        return pods.items
    

    这里的get_pods_by,可以用job_name获取对应的Pod。 limit=1是在已知Pod只有一个的情况下做出的优化,可按需调整或去掉。

    删除Job

    删除一个Job:

    from kubernetes.client import V1Status
    status = batch.delete_namespaced_job(
        namespace='default',
        name=job_name,
        propagation_policy='Background',
    assert isinstance(status, V1Status)
    

    其中,propagation_policy='Background'是不可省略的关键,否则默认是Orphan,其Pod不会被删除。 这属于API设计的一个失误,与kubectl的默认行为不符合。 而且,应该没有人在删除了Job之后,还要保留Pod的吧。 这里也可以选择'Foreground',阻塞等待相关资源的删除完毕。

    删除多个、或所有Job:

    status = batch.delete_collection_namespaced_job(
        namespace='default',
        propagation_policy='Background',
        label_selector='some-label=your-value',
    assert isinstance(status, V1Status)
    

    如果没有label_selector,那就是删除一个Namespace中的所有Job。

    更新Job

    这个比较少用,因为一般都是建新的。 用法其实和create_namespaced_job差不多,参考官方样例即可。

    def update_job(api_instance, job):
        # Update container image
        job.spec.template.spec.containers[0].image = "perl"
        api_response = api_instance.patch_namespaced_job(
            name=JOB_NAME,
            namespace="default",
            body=job)
        print("Job updated. status='%s'" % str(api_response.status))
    

    用Python操作Kubernetes的Job,总体上还是比较方便的,虽然有一些坑。