云原生工作流是一种基于云原生理念构建的、在云环境中运行的工作流系统。云原生工作流利用云计算的弹性、可伸缩性和容器化等特性,使工作流程更加灵活、可靠和易于管理。下面是关于云原生工作流的一些基本信息:

云原生工作流的特点:

  1. 容器化: 云原生工作流通常采用容器技术,将工作流中的各个组件打包成独立的容器,以便在不同环境中运行。
  2. 弹性伸缩: 云原生工作流能够根据负载的变化自动伸缩,以保持系统的高可用性和性能。
  3. 微服务架构: 采用微服务的架构,将工作流拆分成小的、独立的服务,每个服务负责一个特定的功能。
  4. 自动化: 云原生工作流强调自动化,通过自动执行、监控和调整工作流程,提高效率和减少人工干预。

云原生工作流的实现:

  1. Kubernetes: Kubernetes是一个开源的容器编排平台,被广泛用于云原生应用的部署和管理。云原生工作流可以直接利用Kubernetes的特性,例如Pod、Service、ReplicaSet等,来实现容器化和弹性伸缩。
  2. Argo Workflow: Argo Workflow是一个基于Kubernetes的开源工作流引擎,专门用于管理和执行容器化工作流。它支持定义复杂的工作流,包括并行和串行的任务,适用于各种场景。
  3. Tekton: Tekton是一个云原生的持续交付(CI/CD)框架,也可以用于构建云原生工作流。它提供了定义、运行和监控容器化工作流的能力,可以与Kubernetes和其他CI/CD工具集成。
  4. Apache Airflow: Apache Airflow是一个流行的开源工作流调度系统,支持定义、调度和监控工作流。它可以在云环境中运行,并且具有扩展性和灵活性。
  5. Serverless 架构: 通过采用无服务器(Serverless)架构,可以进一步简化工作流的管理和扩展。服务如AWS Step Functions、Azure Logic Apps和Google Cloud Composer等提供了云原生的无服务器工作流解决方案。

实现云原生工作流的关键在于选择合适的工作流引擎,结合云原生基础设施和服务,以满足应用程序的需求,并充分发挥云计算的优势。

# k8s上部署工作流原理

前面,我们看到的云原生的诸多优点。下面我们来了解下Kubernetes这个最出名的容器编排平台,以及如何基于Kubernetes实现容器化部署。

Kubernetes(简称 K8s)是一个开源的容器编排平台,用于自动化容器应用程序的部署、扩展和管理。它最初是由 Google 设计和开发的,现在由 Cloud Native Computing Foundation(CNCF)维护。Kubernetes 提供了一种跨集群的容器管理解决方案,让您能够在公有云、私有云或混合云环境中运行、管理和扩展容器化应用程序。

# Kubernetes介绍

Kubernetes 的主要特性和优势包括:

  1. 自动化部署和滚动更新:Kubernetes 可以根据预定义的配置自动部署应用程序,并在更新时进行滚动更新,以确保零停机时间和持续可用性。
  2. 自动扩展:Kubernetes 可以根据资源使用情况(如 CPU 和内存)自动扩展或收缩应用程序实例数量,以满足不断变化的负载需求。
  3. 负载均衡和服务发现:Kubernetes 提供了内置的负载均衡器和服务发现机制,允许容器之间以及外部客户端访问容器应用程序。
  4. 自我修复:Kubernetes 可以检测到容器故障并自动重新启动容器,确保应用程序始终保持运行状态。
  5. 存储编排:Kubernetes 允许自动挂载所需的存储系统(例如本地存储、公有云提供商的存储服务等),以满足应用程序的存储需求。
  6. 安全和访问控制:Kubernetes 提供了多种安全特性,如网络策略、TLS、身份验证和授权,以确保应用程序和集群的安全。
  7. 多云和混合云支持:Kubernetes 可以在各种环境中运行,包括公有云、私有云、混合云和本地数据中心,实现真正的混合云部署。
  8. 声明式配置:Kubernetes 使用声明式配置,允许描述应用程序的期望状态,而 Kubernetes 则负责实现和维护这个状态。

Kubernetes 提供了一种强大、灵活且可扩展的容器管理平台,使企业能够更好地利用容器技术,提高应用程序的可靠性、可用性和安全性,降低运维成本。

# k8s容器化部署实践

假设我们要将一个Golang实现的API服务进行容器化部署。

假设项目目录结构如下:

main.go

main.go

package main

import (
  "net/http"

  "github.com/gin-gonic/gin"
)

func main() {
  r := gin.Default()
  r.GET("/ping", func(c *gin.Context) {
    c.JSON(http.StatusOK, gin.H{
      "message": "pong",
    })
  })
  r.Run() // listen and serve on 0.0.0.0:8080 
}

创建Docker镜像

创建 Docker 镜像 首先,需要为 Golang API 服务创建一个 Docker 镜像。创建一个名为 Dockerfile 的文件,将其放在您的项目根目录中,并添加以下内容:

# 使用官方 Golang 镜像作为基础镜像
FROM golang:1.20

# 将工作目录设置为 /app
WORKDIR /app
  
# 设置 Go 环境变量 GO111MODULE 为 on,启用 Go Modules 功能。
RUN go env -w GO111MODULE=on

# 设置 Go 环境变量 GOPROXY,使用中国镜像站点 goproxy.cn 作为 Go Modules 的代理。
RUN go env -w GOPROXY=https://goproxy.cn,direct

# 设置镜像的维护者信息。
MAINTAINER "test"

# 复制项目源代码到工作目录
COPY . .

# 初始化一个新的 Go Modules 项目
RUN go mod init api-server

# 下载并安装项目所需的依赖项
RUN go mod tidy

# 安装并升级 Golang Gin Web 框架。
RUN go get -u github.com/gin-gonic/gin

# 构建 Golang API 服务
RUN go build main.go

# 暴露端口(请根据您的实际端口进行修改)
EXPOSE 8080

# 运行 Golang API 服务
CMD ["./main"]

构建Docker镜像

构建 Docker 镜像 在项目根目录中运行以下命令,将您的 Golang API 服务打包成一个 Docker 镜像:

docker build -t your-dockerhub-username/golang-api-service:v1.0 .

推送到镜像仓库

docker login
docker push your-dockerhub-username/golang-api-service:v1.0

创建yaml部署文件

创建 Kubernetes 部署和服务 接下来,您需要创建一个 Kubernetes 部署和服务来运行和暴露您的 Golang API 服务。创建一个名为 deployment.yaml 的文件,并添加以下内容:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: golang-api-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: golang-api-service
  template:
    metadata:
      labels:
        app: golang-api-service
    spec:
      containers:
      - name: golang-api-service
        image: your-dockerhub-username/golang-api-service:v1.0
        ports:
        - containerPort: 8080

---

apiVersion: v1
kind: Service
metadata:
  name: golang-api-service
spec:
  selector:
    app: golang-api-service
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8080
  type: LoadBalancer

部署到Kubernetes集群

部署到 Kubernetes 集群 确保您已连接到正确的 Kubernetes 集群,并运行以下命令:

kubectl apply -f deployment.yaml

查看部署状态

使用以下命令查看部署状态:

kubectl get pods
NAME                                  READY   STATUS      RESTARTS   AGE
golang-api-service-6df899b897-kvkcg   1/1     Running     0          5s
golang-api-service-6df899b897-l9qnl   1/1     Running     0          5s
golang-api-service-6df899b897-nk9r2   1/1     Running     0          5s
kubectl get svc
NAME                 TYPE           CLUSTER-IP      EXTERNAL-IP   PORT(S)        AGE
golang-api-service   LoadBalancer   10.107.173.31   localhost     80:32636/TCP   20s

当 Golang API 服务成功部署到 Kubernetes 集群后,您可以通过服务的 IP 地址和端口访问它。如果您使用的是 LoadBalancer 服务类型,您还可以通过负载均衡器提供的公共 IP 地址访问您的 API 服务。

curl http://localhost/ping
{"message":"pong"}

# k8s上运行工作流任务原理

# 概览

如下图,当 DAG 提交任务时,KubernetesExecutor 从 Kubernetes API 请求工作单元 pod。然后,worker pod 运行任务、报告结果并终止。下面,我们尝试分析Airflow中Kubernetes创建和销毁Pod的流程原理,并在下一环节中手动实现这一过程。

下面我们通过分析Airflow的KubernetesPodOperator了解其实现原理,如下是KubernetesPodOperator的使用方法。

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

k = KubernetesPodOperator(
    name="hello-dry-run",
    image="debian",
    cmds=["bash", "-cx"],
    arguments=["echo", "10"],
    labels={"foo": "bar"},
    task_id="dry_run_demo",
    do_xcom_push=True,
)

k.dry_run()

# 执行入口

KubernetesPodOperator的入口函数是execute方法:

源码位置:airflow/airflow/providers/cncf/kubernetes/operators/pod.py

该方法会调用execute_sync同步执行或execute_async异步执行来创建Pod,这里我们分析execute_sync同步执行的代码

def execute(self, context: Context):
        """Based on the deferrable parameter runs the pod asynchronously or synchronously."""
        if self.deferrable:
            self.execute_async(context)
        else:
            return self.execute_sync(context) 

如下是execute_sync源码:

def execute_sync(self, context: Context):
    result = None
    try:
        # 函数会检查是否已经有一个Pod请求对象,如果没有,就会创建一个。
        if self.pod_request_obj is None:
            self.pod_request_obj = self.build_pod_request_obj(context)
        # 函数会检查是否已经有一个Pod对象,如果没有,就会获取或创建一个。
        if self.pod is None:
            self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
                pod_request_obj=self.pod_request_obj,
                context=context,
            )
        # 函数会将Pod的名称和命名空间推送到xcom,以便在出现错误时仍然可以获取这些值
        ti = context["ti"]
        ti.xcom_push(key="pod_name", value=self.pod.metadata.name)
        ti.xcom_push(key="pod_namespace", value=self.pod.metadata.namespace)

        # 函数会获取远程Pod,以便在清理方法中使用
        self.remote_pod = self.find_pod(self.pod.metadata.namespace, context=context)
        
        # 如果定义了回调函数,函数会在Pod创建和启动时调用相应的回调函数
        if self.callbacks:
            self.callbacks.on_pod_creation(
                pod=self.remote_pod, client=self.client, mode=ExecutionMode.SYNC
            )
        self.await_pod_start(pod=self.pod)
        if self.callbacks:
            self.callbacks.on_pod_starting(
                pod=self.find_pod(self.pod.metadata.namespace, context=context),
                client=self.client,
                mode=ExecutionMode.SYNC,
            )

        # 如果需要获取日志,函数会获取Pod的日志。
        if self.get_logs:
            self.pod_manager.fetch_requested_container_logs(
                pod=self.pod,
                containers=self.container_logs,
                follow_logs=True,
            )
        # 如果不需要获取日志,或者基础容器的名称不在容器日志中,函数会等待容器完成
        if not self.get_logs or (
            self.container_logs is not True and self.base_container_name not in self.container_logs
        ):
            self.pod_manager.await_container_completion(
                pod=self.pod, container_name=self.base_container_name
            )
        # 如果定义了回调函数,函数会在Pod完成时调用相应的回调函数
        if self.callbacks:
            self.callbacks.on_pod_completion(
                pod=self.find_pod(self.pod.metadata.namespace, context=context),
                client=self.client,
                mode=ExecutionMode.SYNC,
            )

        if self.do_xcom_push:
            self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod)
            result = self.extract_xcom(pod=self.pod)
        istio_enabled = self.is_istio_enabled(self.pod)
        self.remote_pod = self.pod_manager.await_pod_completion(
            self.pod, istio_enabled, self.base_container_name
        )
    finally:
        # 最后,函数会清理Pod,并在清理完成后调用相应的回调函数
        pod_to_clean = self.pod or self.pod_request_obj
        self.cleanup(
            pod=pod_to_clean,
            remote_pod=self.remote_pod,
        )
        if self.callbacks:
            self.callbacks.on_pod_cleanup(pod=pod_to_clean, client=self.client, mode=ExecutionMode.SYNC)

    if self.do_xcom_push:
        return result

这段代码里,有几个关键的被调用函数:

  • get_or_create_pod:创建Pod
  • fetch_requested_container_logs:获取Pod执行日志
  • cleanup:销毁Pod

# 创建Pod

我们先分析创建Pod的函数调用过程:

源码位置:airflow/airflow/providers/cncf/kubernetes/operators/pod.py

def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context: Context) -> k8s.V1Pod:
    if self.reattach_on_restart:
        pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
        if pod:
            return pod

    self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict()))
    self.pod_manager.create_pod(pod=pod_request_obj)

    return pod_request_obj

该方法通过调用pod_manager对象的create_pod方法创建Pod。

源码位置:airflow/airflow/providers/cncf/kubernetes/utils/pod_manager.py

在PodManager类中,我们可以看到各种调用Kubernetes接口的python SDK的代码。

run_pod_async方法通过调用Kubernetes的python SDK中的create_namespaced_pod函数实现创建Pod。

def create_pod(self, pod: V1Pod) -> V1Pod:
    """Launch the pod asynchronously."""
    return self.run_pod_async(pod)
  
  
def run_pod_async(self, pod: V1Pod, **kwargs) -> V1Pod:
    """Run POD asynchronously."""
    sanitized_pod = self._client.api_client.sanitize_for_serialization(pod)
    json_pod = json.dumps(sanitized_pod, indent=2)

    self.log.debug("Pod Creation Request: \n%s", json_pod)
    try:
        resp = self._client.create_namespaced_pod(
            body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
        )
        self.log.debug("Pod Creation Response: %s", resp)
    except Exception as e:
        self.log.exception(
            "Exception when attempting to create Namespaced Pod: %s", str(json_pod).replace("\n", " ")
        )
        raise e
    return resp
  

# 获取执行日志

execute_sync调用pod_manager的fetch_requested_container_logs方法。

源码位置:airflow/airflow/providers/cncf/kubernetes/utils/pod_manager.py

该方法的主要功能是获取Kubernetes Pod中指定容器的日志,并将日志流式传输到Airflow的日志系统。以下是该方法的主要工作流程:

  • 首先,初始化一个名为pod_logging_statuses的列表,用于存储每个容器的日志获取状态。
  • 然后,使用get_container_names方法获取Pod中所有容器的名称。
  • 使用_reconcile_requested_log_containers方法,根据请求的容器名称和实际的容器名称,确定需要获取日志的容器。
  • 对于每个需要获取日志的容器,调用fetch_container_logs方法获取容器的日志,并将日志获取的状态添加到pod_logging_statuses列表中。
  • 最后,返回pod_logging_statuses列表。
def fetch_requested_container_logs(
    self, pod: V1Pod, containers: Iterable[str] | str | Literal[True], follow_logs=False
) -> list[PodLoggingStatus]:
    pod_logging_statuses = []
    all_containers = self.get_container_names(pod)
    containers_to_log = self._reconcile_requested_log_containers(
        requested=containers,
        actual=all_containers,
        pod_name=pod.metadata.name,
    )
    for c in containers_to_log:
        status = self.fetch_container_logs(pod=pod, container_name=c, follow=follow_logs)
        pod_logging_statuses.append(status)
    return pod_logging_statuses

源码位置:airflow/airflow/providers/cncf/kubernetes/utils/pod_manager.py

该方法的主要功能是获取Kubernetes Pod中特定容器的日志,并将日志流式传输到Airflow的日志系统。以下是该方法的主要工作流程:

  • 定义一个名为consume_logs的内部函数,用于尝试获取容器日志并处理日志中的每一行。这里最关键的方法是调用read_pod_logs函数。
  • 然后,使用一个无限循环来调用consume_logs函数。在每次循环中,都会尝试获取并处理容器日志
 def fetch_container_logs(
        self,
        pod: V1Pod,
        container_name: str,
        *,
        follow=False,
        since_time: DateTime | None = None,
        post_termination_timeout: int = 120,
    ) -> PodLoggingStatus:
    

        def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None, Exception | None]:
            exception = None
            last_captured_timestamp = None
            connection_timeout = 60 * 30
            read_timeout = 60 * 5
            try:
                logs = self.read_pod_logs(
                    pod=pod,
                    container_name=container_name,
                    timestamps=True,
                    since_seconds=(
                        math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
                    ),
                    follow=follow,
                    post_termination_timeout=post_termination_timeout,
                    _request_timeout=(connection_timeout, read_timeout),
                )
                ...
                
            except TimeoutError as e:
                # in case of timeout, increment return time by 2 seconds to avoid
                # duplicate log entries
                if val := (last_captured_timestamp or since_time):
                    return val.add(seconds=2), e
            except HTTPError as e:
                exception = e
                self.log.exception(
                    "Reading of logs interrupted for container %r; will retry.",
                    container_name,
                )
            return last_captured_timestamp or since_time, exception


        last_log_time = since_time
        while True:
            last_log_time, exc = consume_logs(since_time=last_log_time)
            if not self.container_is_running(pod, container_name=container_name):
                return PodLoggingStatus(running=False, last_log_time=last_log_time)
            if not follow:
                return PodLoggingStatus(running=True, last_log_time=last_log_time)
            else:
                # a timeout is a normal thing and we ignore it and resume following logs
                if not isinstance(exc, TimeoutError):
                    self.log.warning(
                        "Pod %s log read interrupted but container %s still running",
                        pod.metadata.name,
                        container_name,
                    )
                time.sleep(1)

我们再看看read_pod_logs源码:

源码位置:airflow/airflow/providers/cncf/kubernetes/utils/pod_manager.py

可以看到该方法最终是通过调用Kubernetes的Python SDK的read_namespaced_pod_log函数实现读取日志。

@tenacity.retry(stop=tenacity.stop_after_attempt(6), wait=tenacity.wait_exponential(max=15), reraise=True)
def read_pod_logs(
    self,
    pod: V1Pod,
    container_name: str,
    tail_lines: int | None = None,
    timestamps: bool = False,
    since_seconds: int | None = None,
    follow=True,
    post_termination_timeout: int = 120,
    **kwargs,
) -> PodLogsConsumer:
    """Read log from the POD."""
    additional_kwargs = {}
    if since_seconds:
        additional_kwargs["since_seconds"] = since_seconds

    if tail_lines:
        additional_kwargs["tail_lines"] = tail_lines
    additional_kwargs.update(**kwargs)

    try:
        logs = self._client.read_namespaced_pod_log(
            name=pod.metadata.name,
            namespace=pod.metadata.namespace,
            container=container_name,
            follow=follow,
            timestamps=timestamps,
            _preload_content=False,
            **additional_kwargs,
        )
    except HTTPError:
        self.log.exception("There was an error reading the kubernetes API.")
        raise

    return PodLogsConsumer(
        response=logs,
        pod=pod,
        pod_manager=self,
        container_name=container_name,
        post_termination_timeout=post_termination_timeout,
    )

# 销毁Pod

接下来,我们分析销毁Pod的流程:

源码位置:airflow/airflow/providers/cncf/kubernetes/operators/pod.py

cleanup函数最终也是调用pod_manager对象的delete_pod方法来删除Pod。

def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
    ...
    self.process_pod_deletion(remote_pod, reraise=False)
    
def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
    with _optionally_suppress(reraise=reraise):
        if pod is not None:
            should_delete_pod = (
                (self.on_finish_action == OnFinishAction.DELETE_POD)
                or (
                    self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD
                    and pod.status.phase == PodPhase.SUCCEEDED
                )
                or (
                    self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD
                    and container_is_succeeded(pod, self.base_container_name)
                )
            )
            if should_delete_pod:
                self.log.info("Deleting pod: %s", pod.metadata.name)
                self.pod_manager.delete_pod(pod)
            else:
                self.log.info("Skipping deleting pod: %s", pod.metadata.name)

源码位置:airflow/airflow/providers/cncf/kubernetes/utils/pod_manager.py

如下是delete_pod的源码,最终是通过调用Kubernetes的Python SDK中的delete_namespaced_pod方法实现。

def delete_pod(self, pod: V1Pod) -> None:
    """Delete POD."""
    try:
        self._client.delete_namespaced_pod(
            pod.metadata.name, pod.metadata.namespace, body=client.V1DeleteOptions()
        )
    except ApiException as e:
        # If the pod is already deleted
        if str(e.status) != "404":
            raise

# k8s上工作流任务运行综合实践

# k8s本地环境部署

下面我们部署的环境是在mac上直接通过docker desktop版本进行快速部署。

这里我们借助Github上一个开源的项目进行快速部署。

git clone https://github.com/AliyunContainerService/k8s-for-docker-desktop.git
cd k8s-for-docker-desktop

接着下载docker desktop,该软件支持启用Kubernetes服务。

image-20240227193430955

然后在设置页面启用Kubernetes服务。

image-20240227193550828

当Kubernetes服务运行成功后,我们可以在Mac右上角的Docker图标点击后可以看到绿色状态,显示【Kubernetes is running】。

image-20240227193741011

接下来我们切换Kubernetes运行上下文至 docker-desktop:

# kubectl config use-context docker-desktop

至此,我们就成功部署了Kubernetes服务,我们可以通过kubectl命令验证Kubernetes集群状态:

# kubectl cluster-info

Kubernetes control plane is running at https://127.0.0.1:6443
CoreDNS is running at https://127.0.0.1:6443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy

To further debug and diagnose cluster problems, use 'kubectl cluster-info dump'.
# kubectl get nodes

NAME             STATUS   ROLES           AGE   VERSION
docker-desktop   Ready    control-plane   9h    v1.25.9

配置Kubernetes控制台

# kubectl apply -f https://raw.githubusercontent.com/kubernetes/dashboard/v2.5.1/aio/deploy/recommended.yaml
namespace/kubernetes-dashboard created
serviceaccount/kubernetes-dashboard created
service/kubernetes-dashboard created
secret/kubernetes-dashboard-certs created
secret/kubernetes-dashboard-csrf created
secret/kubernetes-dashboard-key-holder created
configmap/kubernetes-dashboard-settings created
role.rbac.authorization.k8s.io/kubernetes-dashboard created
clusterrole.rbac.authorization.k8s.io/kubernetes-dashboard created
rolebinding.rbac.authorization.k8s.io/kubernetes-dashboard created
clusterrolebinding.rbac.authorization.k8s.io/kubernetes-dashboard created
deployment.apps/kubernetes-dashboard created
service/dashboard-metrics-scraper created
deployment.apps/dashboard-metrics-scraper created

检查 kubernetes-dashboard 服务状态

# kubectl get pod -n kubernetes-dashboard

NAME                                         READY   STATUS    RESTARTS   AGE
dashboard-metrics-scraper-748b4f5b9d-npkrb   1/1     Running   0          29s
kubernetes-dashboard-86b687bd84-9k8wv        1/1     Running   0          29s

开启API Server访问代理

如下表示服务启动成功,我们可以直接访问。

# kubectl proxy
Starting to serve on 127.0.0.1:8001

访问控制台

在访问之前,需要授权kube-system默认服务账号

kubectl apply -f kube-system-default.yaml
clusterrolebinding.rbac.authorization.k8s.io/kube-system-default created
secret/default created

获取TOKEN

TOKEN=$(kubectl -n kube-system describe secret default| awk '$1=="token:"{print $2}')
kubectl config set-credentials docker-desktop --token="${TOKEN}"
echo $TOKEN

此时会输出一长串的TOKEN,通过该TOKEN我们就可以进入K8s控制台。

eyJhbGciOiJSUzI1NiIsImtpZCI6IjVyMk0tQmp2bXFhbXVfN3dxanROM0tuZWdldm5iSFo1cy1VaVJWdk5OMDQifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJkZWZhdWx0Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImRlZmF1bHQiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51aWQiOiIwODZjOGM1Yy1iY2ZiLTRhNjQtOWRjYy03NTFjNmMzYzVjNmIiLCJzdWIiOiJzeXN0ZW06c2VydmljZWFjY291bnQ6a3ViZS1zeXN0ZW06ZGVmYXVsdCJ9.UOKWuKm68X9Q2frJ_3XqZyGyLuHPHM6fvrweT-DtO2-uXmmcmYjw5d9Si3gU6tFkug5xTAXTqBWRlIGw8ohXPvAO5m9fkyxN2YIEOCrXOOpjrP3exS6K65E8V33x7XQ_SuCj42rqQKd1rPw2NVRLMZM17SrUe0Y424IsF1IEZZ_-Q7dBOPbC2xvMpoAbs1rNC5vlsWj-vY6_DyuPb7R0DuAXNuWByyjhyHDz4dw0-Y-dd02J7QWBHJEPVFRwGgcvJguikRbQEgmszRgq7Spc1TyVN0MHXR5XRNxeo1-8vnqI030YaDqJ_TQ7EWi7QXucU-iIvbQInOXe1bfwnsOpQg

通过访问如下地址并输入前面生成的TOKEN。

http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy/#/login
image-20240227195535937

接着我们就可以看到Kubernetes的控制台,后面我们通过Kubernetes的API进行Pod的创建和删除,都可以在控制台上看到。

image-20240227195629928

至此,我们就成功部署了K8s的服务。

# k8s 基础命令

get

kubectl get 是一个用于从Kubernetes集群中获取资源信息的命令。它可以用于查询各种类型的Kubernetes资源,如pod、service、deployment等。

例如,查看集群中所有节点。

kubectl get nodes

例如,获取集群中所有pod。

kubectl get pods

create

kubectl create命令用于根据文件或输入创建集群resource。

例如,根据yaml配置文件创建resource。

code/test.py

import time
import os

os.system('echo "hello world" > /app/output.txt')
print('ok')

文件:python_code.yaml

apiVersion: v1
kind: Pod
metadata:
  name: my-python-app-pod
  labels:
    app: my-python-app
spec:
  containers:
  - name: my-python-app
    image: python:3
    command: ["python"]
    args: ["/app/test.py"]
    volumeMounts:
    - name: host-volume
      mountPath: /app
  volumes:
  - name: host-volume
    hostPath:
      path: /Users/shuwoom/Desktop/k8s-api-demo/code
      type: Directory
  restartPolicy: Never

现在,使用kubectl create -f python_code.yaml命令创建Pod时,一旦Python应用程序运行结束,Pod将自动退出并不再运行。

此时,通过get命令可以看到刚刚创建的资源。

kubectl get pods
NAME        READY   STATUS    RESTARTS   AGE
my-python-app   1/1     Running   0          37s

如果想获取python脚本的输出结果,可以通过如下命令实现:

我们可以通过logs命令查看pod的运行输出:

kubectl logs my-python-app-pod
ok

并且可以看到code目录下成功创建了一个output.txt的文件

ll code
total 16
-rw-r--r--  1 shuwoom  staff    12B  2 27 20:45 output.txt
-rw-r--r--@ 1 shuwoom  staff    85B  2 27 20:45 test.py

run

也可以直接通过指定镜像直接通过kubectl run命令直接运行:

kubectl run my-nginx --image=nginx:latest

我们可以看到一个叫my-nginx的资源成功创建运行了。

kubectl get pods
NAME        READY   STATUS    RESTARTS   AGE
apple-app   1/1     Running   0          4m56s
my-nginx    1/1     Running   0          6s

logs

如果想查看Pod运行输出的日志,可以通过logs命令,例如:

kubectl logs [POD名称]

在后面实现的k8s上通过POD执行脚本任务,我们也可以通过该命令,来获取POD中执行脚本的输出结果。

delete

根据resource名删除resource

例如,删除pod:

kubectl delete po my-nginx

也可以通过之前指定的yaml文件来删除pod:

kubectl delete -f sample/apple.yaml

# k8s python SDK调用

创建Pod有两种方式,一种是通过yaml文件创建,另一种是在代码中构造对象创建。

基于yaml创建Pod

python_code.yml复用上一节的文件。

import yaml
from kubernetes import utils
from kubernetes import client, config

def create_from_yaml():
    # 读取yaml文件
    with open("python_code.yaml") as f:
        yaml_file = yaml.safe_load(f)

    # 读取配置文件
    config.load_kube_config()
    k8s_client = client.ApiClient()

    # 创建pods
    utils.create_from_yaml(k8s_client, yaml_objects=[yaml_file], namespace="default")

基于对象创建Pod

import datetime
import pytz
import yaml
import time
from kubernetes import utils
from kubernetes.client.rest import ApiException
from kubernetes import client, config

DEPLOYMENT_NAME = "pod-deployment"

def create_pod_by_object(name):
    # 加载kubeconfig文件
    config.load_kube_config()

    # 创建Kubernetes API客户端实例
    api_instance = client.CoreV1Api()

    # 定义Pod的元数据
    metadata = client.V1ObjectMeta(
        name=name,
        labels={"app": "my-python-app"}
    )

    # 定义容器的配置
    container = client.V1Container(
        name=name,
        image="python:3",
        command=["python"],
        args=["/app/test.py"],
        volume_mounts=[
            client.V1VolumeMount(name="host-volume", mount_path="/app")
        ],
        resources=client.V1ResourceRequirements(
            requests={"cpu": "100m", "memory": "200Mi"},
            limits={"cpu": "500m", "memory": "500Mi"},
        ),
    )

    # 定义HostPath卷的配置
    # 将宿主机上的code目录挂载的pod上的/app路径
    host_path_volume = client.V1HostPathVolumeSource(
        path="/Users/shuwoom/Desktop/k8s-api-demo/code",
        type="Directory"
    )

    # 定义卷的配置
    volume = client.V1Volume(
        name="host-volume",
        host_path=host_path_volume
    )

    # 定义Pod的配置
    pod_spec = client.V1PodSpec(
        containers=[container],
        volumes=[volume],
        restart_policy="Never"
    )

    # 创建Pod对象
    pod = client.V1Pod(
        api_version="v1",
        kind="Pod",
        metadata=metadata,
        spec=pod_spec
    )

    # 创建Pod
    api_instance.create_namespaced_pod(namespace="default", body=pod)
    print("Pod created")

查看Pod日志

from kubernetes import utils
from kubernetes.client.rest import ApiException
from kubernetes import client, config

def get_pod_log(pod_name,namespace='default'):
    
    '''
    可以通过读取pod的日志获取脚本输出
    '''
    config.load_kube_config()
    try:
        v1 = client.CoreV1Api()
        api_response = v1.read_namespaced_pod_log(name=pod_name, namespace=namespace)
        print(api_response)
    except ApiException as e:
        print('Found exception in reading the logs'+str(e))

查看Pod列表

from kubernetes import client, config
def list_pods(namespace='default'):
    
    v1 = client.CoreV1Api()
    pods = v1.list_namespaced_pod(namespace)
    for pod in pods.items:
        print(f"{pod.metadata.name} {pod.status.phase} {pod.status.pod_ip} {pod.metadata.namespace}")

删除Pod资源

from kubernetes import client, config

def delete_deployment(name, namespace='default'):
    config.load_kube_config()
    v1 = client.CoreV1Api()

    resp = v1.delete_namespaced_pod(
        name=name,
        namespace="default"
    )
    print("\n[INFO] deployment `nginx-deployment` deleted.")

流程任务执行过程

如下图,是基于Kubernetes集群的API接口,通过调用Kubernetes Python SDK实现工作流中脚本任务执行的流程图。

在执行脚本之前,首先在挂载的代码目录下创建待执行的脚本,并将该脚本作为Pod的启动执行的命令,然后基于yaml创建Pod运行。

运行结束后,引擎通过调用Logs接口获取脚本的输出结果进行存储。最后通过调用Kubernetes的Delete销毁Pod结束该脚本任务的执行。

image-20240228094730327
最后更新: 4/6/2024, 8:27:42 AM