В последние годы Kubernetes (К8s) прочно закрепился в повседневной деятельности многих разработчиков и DevOps-инженеров. Однако большинство задач, которые приходится выполнять, однообразны, монотонны и легко поддаются автоматизации. 

Зачастую довольно просто набросать быстрый shell-скрипт с командами kubectl. Но для более сложных задач автоматизации требуется что-то более мощное, чем bash, например возможности языка программирования Python. 

В данной статье научимся работать с клиентской библиотекой Python для Kubernetes (kubernetes-client/python) и автоматизировать любые скучные задачи K8s, стоящие перед нами!

Создание экспериментального кластера 

Перед началом работы с kubernetes-client создадим экспериментальный кластер для тестирования. Воспользуемся KinD (Kubernetes в Docker), который можно установить по ссылке.

Потребуется следующая конфигурация кластера:

# kind.yaml
# https://kind.sigs.k8s.io/docs/user/configuration/
apiVersion: kind.x-k8s.io/v1alpha4
kind: Cluster
name: api-playground
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker

Для создания кластера с указанной конфигурацией выполняем: 

kind create cluster --image kindest/node:v1.23.5 --config=kind.yaml

kubectl cluster-info --context kind-api-playground
# Плоскость управления (Сontrol plane) Kubernetes работает по адресу https://127.0.0.1:36599
# CoreDNS работает по адресу https://127.0.0.1:36599/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy

kubectl get nodes
# NAME STATUS ROLES AGE VERSION
# api-playground-control-plane Ready control-plane,master 58s v1.23.5
# api-playground-worker Ready <none> 27s v1.23.5
# api-playground-worker2 NotReady <none> 27s v1.23.5
# api-playground-worker3 NotReady <none> 27s v1.23.5

По факту готовности и запуска кластера устанавливаем клиентскую библиотеку (по желанию  —  внутри виртуальной среды):

python3 -m venv venv
source venv/bin/activate
pip install kubernetes

Аутентификация 

Перед выполнением любых действий в кластере Kubernetes необходимо пройти процесс аутентификации. 

Во избежание повторных процедур аутентификации воспользуемся долгосрочными токенами. Для их получения создаем ServiceAccount:

kubectl create sa playground
kubectl describe sa playground

Name: playground
Namespace: default
Labels: <none>
Annotations: <none>
Image pull secrets: <none>
Mountable secrets: playground-token-v8bq7
Tokens: playground-token-v8bq7
Events: <none>

export KIND_TOKEN=$(kubectl get secret playground-token-v8bq7 -o json | jq -r .data.token | base64 --decode)

Еще одно преимущество ServiceAccount состоит в том, что он не привязан к одному человеку. Это обстоятельство как нельзя лучше отвечает целям автоматизации. 

Полученный токен можно задействовать в запросах: 

curl -k -X GET -H "Authorization: Bearer $KIND_TOKEN" https://127.0.0.1:36599/apis

Процедура аутентификации завершена. Теперь пройдем авторизацию, дающую право на выполнение действий. Для этого создаем роль Role и связываем ее с ServiceAccount. В результате у нас появляется возможность управлять ресурсами. Рассмотрим соответствующий код: 

kubectl create clusterrole manage-pods \
--verb=get --verb=list --verb=watch --verb=create --verb=update --verb=patch --verb=delete \
--resource=pods

kubectl -n default create rolebinding sa-manage-pods \
--clusterrole=manage-pods \
--serviceaccount=default:playground

Данный код разрешает ServiceAccount выполнять действия с подами, ограниченными пространством имен default

Роли должны быть точными и конкретными. При работе в KinD целесообразно выбрать роль администратора всего кластера cluster-admin, как показано ниже:  

kubectl create clusterrolebinding sa-cluster-admin \
--clusterrole=cluster-admin \
--serviceaccount=default:playground

Необработанные запросы 

Чтобы лучше понять внутренний механизм работы kubectl и kubernetes-client, начнем с необработанных HTTP-запросов и воспользуемся curl.

Есть самый простой способ узнать, какие запросы выполняются “под капотом”. Для этого необходимо выполнить нужную команду kubectl с -v 10. На выводе мы получаем полную команду curl, как показано ниже:  

kubectl get pods -v 10
# <snip>
curl -k -v -XGET -H "Accept: application/json;as=Table;v=v1;g=meta.k8s.io,application/json..." \
'https://127.0.0.1:36599/api/v1/namespaces/default/pods?limit=500'
# <snip>

Вывод с loglevel 10 будет очень подробным, но в нем вы найдете вышеуказанную команду curl

Добавляем заголовок токена Bearer в команду curl с долгосрочным токеном. Вы сможете выполнять те же действия, что и kubectl. Например: 

curl -s -k -XGET -H "Authorization: Bearer $KIND_TOKEN" -H "Accept: application/json, */*" -H "Content-Type: application/json" \
-H "kubernetes/$Format" 'https://127.0.0.1:36599/api/v1/namespaces/default/pods/example' | jq .status.phase
# "Running"

В случае потребности в теле запроса посмотрите, какие поля нужно включить в запрос. Например, при создании пода можно использовать API, описанный по данной ссылке. Сделав это, получаем следующий запрос: 

curl -k -XPOST -H "Authorization: Bearer $KIND_TOKEN" -H "Accept: application/json, */*" -H "Content-Type: application/json" \
-H "kubernetes/$Format" https://127.0.0.1:36599/api/v1/namespaces/default/pods [email protected]

# Подтверждение
kubectl get pods
NAME READY STATUS RESTARTS AGE
example 0/1 Running 0 7s

За атрибутами объекта обратимся к справочной документации по API Kubernetes. Кроме того, можно посмотреть определение OpenAPI с помощью этой команды:

curl -k -X GET -H "Authorization: Bearer $KIND_TOKEN" https://127.0.0.1:36599/apis

Прямое взаимодействие с Kubernetes посредством REST API может оказаться неудобным, но в ряде ситуаций такой шаг оправдан. Так, некоторые случаи включают взаимодействие с API, у которых нет эквивалентной команды kubectl при работе с другим дистрибутивом K8s, например OpenShift. Этот дистрибутив предоставляет дополнительные API, не предусмотренные kubectl или клиентским SDK.

Клиентская библиотека Python

Переходим к самой клиентской библиотеке Python. Выполняем те же шаги, что и в случае с kubectl и curl. Начнем с аутентификации:

from kubernetes import client
import os

configuration = client.Configuration()
configuration.api_key_prefix["authorization"] = "Bearer"
configuration.host = "https://127.0.0.1:36599"
configuration.api_key["authorization"] = os.getenv("KIND_TOKEN", None)
configuration.verify_ssl = False # Только для тестирования с KinD!
api_client = client.ApiClient(configuration)
v1 = client.CoreV1Api(api_client)

ret = v1.list_namespaced_pod(namespace="default", watch=False)
for pod in ret.items:
print(f"Name: {pod.metadata.name}, Namespace: {pod.metadata.namespace} IP: {pod.status.pod_ip}")
# Name: example, Namespace: default IP: 10.244.2.2

Сначала определяем объект конфигурации, который сообщает клиенту о процедуре аутентификации с использованием токена Bearer. С учетом того, что кластер KinD не применяет SSL, мы его отключаем в реальном кластере. Но вам так делать не надо. 

Проверяем конфигурацию с помощью метода list_namespaced_pod клиента API и тем самым получаем все поды в пространстве имен default. Затем выводим их name, namespace и IP.

Предлагаю поработать с более реалистичной задачей. Для этого создадим Deployment:

deployment_name = "my-deploy"
deployment_manifest = {
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": {"name": deployment_name, "namespace": "default"},
"spec": {"replicas": 3,
"selector": {
"matchLabels": {
"app": "nginx"
}},
"template": {"metadata": {"labels": {"app": "nginx"}},
"spec": {"containers": [
{"name": "nginx", "image": "nginx:1.21.6", "ports": [{"containerPort": 80}]}]
}
},
}
}

import time
from kubernetes.client.rest import ApiException

v1 = client.AppsV1Api(api_client)

response = v1.create_namespaced_deployment(body=deployment_manifest, namespace="default")
while True:
try:
response = v1.read_namespaced_deployment_status(name=deployment_name, namespace="default")
if response.status.available_replicas != 3:
print("Waiting for Deployment to become ready...")
time.sleep(5)
else:
break
except ApiException as e:
print(f"Exception when calling AppsV1Api -> read_namespaced_deployment_status: {e}\n")

Помимо создания Deployment мы также ожидаем получения доступа ко всем его подам. С этой целью запрашиваем состояние Deployment и проверяем количество доступных реплик.

Обратите внимание на структуру имен функций, например create_namespaced_deployment. Поясним на дополнительных примерах:

  • replace_namespaced_cron_job;
  • patch_namespaced_stateful_set;
  • list_namespaced_horizontal_pod_autoscaler;
  • read_namespaced_daemon_set;
  • read_custom_resource_definition.

Все они представлены в формате operation_namespaced_resource или просто operation_resource для глобальных ресурсов. В конце к ним можно добавить _status или _scale для методов, которые выполняют операции с состоянием (read_namespaced_deployment_status) или масштабированием ресурсов (patch_namespaced_stateful_set_scale). 

Отметим еще один важный момент в вышеуказанном примере. Мы выполняли действия с помощью client.AppsV1Api. Он позволяет работать со всеми ресурсами, принадлежащими apiVersion: apps/v1. Потребуйся нам CronJob, мы бы выбрали BatchV1Api (он же apiVersion: batch/v1 в формате YAML), а для PVC предпочли бы CoreV1Api из-за apiVersion: v1. Суть понятна.

Как видно, предоставляется большой выбор функций. К счастью, все они перечислены в документации. Нажав на любую из них, вы получаете пример ее использования. 

Помимо основных операций CRUD, можно постоянно следить за изменениями объектов, например за Events:

from kubernetes import client, watch

v1 = client.CoreV1Api(api_client)
count = 10
w = watch.Watch()
for event in w.stream(partial(v1.list_namespaced_event, namespace="default"), timeout_seconds=10):
print(f"Event - Message: {event['object']['message']} at {event['object']['metadata']['creationTimestamp']}")
count -= 1
if not count:
w.stop()
print("Finished namespace stream.")

# Event - Сообщение: Успешно назначен default/my-deploy-cb69f686c-2dspd для api-playground-worker2 2022-04-19T11:18:25Z
# Event - Сообщение: Образ контейнера "nginx:1.21.6" уже присутствует в компьютере 2022-04-19T11:18:26Z
# Event - Сообщение: Контейнер nginx создан 2022-04-19T11:18:26Z
# Event - Сообщение: Контейнер nginx запущен 2022-04-19T11:18:26Z

Здесь мы отслеживаем события в пространстве имен default: берем 10 событий и затем закрываем поток. В случае необходимости постоянного мониторинга ресурсов просто убираем timeout_seconds и вызов w.stop()

Как видно, в первом примере мы воспользовались простым dict Python для определения объекта Deployment, переданного клиенту. Как вариант, можно прибегнуть к более выраженному стилю ООП и применить модели API (классы), предоставляемые библиотекой:  

v1 = client.AppsV1Api(api_client)

deployment_manifest = client.V1Deployment(
api_version="apps/v1",
kind="Deployment",
metadata=client.V1ObjectMeta(name=deployment_name),
spec=client.V1DeploymentSpec(
replicas=3,
selector=client.V1LabelSelector(match_labels={
"app": "nginx"
}),
template=client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={"app": "nginx"}),
spec=client.V1PodSpec(
containers=[client.V1Container(name="nginx",
image="nginx:1.21.6",
ports=[client.V1ContainerPort(container_port=80)]
)]))
)
)

response = v1.create_namespaced_deployment(body=deployment_manifest, namespace="default")

Отбросьте попытки выяснить, какая модель применяется для каждого аргумента  —  это бесполезно. Генерируя ресурсы вышеуказанным способом, вы всегда должны работать с документацией по моделям и переходить по ссылкам при создании отдельных подобъектов. Так вы поймете, какие значения/типы ожидаются в каждом поле. 

Полезные примеры 

Теперь у вас есть базовое представление о работе клиентской библиотеки Python. Рассмотрим несколько примеров и фрагментов кода, позволяющих автоматизировать рутинные операции Kubernetes. 

Довольно часто мы выполняем непрерывный перезапуск развертывания с помощью команды kubectl rollout restart, но API для этого нет. kubectl проводит данную операцию, обновляя аннотации развертывания. Если быть более точным, то kubectl.kubernetes.io/restartedAt устанавливается на текущее время. Такой прием срабатывает, поскольку любое изменение в спецификации пода приводит к перезапуску.  

Рассмотрим код для выполнения перезапуска с помощью клиентской библиотеки Python: 

from kubernetes import dynamic
from kubernetes.client import api_client # Внимание: другой импорт, отличается от предыдущей клиентской библиотеки!
import datetime

client = dynamic.DynamicClient(api_client.ApiClient(configuration=configuration))

api = client.resources.get(api_version="apps/v1", kind="Deployment")

# Несмотря на то, что манифест развертывания ранее был создан с помощью модели класса, он по-прежнему ведет себя как словарь:
deployment_manifest["spec"]["template"]["metadata"]["annotations"] = {
"kubectl.kubernetes.io/restartedAt": datetime.datetime.utcnow().isoformat()
}

deployment_patched = api.patch(body=deployment_manifest, name=deployment_name, namespace="default")

Еще одна распространенная операция  —  масштабирование развертывания. К счастью, на этот случай есть функция API. Рассмотрим ее применение: 

from kubernetes import client

api_client = client.ApiClient(configuration)
apps_v1 = client.AppsV1Api(api_client)

# Тело может включать разные типы patch - https://github.com/kubernetes-client/python/issues/1206#issuecomment-668118057
api_response = apps_v1.patch_namespaced_deployment_scale(deployment_name, "default", {"spec": {"replicas": 5}})

Для устранения неполадок целесообразно выполнить команду exec внутри пода, изучить и, возможно, получить переменную среды, чтобы проверить правильность конфигурации. Пример соответствующего кода:

from kubernetes.stream import stream

def pod_exec(name, namespace, command, api_instance):
exec_command = ["/bin/sh", "-c", command]

resp = stream(api_instance.connect_get_namespaced_pod_exec,
name,
namespace,
command=exec_command,
stderr=True, stdin=False,
stdout=True, tty=False,
_preload_content=False)

while resp.is_open():
resp.update(timeout=1)
if resp.peek_stdout():
print(f"STDOUT: \n{resp.read_stdout()}")
if resp.peek_stderr():
print(f"STDERR: \n{resp.read_stderr()}")

resp.close()

if resp.returncode != 0:
raise Exception("Script failed")

pod = "example"
api_client = client.ApiClient(configuration)
v1 = client.CoreV1Api(api_client)

pod_exec(pod, "default", "env", v1)

# STDOUT:
# KUBERNETES_SERVICE_PORT=443
# KUBERNETES_PORT=tcp://10.96.0.1:443
# HOSTNAME=example
# HOME=/root
# ...

Данный фрагмент кода также позволяет при необходимости запускать полные shell-скрипты. 

Допустим, нужно установить ограничение Taint на проблемный узел. Можно сконцентрироваться на задачах, ориентированных на администрирование кластера. Поскольку прямого API для ограничений узлов (Node Taints) нет, найдем обходной способ. Вот код, который поможет:

from kubernetes import client

api_client = client.ApiClient(configuration)
v1 = client.CoreV1Api(api_client)

# kubectl taint nodes api-playground-worker some-taint=1:NoSchedule
v1.patch_node("api-playground-worker", {"spec": {"taints": [{"effect": "NoSchedule", "key": "some-taint", "value": "1"}]}})

# kubectl get nodes -o custom-columns=NAME:.metadata.name,TAINTS:.spec.taints --no-headers
# api-playground-control-plane [map[effect:NoSchedule key:node-role.kubernetes.io/master]]
# api-playground-worker [map[effect:NoSchedule key:some-taint value:1]]
# api-playground-worker2 <none>
# api-playground-worker3 <none>

Для автоматизации масштабирования кластера проводится контроль за использованием его ресурсов. Как правило, команда kubectl top позволяет получить информацию в интерактивном режиме. С помощью же клиентской библиотеки мы можем сделать так: 

# https://github.com/kubernetes-sigs/kind/issues/398
# kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/download/v0.5.0/components.yaml
# kubectl patch -n kube-system deployment metrics-server --type=json \
# -p '[{"op":"add","path":"/spec/template/spec/containers/0/args/-","value":"--kubelet-insecure-tls"}]'

from kubernetes import client

api_client = client.ApiClient(configuration)
custom_api = client.CustomObjectsApi(api_client)

response = custom_api.list_cluster_custom_object("metrics.k8s.io", "v1beta1", "nodes") # also works with "pods" instead of "nodes"

for node in response["items"]:
print(f"{node['metadata']['name']: <30} CPU: {node['usage']['cpu']: <10} Memory: {node['usage']['memory']}")

# api-playground-control-plane CPU: 148318488n Memory: 2363504Ki
# api-playground-worker CPU: 91635913n Memory: 1858680Ki
# api-playground-worker2 CPU: 75473747n Memory: 1880860Ki
# api-playground-worker3 CPU: 105692650n Memory: 1881560Ki

Рассмотренный пример предусматривает предварительную установку metrics-server в кластере. Проверить его наличие можно командой kubectl top. Воспользуйтесь комментариями в коде для его установки, если вы работаете с KinD.

Изучим последний, но не менее востребованный случай. Допустим, у вас есть набор файлов YAML и JSON. Вы планируете задействовать их либо для развертывания или изменения объектов в кластере, либо для экспорта или резервных копий объектов, созданных с помощью клиентской библиотеки. Рассмотрим простой способ преобразования файлов YAML/JSON в объекты Kubernetes и обратно в файлы: 

# pip install kopf  # (Python 3.7+)
import kopf

api_client = client.ApiClient(configuration)
v1 = client.CoreV1Api(api_client)

pods = []

# https://stackoverflow.com/questions/59977058/clone-kubernetes-objects-programmatically-using-the-python-api/59977059#59977059
ret = v1.list_namespaced_pod(namespace="default")
for pod in ret.items:
# Простое преобразование в Dict/JSON
print(api_client.sanitize_for_serialization(pod))

# Преобразование с очисткой полей
pods.append(kopf.AnnotationsDiffBaseStorage()
.build(body=kopf.Body(api_client.sanitize_for_serialization(pod))))


# Преобразование из Dict обратно в объект Client
class FakeKubeResponse:
def __init__(self, obj):
import json
self.data = json.dumps(obj)


for pod in pods:
pod_manifest = api_client.deserialize(FakeKubeResponse(pod), "V1Pod")
...

Первый способ преобразования существующего объекта в словарь Python (JSON) предусматривает применение sanitize_for_serialization. Он выдает необработанный вывод со всеми сгенерированными/предустановленными полями. Второй и более предпочтительный способ обращается к вспомогательным методам библиотеки kopf, которые удаляют все ненужные поля. Таким образом, упрощается процесс преобразования словаря в правильный файл YAML или JSON.

Если необходимо осуществить обратный процесс и перейти от словаря к клиентской объектной модели, можно воспользоваться методом deserialize клиента API. Однако он требует наличия атрибута data у своего аргумента. Поэтому мы передаем ему экземпляр класса контейнера с таким атрибутом. 

Если у вас уже есть файлы YAML для работы с клиентской библиотекой Python, воспользуйтесь вспомогательной функцией kubernetes.utils.create_from_yaml.

Чтобы получить полное представление обо всех возможностях библиотеки, ознакомьтесь с каталогом примеров в репозитории. 

Настоятельно рекомендую изучить ишью (англ. issue) в репозитории библиотеки. Здесь вы найдете множество отличных практических примеров, например параллельная обработка событий и отслеживание обновлений ConfigMaps

Заключение 

Клиентская библиотека Python содержит сотни функций. В связи с этим не представляется возможным рассказать обо всех функциональностях и случаях использования в рамках одной статьи. Большинство из них придерживаются общей структуры, благодаря чему привыкнуть к работе с библиотекой можно буквально за пару минут. 

Если ваш интерес не ограничивается рассмотренными случаями, рекомендую изучить другие инструменты, которые задействуют kubernetes-client/python, например библиотеку для создания операторов Kubernetes. Посмотрите тесты самой библиотеки, поскольку они отражают ее предполагаемое применение. Обратите внимание на этот отличный набор клиентских тестов.

Читайте также:

Читайте нас в TelegramVK и Яндекс.Дзен


Перевод статьи Martin Heinz: Automate All the Boring Kubernetes Operations With Python

Предыдущая статьяNexus  —  новый визуализатор дерева компонентов для Next.js
Следующая статьяДвижки JavaScript. Часть 1: парсинг