重要
环境说明
安装
使用
0. 校验kubeconfig可用性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| import (
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)
## 解析kubeConfig文件, 校验
func (k *K8S) ParseConf(kubeConfig []byte) (Conf clientcmdapi.Config, err error) {
var (
restConf clientcmd.ClientConfig
)
if restConf, err = clientcmd.NewClientConfigFromBytes(kubeConfig); err != nil {
log.Error(fmt.Sprintf("Could not load kube Config : %s", err.Error()))
return
}
Conf, _ = restConf.RawConfig()
return
}
|
1. 初始化客户端clientSet
1.1 集群内初始化ClientSet
官方Example
1.2 集群外初始化ClientSet
官方Example
1.3 从字节流创建ClientSet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| package k8s
import (
"k8s.io/client-go/rest"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)
type K8S struct {
clientSet *kubernetes.Clientset
}
// Connect - 初始化k8s客户端
func (k *K8S) Connect(kubeConfig []byte) (err error) {
var (
restConf *rest.Config
)
if restConf, err = clientcmd.RESTConfigFromKubeConfig(kubeConfig); err != nil {
log.Error(fmt.Sprintf("Could not load kube Config : %s", err.Error()))
return
}
// 生成client-set配置
if k.clientSet, err = kubernetes.NewForConfig(restConf); err != nil {
log.Error(fmt.Sprintf("Could not connect k8s : %s", err.Error()))
return
}
log.Info("Successfully connected to k8s")
return
}
|
1.4 创建Dynamic ClientSet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| package fleet
import (
"context"
"encoding/json"
"errors"
"fmt"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
type Crd struct {
Client dynamic.Interface
}
// Connect 初始化Fleet客户端
func (i *Crd) Connect(kubeConfig []byte) (err error) {
var (
restConf *rest.Config
)
if restConf, err = clientcmd.RESTConfigFromKubeConfig(kubeConfig); err != nil {
log.Error(fmt.Sprintf("Could not load kube Config : %s", err.Error()))
return
}
if i.Client, err = dynamic.NewForConfig(restConf); err != nil {
log.Error(fmt.Sprintf("Could not connect FleetCrd-k8s : %s", err.Error()))
return
}
log.Info("Successfully connected to FleetCrd")
return
}
|
3. 根据Deployment|StatefulSet获取Pods
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| import (
"k8s.io/api/apps/v1"
coreV1 "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)
func (k *K8S) PodGetByStatefulSet(namespace string, statefulSet v1.StatefulSet) ([]*coreV1.Pod, error) {
var podList []*coreV1.Pod
labelsMap := statefulSet.ObjectMeta.GetLabels()
log.DebugF("[Middleware-K8s] StatefulSet:Labels: %#v", labelsMap)
labelSets := labels.SelectorFromSet(labelsMap)
options := metaV1.ListOptions{
LabelSelector: labelSets.String(),
}
podsClient := k.clientSet.CoreV1().Pods(namespace)
pods, err := podsClient.List(context.TODO(), options)
if err != nil {
return nil, err
}
for _, pod := range pods.Items {
log.DebugF("[Middleware-K8s] Pod:Labels: %#v, pod_name=%s", pod.ObjectMeta.GetLabels(), pod.ObjectMeta.Name)
podList = append(podList, &pod)
}
return podList, err
}
|
4. 根据Job获取Pods
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| import (
batchV1 "k8s.io/api/batch/v1"
coreV1 "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)
func (k *K8S) PodGetByJob(namespace string, job batchV1.Job) ([]*coreV1.Pod, error) {
var podList []*coreV1.Pod
labelsMap := map[string]string{
"job-name": job.ObjectMeta.Name,
"controller-uid": string(job.ObjectMeta.UID),
}
log.DebugF("[Middleware-K8s] Jobs:Labels: %#v", labelsMap)
labelSets := labels.SelectorFromSet(labelsMap)
options := metaV1.ListOptions{
LabelSelector: labelSets.String(),
}
podsClient := k.clientSet.CoreV1().Pods(namespace)
pods, err := podsClient.List(context.TODO(), options)
if err != nil {
return nil, err
}
for _, pod := range pods.Items {
log.DebugF("[Middleware-K8s] Pod:Labels: %#v, pod_name=%s",
pod.ObjectMeta.GetLabels(),
pod.ObjectMeta.Name)
podList = append(podList, &pod)
}
return podList, err
}
|
5. Dynamic-client-go操作CRD资源
以rancher fleet项目的CRD clusters.fleet.cattle.io
举例
完整的包引用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| package fleet
import (
"context"
"encoding/json"
"errors"
"fmt"
fleet "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1"
"icosdeploy/pkg/api/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/yaml"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
|
5.1 初始化ClientSet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| package fleet
import (
"context"
"encoding/json"
"errors"
"fmt"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
type Crd struct {
Client dynamic.Interface
}
// Connect 初始化Fleet客户端
func (i *Crd) Connect(kubeConfig []byte) (err error) {
var (
restConf *rest.Config
)
if restConf, err = clientcmd.RESTConfigFromKubeConfig(kubeConfig); err != nil {
log.Error(fmt.Sprintf("Could not load kube Config : %s", err.Error()))
return
}
if i.Client, err = dynamic.NewForConfig(restConf); err != nil {
log.Error(fmt.Sprintf("Could not connect FleetCrd-k8s : %s", err.Error()))
return
}
log.Info("Successfully connected to FleetCrd")
return
}
|
5.2 Cluster CRD的查询、删除操作
资源定义
1
2
3
4
5
| var clusterCRD = schema.GroupVersionResource{
Group: "fleet.cattle.io",
Version: "v1alpha1",
Resource: "clusters",
}
|
上面的值在CRD``中获取
1
2
3
4
5
6
7
8
9
10
11
12
13
| spec:
# group name to use for REST API: /apis/<group>/<version>
# 对应 Group 字段的值
group: fleet.cattle.io
# list of versions supported by this CustomResourceDefinition
versions:
# 对应 Version 字段的可选值
- name: v1alpha1
# ...
names:
# plural name to be used in the URL: /apis/<group>/<version>/<plural>
# 对应 Resource 字段的值
plural: clusters
|
5.2.1 List查询资源
如果资源对象不是Namespace隔离的,则不指定Namespace: i.Client.Resource(xxxCRD).List(context.TODO(), metav1.ListOptions{})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| func (i *Crd) ListClusters(namespace string) (clusters *fleet.ClusterList, err error) {
list, err := i.Client.Resource(clusterCRD).Namespace(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
}
data, err := list.MarshalJSON()
if err != nil {
return nil, err
}
if err := json.Unmarshal(data, &clusters); err != nil {
return nil, err
}
return clusters, nil
}
|
5.2.2 Get查询资源详情
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| func (i *Crd) GetCluster(namespace string, name string) (cluster *fleet.Cluster, err error) {
list, err := i.Client.Resource(clusterCRD).
Namespace(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return nil, err
}
data, err := list.MarshalJSON()
if err != nil {
return nil, err
}
if err := json.Unmarshal(data, &cluster); err != nil {
return nil, err
}
return cluster, nil
}
|
5.2.3 删除资源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| func (i *Crd) DelCluster(namespace string, name string) (err error) {
deletePolicy := metav1.DeletePropagationForeground
cluster, err := i.GetCluster(namespace, name)
if err != nil {
log.Error(err.Error())
return err
}
if cluster == nil {
err = errors.New(fmt.Sprintf("Cluster -- %s not exists", name))
log.Error(err.Error())
return
}
err = i.Client.Resource(clusterCRD).
Namespace(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{PropagationPolicy: &deletePolicy})
if err != nil {
return
}
return
}
|
5.3 Token CRD的创建操作
资源定义
1
2
3
4
5
| var tokenCRD = schema.GroupVersionResource{
Group: "fleet.cattle.io",
Version: "v1alpha1",
Resource: "clusterregistrationtokens",
}
|
5.3.1 List查询资源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| func (i *Crd) ListToken(namespace string) (tokenList *fleet.ClusterRegistrationTokenList, err error) {
list, err := i.Client.Resource(tokenCRD).Namespace(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
}
data, err := list.MarshalJSON()
if err != nil {
return nil, err
}
if err = json.Unmarshal(data, &tokenList); err != nil {
return nil, err
}
return tokenList, nil
}
|
5.3.2 Get查询资源详情
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
func (i *Crd) GetToken(namespace string, name string) (token *fleet.ClusterRegistrationToken, err error) {
list, err := i.Client.Resource(tokenCRD).Namespace(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return nil, err
}
data, err := list.MarshalJSON()
if err != nil {
return nil, err
}
if err = json.Unmarshal(data, &token); err != nil {
return nil, err
}
return token, nil
}
|
5.3.3 创建资源
注意: yaml
包一定要用 "k8s.io/apimachinery/pkg/runtime/serializer/yaml"
, 否则想资源部分数据解析会报错
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| var CreateData = `
kind: ClusterRegistrationToken
apiVersion: "fleet.cattle.io/v1alpha1"
metadata:
name: new-token
namespace: fleet-local
spec:
ttl: 240h
`
func (i *Crd) CreateToken(namespace string) (token *fleet.ClusterRegistrationToken, err error) {
decoder := yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme)
obj := &unstructured.Unstructured{}
gvk := schema.GroupVersionKind{
Group: "fleet.cattle.io",
Version: "v1alpha1",
Kind: "ClusterRegistrationToken",
}
if _, _, err := decoder.Decode([]byte(CreateData), &gvk, obj); err != nil {
return nil, err
}
one, err := i.Client.Resource(tokenCRD).Namespace(namespace).Create(context.TODO(), obj, metav1.CreateOptions{})
if err != nil {
return nil, err
}
data, err := one.MarshalJSON()
if err != nil {
return nil, err
}
if err = json.Unmarshal(data, &token); err != nil {
return nil, err
}
return token, nil
}
|
5.4 GitRepo CRD的操作
资源定义
1
2
3
4
5
| var gitRepoCRD = schema.GroupVersionResource{
Group: "fleet.cattle.io",
Version: "v1alpha1",
Resource: "gitrepos",
}
|
5.4.1 List查询资源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
func (i *Crd) ListGitRepo(namespace string) (repoList *fleet.GitRepoList, err error) {
list, err := i.Client.Resource(gitRepoCRD).Namespace(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
}
data, err := list.MarshalJSON()
if err != nil {
return nil, err
}
if err := json.Unmarshal(data, &repoList); err != nil {
return nil, err
}
return repoList, nil
}
|
6. 查看容器日志
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| // PodLogs - pod logs
func (k *K8S) PodLogs(namespace string, name string) (string, error) {
podLogOpts := coreV1.PodLogOptions{}
jobsClient := k.clientSet.CoreV1().Pods(namespace)
result := jobsClient.GetLogs(name, &podLogOpts)
podLogs, err := result.Stream(context.TODO())
if err != nil {
log.ErrorF("error in opening stream: %v", err)
return "", err
}
defer podLogs.Close()
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
log.ErrorF("error in copy information from podLogs to buf: %v", err)
return "", err
}
logs := buf.String()
return logs, err
}
|
Reference
第一参考 client-go简介
client-go官方实例–集群内client配置
client-go针对crd资源,代码生成器
Unit test kubernetes client in Go
Dynamic-client-go操作CRD资源
基于Dynamic-client, 开发第三方资源Informer和Controller