目录
Please enable Javascript to view the contents

K8s存储-CSI插件开发指南

 ·  ☕ 4 分钟

系列导航

本系列从 K8s 存储模型入手,逐步展开到 CSI 插件实现与云原生存储全景。

① 概念 → ② Volume 生命周期 → ③ CSI 架构 → ④ FlexVolume 演进 → ⑤ 对比 → ⑥ 排障 ‖ ⑦ 开发 → ⑧ 高级特性 → ⑨ 演进展望

顺序文章定位
概念与入门基础——K8s 存储模型、PV/PVC/StorageClass
Volume 生命周期全解析全貌——两阶段处理、动态/静态供应
CSI 架构详解标准——gRPC 三服务、Sidecar 模式
FlexVolume 与 CSI 演进演进——FlexVolume 原理、CSI 设计思想
存储方案对比与选型选型——主流 CSI 插件横向对比
排障思路与常用命令运维——工具链 + 场景排查
核心路径 ↑扩展展望 ↓
本篇 - CSI 插件开发指南扩展——从零开发一个 CSI 插件
高级特性详解进阶——快照、克隆、扩容、拓扑感知
云原生存储演进与展望展望——容器原生存储、DPU 卸载

重要

CSI 插件本质是一个 gRPC Server,实现 Identity、Controller、Node 三组服务。开发要点:用 csi-lib-utils 库搭建骨架,Sidecar 负责对接 K8s,插件只关注存储操作本身。


1. 代码结构

一个 CSI 插件的标准目录结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
my-csi-driver/
├── cmd/
│   └── main.go           # 入口,启动 gRPC Server
├── pkg/
│   ├── driver/
│   │   ├── driver.go     # Driver 结构体,注册三组服务
│   │   ├── identity.go   # Identity Service 实现
│   │   ├── controller.go # Controller Service 实现
│   │   └── node.go       # Node Service 实现
│   └── mounter/
│       └── mounter.go    # 封装 mount/umount 系统调用
├── deploy/
│   ├── controller.yaml   # Deployment(Controller + Sidecar)
│   └── node.yaml         # DaemonSet(Node + registrar)
├── Dockerfile
└── Makefile

2. 启动 gRPC Server

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// cmd/main.go
package main

import (
    "net"
    "os"
    "google.golang.org/grpc"
    "github.com/container-storage-interface/spec/lib/go/csi"
    "my-csi-driver/pkg/driver"
)

func main() {
    endpoint := os.Getenv("CSI_ENDPOINT") // unix:///csi/csi.sock
    d := driver.NewDriver(endpoint)

    listener, _ := net.Listen("unix", endpoint)
    server := grpc.NewServer()
    csi.RegisterIdentityServer(server, d)
    csi.RegisterControllerServer(server, d)
    csi.RegisterNodeServer(server, d)

    server.Serve(listener)
}

3. Identity Service

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// pkg/driver/identity.go
func (d *Driver) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
    return &csi.GetPluginInfoResponse{
        Name:          "com.example.csi.mydriver",  // 全局唯一名称
        VendorVersion: "v1.0.0",
    }, nil
}

func (d *Driver) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
    return &csi.GetPluginCapabilitiesResponse{
        Capabilities: []*csi.PluginCapability{
            {
                Type: &csi.PluginCapability_Service_{
                    Service: &csi.PluginCapability_Service{
                        Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
                    },
                },
            },
        },
    }, nil
}

func (d *Driver) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
    return &csi.ProbeResponse{}, nil
}
GetPluginInfo 返回的 Name 必须与 StorageClass 的 provisioner 字段一致。这是 K8s 找到对应 CSI 插件的唯一标识。

4. Controller Service

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// pkg/driver/controller.go

// CreateVolume — 创建底层存储资源
func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
    // 1. 校验参数
    if req.GetName() == "" {
        return nil, status.Error(codes.InvalidArgument, "name missing")
    }
    cap := req.GetCapacityRange()
    size := cap.GetRequiredBytes()

    // 2. 调用存储后端 API 创建存储
    volumeID, err := d.backend.CreateVolume(req.GetName(), size)
    if err != nil {
        return nil, status.Errorf(codes.Internal, "create volume failed: %v", err)
    }

    // 3. 返回 Volume 信息
    return &csi.CreateVolumeResponse{
        Volume: &csi.Volume{
            VolumeId:      volumeID,
            CapacityBytes: size,
            VolumeContext: map[string]string{
                "backend": "my-storage",
            },
        },
    }, nil
}

// DeleteVolume — 删除底层存储资源
func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
    if req.GetVolumeId() == "" {
        return nil, status.Error(codes.InvalidArgument, "volume ID missing")
    }
    err := d.backend.DeleteVolume(req.GetVolumeId())
    if err != nil {
        return nil, status.Errorf(codes.Internal, "delete volume failed: %v", err)
    }
    return &csi.DeleteVolumeResponse{}, nil
}

// ControllerPublishVolume — Attach:将存储挂接到节点
func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
    // 1. 校验参数
    // 2. 调用存储后端 API 将 Volume 映射到目标节点
    err := d.backend.Attach(req.GetVolumeId(), req.GetNodeId())
    if err != nil {
        return nil, status.Errorf(codes.Internal, "attach failed: %v", err)
    }
    return &csi.ControllerPublishVolumeResponse{}, nil
}

// ControllerUnpublishVolume — Detach:从节点卸接存储
func (d *Driver) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
    err := d.backend.Detach(req.GetVolumeId(), req.GetNodeId())
    if err != nil {
        return nil, status.Errorf(codes.Internal, "detach failed: %v", err)
    }
    return &csi.ControllerUnpublishVolumeResponse{}, nil
}

5. Node Service

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// pkg/driver/node.go

// NodeStageVolume — 格式化 + 挂载到全局目录
func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
    // 1. 获取设备路径(由 Attach 阶段映射到节点的 /dev/xxx)
    devicePath := req.GetPublishContext()["devicePath"]

    // 2. 格式化(首次使用)
    stageDir := req.GetStagingTargetPath()
    if !isFormatted(devicePath) {
        exec.Command("mkfs.ext4", "-F", devicePath).Run()
    }

    // 3. 挂载到全局目录
    exec.Command("mount", devicePath, stageDir).Run()

    return &csi.NodeStageVolumeResponse{}, nil
}

// NodePublishVolume — bind mount 到 Pod 容器目录
func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
    stageDir := req.GetStagingTargetPath()
    targetDir := req.GetTargetPath()

    // bind mount: stageDir → targetDir
    exec.Command("mount", "--bind", stageDir, targetDir).Run()

    return &csi.NodePublishVolumeResponse{}, nil
}

// NodeUnpublishVolume — 从 Pod 容器目录卸载
func (d *Driver) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
    exec.Command("umount", req.GetTargetPath()).Run()
    return &csi.NodeUnpublishVolumeResponse{}, nil
}

// NodeUnstageVolume — 从全局目录卸载
func (d *Driver) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
    exec.Command("umount", req.GetStagingTargetPath()).Run()
    return &csi.NodeUnstageVolumeResponse{}, nil
}

6. 关键操作明细

步骤CSI 接口等效系统命令说明
创建存储CreateVolume调存储后端 API返回 Volume ID
AttachControllerPublishVolumeiSCSI login / NVMe connect将远程设备映射到节点
Stage(格式化+挂载)NodeStageVolumemkfs + mount挂载到 Pod 的全局目录
Publish(bind mount)NodePublishVolumemount –bind挂载到容器目录
UnpublishNodeUnpublishVolumeumount从容器目录卸载
UnstageNodeUnstageVolumeumount从全局目录卸载
DetachControllerUnpublishVolumeiSCSI logout / NVMe disconnect从节点卸接设备
删除存储DeleteVolume调存储后端 API删除底层存储

7. 注意事项

#问题说明
1幂等性CreateVolume 被重复调用应返回同一 Volume ID
2清理完整性DeleteVolume 必须删除所有底层资源
3Node Service 权限DaemonSet 需要 privileged: true 执行 mount/umount
4mount propagationNode Pod 需要设置 mountPropagation: Bidirectional
5gRPC 错误码status.Errorf(codes.XXX, ...) 返回标准错误码
6版本兼容注意 CSI Spec 版本与 K8s 版本的对应关系
7拓扑感知如果存储与节点位置相关,需实现 GetPluginCapabilities 中的 Topology

参考链接

分享

Hex
作者
Hex
CloudNative Developer