云原生工作流是一种基于云原生理念构建的、在云环境中运行的工作流系统。云原生工作流利用云计算的弹性、可伸缩性和容器化等特性,使工作流程更加灵活、可靠和易于管理。下面是关于云原生工作流的一些基本信息:
云原生工作流的特点:
- 容器化: 云原生工作流通常采用容器技术,将工作流中的各个组件打包成独立的容器,以便在不同环境中运行。
- 弹性伸缩: 云原生工作流能够根据负载的变化自动伸缩,以保持系统的高可用性和性能。
- 微服务架构: 采用微服务的架构,将工作流拆分成小的、独立的服务,每个服务负责一个特定的功能。
- 自动化: 云原生工作流强调自动化,通过自动执行、监控和调整工作流程,提高效率和减少人工干预。
云原生工作流的实现:
- Kubernetes: Kubernetes是一个开源的容器编排平台,被广泛用于云原生应用的部署和管理。云原生工作流可以直接利用Kubernetes的特性,例如Pod、Service、ReplicaSet等,来实现容器化和弹性伸缩。
- Argo Workflow: Argo Workflow是一个基于Kubernetes的开源工作流引擎,专门用于管理和执行容器化工作流。它支持定义复杂的工作流,包括并行和串行的任务,适用于各种场景。
- Tekton: Tekton是一个云原生的持续交付(CI/CD)框架,也可以用于构建云原生工作流。它提供了定义、运行和监控容器化工作流的能力,可以与Kubernetes和其他CI/CD工具集成。
- Apache Airflow: Apache Airflow是一个流行的开源工作流调度系统,支持定义、调度和监控工作流。它可以在云环境中运行,并且具有扩展性和灵活性。
- Serverless 架构: 通过采用无服务器(Serverless)架构,可以进一步简化工作流的管理和扩展。服务如AWS Step Functions、Azure Logic Apps和Google Cloud Composer等提供了云原生的无服务器工作流解决方案。
实现云原生工作流的关键在于选择合适的工作流引擎,结合云原生基础设施和服务,以满足应用程序的需求,并充分发挥云计算的优势。
# k8s上部署工作流原理
前面,我们看到的云原生的诸多优点。下面我们来了解下Kubernetes这个最出名的容器编排平台,以及如何基于Kubernetes实现容器化部署。
Kubernetes(简称 K8s)是一个开源的容器编排平台,用于自动化容器应用程序的部署、扩展和管理。它最初是由 Google 设计和开发的,现在由 Cloud Native Computing Foundation(CNCF)维护。Kubernetes 提供了一种跨集群的容器管理解决方案,让您能够在公有云、私有云或混合云环境中运行、管理和扩展容器化应用程序。
# Kubernetes介绍
Kubernetes 的主要特性和优势包括:
- 自动化部署和滚动更新:Kubernetes 可以根据预定义的配置自动部署应用程序,并在更新时进行滚动更新,以确保零停机时间和持续可用性。
- 自动扩展:Kubernetes 可以根据资源使用情况(如 CPU 和内存)自动扩展或收缩应用程序实例数量,以满足不断变化的负载需求。
- 负载均衡和服务发现:Kubernetes 提供了内置的负载均衡器和服务发现机制,允许容器之间以及外部客户端访问容器应用程序。
- 自我修复:Kubernetes 可以检测到容器故障并自动重新启动容器,确保应用程序始终保持运行状态。
- 存储编排:Kubernetes 允许自动挂载所需的存储系统(例如本地存储、公有云提供商的存储服务等),以满足应用程序的存储需求。
- 安全和访问控制:Kubernetes 提供了多种安全特性,如网络策略、TLS、身份验证和授权,以确保应用程序和集群的安全。
- 多云和混合云支持:Kubernetes 可以在各种环境中运行,包括公有云、私有云、混合云和本地数据中心,实现真正的混合云部署。
- 声明式配置:Kubernetes 使用声明式配置,允许描述应用程序的期望状态,而 Kubernetes 则负责实现和维护这个状态。
Kubernetes 提供了一种强大、灵活且可扩展的容器管理平台,使企业能够更好地利用容器技术,提高应用程序的可靠性、可用性和安全性,降低运维成本。
# k8s容器化部署实践
假设我们要将一个Golang实现的API服务进行容器化部署。
假设项目目录结构如下:
main.go
main.go
package main
import (
"net/http"
"github.com/gin-gonic/gin"
)
func main() {
r := gin.Default()
r.GET("/ping", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"message": "pong",
})
})
r.Run() // listen and serve on 0.0.0.0:8080
}
创建Docker镜像
创建 Docker 镜像 首先,需要为 Golang API 服务创建一个 Docker 镜像。创建一个名为 Dockerfile
的文件,将其放在您的项目根目录中,并添加以下内容:
# 使用官方 Golang 镜像作为基础镜像
FROM golang:1.20
# 将工作目录设置为 /app
WORKDIR /app
# 设置 Go 环境变量 GO111MODULE 为 on,启用 Go Modules 功能。
RUN go env -w GO111MODULE=on
# 设置 Go 环境变量 GOPROXY,使用中国镜像站点 goproxy.cn 作为 Go Modules 的代理。
RUN go env -w GOPROXY=https://goproxy.cn,direct
# 设置镜像的维护者信息。
MAINTAINER "test"
# 复制项目源代码到工作目录
COPY . .
# 初始化一个新的 Go Modules 项目
RUN go mod init api-server
# 下载并安装项目所需的依赖项
RUN go mod tidy
# 安装并升级 Golang Gin Web 框架。
RUN go get -u github.com/gin-gonic/gin
# 构建 Golang API 服务
RUN go build main.go
# 暴露端口(请根据您的实际端口进行修改)
EXPOSE 8080
# 运行 Golang API 服务
CMD ["./main"]
构建Docker镜像
构建 Docker 镜像 在项目根目录中运行以下命令,将您的 Golang API 服务打包成一个 Docker 镜像:
docker build -t your-dockerhub-username/golang-api-service:v1.0 .
推送到镜像仓库
docker login
docker push your-dockerhub-username/golang-api-service:v1.0
创建yaml部署文件
创建 Kubernetes 部署和服务 接下来,您需要创建一个 Kubernetes 部署和服务来运行和暴露您的 Golang API 服务。创建一个名为 deployment.yaml
的文件,并添加以下内容:
apiVersion: apps/v1
kind: Deployment
metadata:
name: golang-api-service
spec:
replicas: 3
selector:
matchLabels:
app: golang-api-service
template:
metadata:
labels:
app: golang-api-service
spec:
containers:
- name: golang-api-service
image: your-dockerhub-username/golang-api-service:v1.0
ports:
- containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
name: golang-api-service
spec:
selector:
app: golang-api-service
ports:
- protocol: TCP
port: 80
targetPort: 8080
type: LoadBalancer
部署到Kubernetes集群
部署到 Kubernetes 集群 确保您已连接到正确的 Kubernetes 集群,并运行以下命令:
kubectl apply -f deployment.yaml
查看部署状态
使用以下命令查看部署状态:
kubectl get pods
NAME READY STATUS RESTARTS AGE
golang-api-service-6df899b897-kvkcg 1/1 Running 0 5s
golang-api-service-6df899b897-l9qnl 1/1 Running 0 5s
golang-api-service-6df899b897-nk9r2 1/1 Running 0 5s
kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
golang-api-service LoadBalancer 10.107.173.31 localhost 80:32636/TCP 20s
当 Golang API 服务成功部署到 Kubernetes 集群后,您可以通过服务的 IP 地址和端口访问它。如果您使用的是 LoadBalancer 服务类型,您还可以通过负载均衡器提供的公共 IP 地址访问您的 API 服务。
curl http://localhost/ping
{"message":"pong"}
# k8s上运行工作流任务原理
# 概览
如下图,当 DAG 提交任务时,KubernetesExecutor 从 Kubernetes API 请求工作单元 pod。然后,worker pod 运行任务、报告结果并终止。下面,我们尝试分析Airflow中Kubernetes创建和销毁Pod的流程原理,并在下一环节中手动实现这一过程。
下面我们通过分析Airflow的KubernetesPodOperator了解其实现原理,如下是KubernetesPodOperator的使用方法。
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
k = KubernetesPodOperator(
name="hello-dry-run",
image="debian",
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
task_id="dry_run_demo",
do_xcom_push=True,
)
k.dry_run()
# 执行入口
KubernetesPodOperator的入口函数是execute方法:
源码位置:airflow/airflow/providers/cncf/kubernetes/operators/pod.py
该方法会调用execute_sync同步执行或execute_async异步执行来创建Pod,这里我们分析execute_sync同步执行的代码
def execute(self, context: Context):
"""Based on the deferrable parameter runs the pod asynchronously or synchronously."""
if self.deferrable:
self.execute_async(context)
else:
return self.execute_sync(context)
如下是execute_sync源码:
def execute_sync(self, context: Context):
result = None
try:
# 函数会检查是否已经有一个Pod请求对象,如果没有,就会创建一个。
if self.pod_request_obj is None:
self.pod_request_obj = self.build_pod_request_obj(context)
# 函数会检查是否已经有一个Pod对象,如果没有,就会获取或创建一个。
if self.pod is None:
self.pod = self.get_or_create_pod( # must set `self.pod` for `on_kill`
pod_request_obj=self.pod_request_obj,
context=context,
)
# 函数会将Pod的名称和命名空间推送到xcom,以便在出现错误时仍然可以获取这些值
ti = context["ti"]
ti.xcom_push(key="pod_name", value=self.pod.metadata.name)
ti.xcom_push(key="pod_namespace", value=self.pod.metadata.namespace)
# 函数会获取远程Pod,以便在清理方法中使用
self.remote_pod = self.find_pod(self.pod.metadata.namespace, context=context)
# 如果定义了回调函数,函数会在Pod创建和启动时调用相应的回调函数
if self.callbacks:
self.callbacks.on_pod_creation(
pod=self.remote_pod, client=self.client, mode=ExecutionMode.SYNC
)
self.await_pod_start(pod=self.pod)
if self.callbacks:
self.callbacks.on_pod_starting(
pod=self.find_pod(self.pod.metadata.namespace, context=context),
client=self.client,
mode=ExecutionMode.SYNC,
)
# 如果需要获取日志,函数会获取Pod的日志。
if self.get_logs:
self.pod_manager.fetch_requested_container_logs(
pod=self.pod,
containers=self.container_logs,
follow_logs=True,
)
# 如果不需要获取日志,或者基础容器的名称不在容器日志中,函数会等待容器完成
if not self.get_logs or (
self.container_logs is not True and self.base_container_name not in self.container_logs
):
self.pod_manager.await_container_completion(
pod=self.pod, container_name=self.base_container_name
)
# 如果定义了回调函数,函数会在Pod完成时调用相应的回调函数
if self.callbacks:
self.callbacks.on_pod_completion(
pod=self.find_pod(self.pod.metadata.namespace, context=context),
client=self.client,
mode=ExecutionMode.SYNC,
)
if self.do_xcom_push:
self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod)
result = self.extract_xcom(pod=self.pod)
istio_enabled = self.is_istio_enabled(self.pod)
self.remote_pod = self.pod_manager.await_pod_completion(
self.pod, istio_enabled, self.base_container_name
)
finally:
# 最后,函数会清理Pod,并在清理完成后调用相应的回调函数
pod_to_clean = self.pod or self.pod_request_obj
self.cleanup(
pod=pod_to_clean,
remote_pod=self.remote_pod,
)
if self.callbacks:
self.callbacks.on_pod_cleanup(pod=pod_to_clean, client=self.client, mode=ExecutionMode.SYNC)
if self.do_xcom_push:
return result
这段代码里,有几个关键的被调用函数:
- get_or_create_pod:创建Pod
- fetch_requested_container_logs:获取Pod执行日志
- cleanup:销毁Pod
# 创建Pod
我们先分析创建Pod的函数调用过程:
源码位置:airflow/airflow/providers/cncf/kubernetes/operators/pod.py
def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context: Context) -> k8s.V1Pod:
if self.reattach_on_restart:
pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
if pod:
return pod
self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict()))
self.pod_manager.create_pod(pod=pod_request_obj)
return pod_request_obj
该方法通过调用pod_manager对象的create_pod方法创建Pod。
源码位置:airflow/airflow/providers/cncf/kubernetes/utils/pod_manager.py
在PodManager类中,我们可以看到各种调用Kubernetes接口的python SDK的代码。
run_pod_async方法通过调用Kubernetes的python SDK中的create_namespaced_pod函数实现创建Pod。
def create_pod(self, pod: V1Pod) -> V1Pod:
"""Launch the pod asynchronously."""
return self.run_pod_async(pod)
def run_pod_async(self, pod: V1Pod, **kwargs) -> V1Pod:
"""Run POD asynchronously."""
sanitized_pod = self._client.api_client.sanitize_for_serialization(pod)
json_pod = json.dumps(sanitized_pod, indent=2)
self.log.debug("Pod Creation Request: \n%s", json_pod)
try:
resp = self._client.create_namespaced_pod(
body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
)
self.log.debug("Pod Creation Response: %s", resp)
except Exception as e:
self.log.exception(
"Exception when attempting to create Namespaced Pod: %s", str(json_pod).replace("\n", " ")
)
raise e
return resp
# 获取执行日志
execute_sync调用pod_manager的fetch_requested_container_logs方法。
源码位置:airflow/airflow/providers/cncf/kubernetes/utils/pod_manager.py
该方法的主要功能是获取Kubernetes Pod中指定容器的日志,并将日志流式传输到Airflow的日志系统。以下是该方法的主要工作流程:
- 首先,初始化一个名为
pod_logging_statuses
的列表,用于存储每个容器的日志获取状态。 - 然后,使用
get_container_names
方法获取Pod中所有容器的名称。 - 使用
_reconcile_requested_log_containers
方法,根据请求的容器名称和实际的容器名称,确定需要获取日志的容器。 - 对于每个需要获取日志的容器,调用
fetch_container_logs
方法获取容器的日志,并将日志获取的状态添加到pod_logging_statuses
列表中。 - 最后,返回
pod_logging_statuses
列表。
def fetch_requested_container_logs(
self, pod: V1Pod, containers: Iterable[str] | str | Literal[True], follow_logs=False
) -> list[PodLoggingStatus]:
pod_logging_statuses = []
all_containers = self.get_container_names(pod)
containers_to_log = self._reconcile_requested_log_containers(
requested=containers,
actual=all_containers,
pod_name=pod.metadata.name,
)
for c in containers_to_log:
status = self.fetch_container_logs(pod=pod, container_name=c, follow=follow_logs)
pod_logging_statuses.append(status)
return pod_logging_statuses
源码位置:airflow/airflow/providers/cncf/kubernetes/utils/pod_manager.py
该方法的主要功能是获取Kubernetes Pod中特定容器的日志,并将日志流式传输到Airflow的日志系统。以下是该方法的主要工作流程:
- 定义一个名为
consume_logs
的内部函数,用于尝试获取容器日志并处理日志中的每一行。这里最关键的方法是调用read_pod_logs函数。 - 然后,使用一个无限循环来调用
consume_logs
函数。在每次循环中,都会尝试获取并处理容器日志
def fetch_container_logs(
self,
pod: V1Pod,
container_name: str,
*,
follow=False,
since_time: DateTime | None = None,
post_termination_timeout: int = 120,
) -> PodLoggingStatus:
def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None, Exception | None]:
exception = None
last_captured_timestamp = None
connection_timeout = 60 * 30
read_timeout = 60 * 5
try:
logs = self.read_pod_logs(
pod=pod,
container_name=container_name,
timestamps=True,
since_seconds=(
math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
),
follow=follow,
post_termination_timeout=post_termination_timeout,
_request_timeout=(connection_timeout, read_timeout),
)
...
except TimeoutError as e:
# in case of timeout, increment return time by 2 seconds to avoid
# duplicate log entries
if val := (last_captured_timestamp or since_time):
return val.add(seconds=2), e
except HTTPError as e:
exception = e
self.log.exception(
"Reading of logs interrupted for container %r; will retry.",
container_name,
)
return last_captured_timestamp or since_time, exception
last_log_time = since_time
while True:
last_log_time, exc = consume_logs(since_time=last_log_time)
if not self.container_is_running(pod, container_name=container_name):
return PodLoggingStatus(running=False, last_log_time=last_log_time)
if not follow:
return PodLoggingStatus(running=True, last_log_time=last_log_time)
else:
# a timeout is a normal thing and we ignore it and resume following logs
if not isinstance(exc, TimeoutError):
self.log.warning(
"Pod %s log read interrupted but container %s still running",
pod.metadata.name,
container_name,
)
time.sleep(1)
我们再看看read_pod_logs源码:
源码位置:airflow/airflow/providers/cncf/kubernetes/utils/pod_manager.py
可以看到该方法最终是通过调用Kubernetes的Python SDK的read_namespaced_pod_log函数实现读取日志。
@tenacity.retry(stop=tenacity.stop_after_attempt(6), wait=tenacity.wait_exponential(max=15), reraise=True)
def read_pod_logs(
self,
pod: V1Pod,
container_name: str,
tail_lines: int | None = None,
timestamps: bool = False,
since_seconds: int | None = None,
follow=True,
post_termination_timeout: int = 120,
**kwargs,
) -> PodLogsConsumer:
"""Read log from the POD."""
additional_kwargs = {}
if since_seconds:
additional_kwargs["since_seconds"] = since_seconds
if tail_lines:
additional_kwargs["tail_lines"] = tail_lines
additional_kwargs.update(**kwargs)
try:
logs = self._client.read_namespaced_pod_log(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
container=container_name,
follow=follow,
timestamps=timestamps,
_preload_content=False,
**additional_kwargs,
)
except HTTPError:
self.log.exception("There was an error reading the kubernetes API.")
raise
return PodLogsConsumer(
response=logs,
pod=pod,
pod_manager=self,
container_name=container_name,
post_termination_timeout=post_termination_timeout,
)
# 销毁Pod
接下来,我们分析销毁Pod的流程:
源码位置:airflow/airflow/providers/cncf/kubernetes/operators/pod.py
cleanup函数最终也是调用pod_manager对象的delete_pod方法来删除Pod。
def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
...
self.process_pod_deletion(remote_pod, reraise=False)
def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
with _optionally_suppress(reraise=reraise):
if pod is not None:
should_delete_pod = (
(self.on_finish_action == OnFinishAction.DELETE_POD)
or (
self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD
and pod.status.phase == PodPhase.SUCCEEDED
)
or (
self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD
and container_is_succeeded(pod, self.base_container_name)
)
)
if should_delete_pod:
self.log.info("Deleting pod: %s", pod.metadata.name)
self.pod_manager.delete_pod(pod)
else:
self.log.info("Skipping deleting pod: %s", pod.metadata.name)
源码位置:airflow/airflow/providers/cncf/kubernetes/utils/pod_manager.py
如下是delete_pod的源码,最终是通过调用Kubernetes的Python SDK中的delete_namespaced_pod方法实现。
def delete_pod(self, pod: V1Pod) -> None:
"""Delete POD."""
try:
self._client.delete_namespaced_pod(
pod.metadata.name, pod.metadata.namespace, body=client.V1DeleteOptions()
)
except ApiException as e:
# If the pod is already deleted
if str(e.status) != "404":
raise
# k8s上工作流任务运行综合实践
# k8s本地环境部署
下面我们部署的环境是在mac上直接通过docker desktop版本进行快速部署。
这里我们借助Github上一个开源的项目进行快速部署。
git clone https://github.com/AliyunContainerService/k8s-for-docker-desktop.git
cd k8s-for-docker-desktop
接着下载docker desktop,该软件支持启用Kubernetes服务。
然后在设置页面启用Kubernetes服务。
当Kubernetes服务运行成功后,我们可以在Mac右上角的Docker图标点击后可以看到绿色状态,显示【Kubernetes is running】。
接下来我们切换Kubernetes运行上下文至 docker-desktop:
# kubectl config use-context docker-desktop
至此,我们就成功部署了Kubernetes服务,我们可以通过kubectl命令验证Kubernetes集群状态:
# kubectl cluster-info
Kubernetes control plane is running at https://127.0.0.1:6443
CoreDNS is running at https://127.0.0.1:6443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
To further debug and diagnose cluster problems, use 'kubectl cluster-info dump'.
# kubectl get nodes
NAME STATUS ROLES AGE VERSION
docker-desktop Ready control-plane 9h v1.25.9
配置Kubernetes控制台
# kubectl apply -f https://raw.githubusercontent.com/kubernetes/dashboard/v2.5.1/aio/deploy/recommended.yaml
namespace/kubernetes-dashboard created
serviceaccount/kubernetes-dashboard created
service/kubernetes-dashboard created
secret/kubernetes-dashboard-certs created
secret/kubernetes-dashboard-csrf created
secret/kubernetes-dashboard-key-holder created
configmap/kubernetes-dashboard-settings created
role.rbac.authorization.k8s.io/kubernetes-dashboard created
clusterrole.rbac.authorization.k8s.io/kubernetes-dashboard created
rolebinding.rbac.authorization.k8s.io/kubernetes-dashboard created
clusterrolebinding.rbac.authorization.k8s.io/kubernetes-dashboard created
deployment.apps/kubernetes-dashboard created
service/dashboard-metrics-scraper created
deployment.apps/dashboard-metrics-scraper created
检查 kubernetes-dashboard 服务状态
# kubectl get pod -n kubernetes-dashboard
NAME READY STATUS RESTARTS AGE
dashboard-metrics-scraper-748b4f5b9d-npkrb 1/1 Running 0 29s
kubernetes-dashboard-86b687bd84-9k8wv 1/1 Running 0 29s
开启API Server访问代理
如下表示服务启动成功,我们可以直接访问。
# kubectl proxy
Starting to serve on 127.0.0.1:8001
访问控制台
在访问之前,需要授权kube-system
默认服务账号
kubectl apply -f kube-system-default.yaml
clusterrolebinding.rbac.authorization.k8s.io/kube-system-default created
secret/default created
获取TOKEN
TOKEN=$(kubectl -n kube-system describe secret default| awk '$1=="token:"{print $2}')
kubectl config set-credentials docker-desktop --token="${TOKEN}"
echo $TOKEN
此时会输出一长串的TOKEN,通过该TOKEN我们就可以进入K8s控制台。
eyJhbGciOiJSUzI1NiIsImtpZCI6IjVyMk0tQmp2bXFhbXVfN3dxanROM0tuZWdldm5iSFo1cy1VaVJWdk5OMDQifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJkZWZhdWx0Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImRlZmF1bHQiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51aWQiOiIwODZjOGM1Yy1iY2ZiLTRhNjQtOWRjYy03NTFjNmMzYzVjNmIiLCJzdWIiOiJzeXN0ZW06c2VydmljZWFjY291bnQ6a3ViZS1zeXN0ZW06ZGVmYXVsdCJ9.UOKWuKm68X9Q2frJ_3XqZyGyLuHPHM6fvrweT-DtO2-uXmmcmYjw5d9Si3gU6tFkug5xTAXTqBWRlIGw8ohXPvAO5m9fkyxN2YIEOCrXOOpjrP3exS6K65E8V33x7XQ_SuCj42rqQKd1rPw2NVRLMZM17SrUe0Y424IsF1IEZZ_-Q7dBOPbC2xvMpoAbs1rNC5vlsWj-vY6_DyuPb7R0DuAXNuWByyjhyHDz4dw0-Y-dd02J7QWBHJEPVFRwGgcvJguikRbQEgmszRgq7Spc1TyVN0MHXR5XRNxeo1-8vnqI030YaDqJ_TQ7EWi7QXucU-iIvbQInOXe1bfwnsOpQg
通过访问如下地址并输入前面生成的TOKEN。
http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy/#/login
接着我们就可以看到Kubernetes的控制台,后面我们通过Kubernetes的API进行Pod的创建和删除,都可以在控制台上看到。
至此,我们就成功部署了K8s的服务。
# k8s 基础命令
get
kubectl get
是一个用于从Kubernetes集群中获取资源信息的命令。它可以用于查询各种类型的Kubernetes资源,如pod、service、deployment等。
例如,查看集群中所有节点。
kubectl get nodes
例如,获取集群中所有pod。
kubectl get pods
create
kubectl create
命令用于根据文件或输入创建集群resource。
例如,根据yaml配置文件创建resource。
code/test.py
import time
import os
os.system('echo "hello world" > /app/output.txt')
print('ok')
文件:python_code.yaml
apiVersion: v1
kind: Pod
metadata:
name: my-python-app-pod
labels:
app: my-python-app
spec:
containers:
- name: my-python-app
image: python:3
command: ["python"]
args: ["/app/test.py"]
volumeMounts:
- name: host-volume
mountPath: /app
volumes:
- name: host-volume
hostPath:
path: /Users/shuwoom/Desktop/k8s-api-demo/code
type: Directory
restartPolicy: Never
现在,使用kubectl create -f python_code.yaml
命令创建Pod时,一旦Python应用程序运行结束,Pod将自动退出并不再运行。
此时,通过get命令可以看到刚刚创建的资源。
kubectl get pods
NAME READY STATUS RESTARTS AGE
my-python-app 1/1 Running 0 37s
如果想获取python脚本的输出结果,可以通过如下命令实现:
我们可以通过logs命令查看pod的运行输出:
kubectl logs my-python-app-pod
ok
并且可以看到code目录下成功创建了一个output.txt的文件
ll code
total 16
-rw-r--r-- 1 shuwoom staff 12B 2 27 20:45 output.txt
-rw-r--r--@ 1 shuwoom staff 85B 2 27 20:45 test.py
run
也可以直接通过指定镜像直接通过kubectl run
命令直接运行:
kubectl run my-nginx --image=nginx:latest
我们可以看到一个叫my-nginx
的资源成功创建运行了。
kubectl get pods
NAME READY STATUS RESTARTS AGE
apple-app 1/1 Running 0 4m56s
my-nginx 1/1 Running 0 6s
logs
如果想查看Pod运行输出的日志,可以通过logs命令,例如:
kubectl logs [POD名称]
在后面实现的k8s上通过POD执行脚本任务,我们也可以通过该命令,来获取POD中执行脚本的输出结果。
delete
根据resource名删除resource
例如,删除pod:
kubectl delete po my-nginx
也可以通过之前指定的yaml文件来删除pod:
kubectl delete -f sample/apple.yaml
# k8s python SDK调用
创建Pod有两种方式,一种是通过yaml文件创建,另一种是在代码中构造对象创建。
基于yaml创建Pod
python_code.yml复用上一节的文件。
import yaml
from kubernetes import utils
from kubernetes import client, config
def create_from_yaml():
# 读取yaml文件
with open("python_code.yaml") as f:
yaml_file = yaml.safe_load(f)
# 读取配置文件
config.load_kube_config()
k8s_client = client.ApiClient()
# 创建pods
utils.create_from_yaml(k8s_client, yaml_objects=[yaml_file], namespace="default")
基于对象创建Pod
import datetime
import pytz
import yaml
import time
from kubernetes import utils
from kubernetes.client.rest import ApiException
from kubernetes import client, config
DEPLOYMENT_NAME = "pod-deployment"
def create_pod_by_object(name):
# 加载kubeconfig文件
config.load_kube_config()
# 创建Kubernetes API客户端实例
api_instance = client.CoreV1Api()
# 定义Pod的元数据
metadata = client.V1ObjectMeta(
name=name,
labels={"app": "my-python-app"}
)
# 定义容器的配置
container = client.V1Container(
name=name,
image="python:3",
command=["python"],
args=["/app/test.py"],
volume_mounts=[
client.V1VolumeMount(name="host-volume", mount_path="/app")
],
resources=client.V1ResourceRequirements(
requests={"cpu": "100m", "memory": "200Mi"},
limits={"cpu": "500m", "memory": "500Mi"},
),
)
# 定义HostPath卷的配置
# 将宿主机上的code目录挂载的pod上的/app路径
host_path_volume = client.V1HostPathVolumeSource(
path="/Users/shuwoom/Desktop/k8s-api-demo/code",
type="Directory"
)
# 定义卷的配置
volume = client.V1Volume(
name="host-volume",
host_path=host_path_volume
)
# 定义Pod的配置
pod_spec = client.V1PodSpec(
containers=[container],
volumes=[volume],
restart_policy="Never"
)
# 创建Pod对象
pod = client.V1Pod(
api_version="v1",
kind="Pod",
metadata=metadata,
spec=pod_spec
)
# 创建Pod
api_instance.create_namespaced_pod(namespace="default", body=pod)
print("Pod created")
查看Pod日志
from kubernetes import utils
from kubernetes.client.rest import ApiException
from kubernetes import client, config
def get_pod_log(pod_name,namespace='default'):
'''
可以通过读取pod的日志获取脚本输出
'''
config.load_kube_config()
try:
v1 = client.CoreV1Api()
api_response = v1.read_namespaced_pod_log(name=pod_name, namespace=namespace)
print(api_response)
except ApiException as e:
print('Found exception in reading the logs'+str(e))
查看Pod列表
from kubernetes import client, config
def list_pods(namespace='default'):
v1 = client.CoreV1Api()
pods = v1.list_namespaced_pod(namespace)
for pod in pods.items:
print(f"{pod.metadata.name} {pod.status.phase} {pod.status.pod_ip} {pod.metadata.namespace}")
删除Pod资源
from kubernetes import client, config
def delete_deployment(name, namespace='default'):
config.load_kube_config()
v1 = client.CoreV1Api()
resp = v1.delete_namespaced_pod(
name=name,
namespace="default"
)
print("\n[INFO] deployment `nginx-deployment` deleted.")
流程任务执行过程
如下图,是基于Kubernetes集群的API接口,通过调用Kubernetes Python SDK实现工作流中脚本任务执行的流程图。
在执行脚本之前,首先在挂载的代码目录下创建待执行的脚本,并将该脚本作为Pod的启动执行的命令,然后基于yaml创建Pod运行。
运行结束后,引擎通过调用Logs接口获取脚本的输出结果进行存储。最后通过调用Kubernetes的Delete销毁Pod结束该脚本任务的执行。