本文通过K8S的exec websocket接口结合tar将文件树压缩为数据流以及解压还原, 实现两个Pod间拷贝文件的功能.
需求
想想咱们遇到以下问题一般怎么解决?
新建了一个Pod, 想把另外一个Pod中的文件拷贝到新Pod中进行分析, 怎么实现呢?
如何在项目中, 如何像kubectl cp拷贝文件一样, 实现Pod间文件拷贝呢?
新Pod与实例Pod共享pvc? 或者封装一个带认证上下文的kubectl执行命令行?
简介
本文通过K8S的Exec Websocket接口结合tar将文件树压缩为数据流以及解压还原, 实现两个Pod间拷贝文件的功能.
关于exec接口请参考:https://www.cnblogs.com/a00ium/p/10905279.html
请看图
流程说明
- 首先初始化信号通道, 用于协程间的信号通知, 收到信号的协程执行暂停/退出循环/关闭通道等操作
- 初始化数据通道srcStdOutCh, 类型为字节数组[]byte, 用于将源Pod的标准输出放入通道, 发送给目的Pod标准输入的数据就是从该数据通道中读取
- 拼接exec接口的访问地址(集群连接,token), tar压缩命令, 标准输入/输出,tty, pod名,容器名等参数. tar czf - /var/log/xxx.log 表示将该文件树结构压缩为数据流
- 调用websocket的Dialer方法与源Pod容器建立websocket连接, 并开启协程将标准输出写入数据通道srcStdOutCh
- 参考源pod exec接口, 拼接目的Pod exec访问连接, tar xzf - -C /tmp表示从标准输入读取数据流, 并解压成文件树结构(注意:解压后包含文件目录树结构)
- 与目的Pod建立wss连接, 开启协程从数据通道srcStdOutCh中读取源Pod标准输出, 并写入目的Pod的标准输入, 如果从数据通道读取超时,则表示数据已经传输完毕, 此时停止向目的容器输入数据, 并发送通知信号, 通知主协程可以退出,关闭源Pod的wss连接
注意事项
- wesocket连上源Pod时, 标准输出中会输出空数据, tar命令输出等干扰数据, 所以接收数据的时候需要传入一个过滤器回调函数, 用于数据过滤
- 向目的容器发送数据时, 需要将源容器收到的第一个字节删除, 一般为1, 表示标准输出标识, 发送给目的容器是不需要该字节的
- 发送数据时, 需要设置第一个字节为0, 表示发送到标准输入
参考代码
cp.go
- /*
- 总结:1.不带缓冲的通道需要先读后写2.websocketReadMessage方法是阻塞读取的,如果要中断读取,关闭连接,捕获错误即可
- */
- packagecpFilePod2Pod
- import(
- "crypto/tls"
- "errors"
- "fmt"
- "log"
- "net/url"
- "regexp"
- "strings"
- "sync"
- "time"
- "github.com/gorilla/websocket"
- )
- //定义过滤器回调函数
- typefilterCallbackfunc(inputstring)bool
- //带有互斥锁的Websocket连接对象
- typeWsConnstruct{
- Conn*websocket.Conn
- musync.Mutex
- }
- //发送字符串,自动添加换行符
- func(self*WsConn)Send(senderstring,strstring){
- self.mu.Lock()
- deferself.mu.Unlock()
- //利用k8sexecwebsocket接口发送数据时,第一个字节需要设置为0,表示将数据发送到标准输入
- data:=[]byte{0}
- data=append(data,[]byte(str+"\n")...)
- err:=self.Conn.WriteMessage(websocket.BinaryMessage,data)//发送二进制数据类型
- iferr!=nil{
- log.Printf("发送错误,%s",err.Error())
- }
- log.Printf("%s,数据:%s,字节:%+v",sender,str,[]byte(str+"\n"))
- }
- //发送字符串,不添加换行符,内部做字节过滤,等操作
- func(self*WsConn)SendWithFilter(senderstring,strstring){
- self.mu.Lock()
- deferself.mu.Unlock()
- //log.Printf("向目的容器发送数据:%s",str)
- str=strings.ReplaceAll(str,"\r\n","\n")///r=13,/n=10,windows换行符转Linux换行符
- //去掉第一个字节(标准输出1,byte:[01...]),因为从源容器输出的字节中,第一位标识了标准输出1,给目的容器发送字节时,需要去除该标志
- //当WebSocket建立连接后,发送数据时需要在字节Buffer第一个字节设置为stdin(buf[0]=0),而接受数据时,需要判断第一个字节,stdout(buf[0]=1)或stderr(buf[0]=2)
- strByte:=append([]byte(str)[:0],[]byte(str)[1:]...)
- data:=[]byte{0}
- data=append(data,strByte...)
- err:=self.Conn.WriteMessage(websocket.BinaryMessage,data)
- log.Printf("向目的容器标准输入发送数据:\n%s,字节数:%d,字节:%+v",string(data),len(data),data)
- iferr!=nil{
- log.Printf("发送错误,%s",err.Error())
- }
- }
- //从连接中获取数据流,并写入字节数组通道中,内部执行过滤器(回调函数)
- func(self*WsConn)Receive(receiverstring,chchan[]byte,filterfilterCallback)error{
- self.mu.Lock()
- deferself.mu.Unlock()
- msgType,msgByte,err:=self.Conn.ReadMessage()//阻塞读取,类型为2表示二进制数据,1表示文本,-1表示连接已关闭:websocket:close1000(normal)
- log.Printf("%s,读取到数据:%s,类型:%d,字节数:%d,字节:%+v",receiver,string(msgByte),msgType,len(msgByte),msgByte)
- iferr!=nil{
- log.Printf("%s,读取出错,%s",receiver,err.Error())
- returnerr
- }
- iffilter(string(msgByte))&&len(msgByte)>1{
- ch<-msgByte
- }else{
- log.Printf("%s,数据不满足,直接丢弃数据,字符:%s,字节数:%d,字节:%v",receiver,string(msgByte),len(msgByte),msgByte)
- }
- returnnil
- }
- funcNewWsConn(hoststring,pathstring,paramsmap[string]string,headersmap[string][]string)(*websocket.Conn,error){
- paramArray:=[]string{}
- fork,v:=rangeparams{
- paramArray=append(paramArray,fmt.Sprintf("%s=%s",k,v))
- }
- u:=url.URL{Scheme:"wss",Host:host,Path:path,RawQuery:strings.Join(paramArray,"&")}
- log.Printf("API:%s",u.String())
- dialer:=websocket.Dialer{TLSClientConfig:&tls.Config{RootCAs:nil,InsecureSkipVerify:true}}
- conn,_,err:=dialer.Dial(u.String(),headers)
- iferr!=nil{
- returnnil,errors.New(fmt.Sprintf("连接错误:%s",err.Error()))
- }
- returnconn,nil
- }
- //核心:tar-cf-将具有文件夹结构的数据转换成数据流,通过tar-xf-将数据流转换成linux文件系统
- funcCpPod2Pod(){
- //通知主函数可以退出的信号通道
- signalExit:=make(chanbool,1)
- deferclose(signalExit)
- //下发不要给目的容器发送数据的信号
- signalStopDstSend:=make(chanbool,1)
- deferclose(signalStopDstSend)
- //下发不要从源容器读取数据的信号
- signalStopSrcRead:=make(chanbool,1)
- deferclose(signalStopSrcRead)
- //下发不要从目的容器读取数据的信号
- signalStopDstRead:=make(chanbool,1)
- deferclose(signalStopDstRead)
- //下发不要打印目的容器的输出数据
- signalStopPrintDstStdout:=make(chanbool,1)
- deferclose(signalStopPrintDstStdout)
- //连接pod
- host:="172.16.xxx.xxx:6443"
- token:="xxx"
- headers:=map[string][]string{"authorization":{fmt.Sprintf("Bearer%s",token)}}
- pathSrc:="/api/v1/namespaces/xxx/pods/xxx/exec"
- commandSrc:="tar&command=czf&command=-&command=/var/log/mysql/slow.log"//tarczf-sourceFile
- paraSrc:=map[string]string{"stdout":"1","stdin":"0","stderr":"1","tty":"0","container":"xxx","command":commandSrc}
- srcConn,err:=NewWsConn(host,pathSrc,paraSrc,headers)
- iferr!=nil{
- log.Printf("源Pod连接出错,%s",err.Error())
- }
- pathDst:="/api/v1/namespaces/xxx/pods/xxx/exec"
- commandDst:="tar&command=xzf&command=-&command=-C&command=/tmp"//tarxzf--C/tmp
- //paraDst:=map[string]string{"stdout":"1","stdin":"1","stderr":"1","tty":"0","container":"xxx","command":commandDst}
- paraDst:=map[string]string{"stdout":"0","stdin":"1","stderr":"0","tty":"0","container":"xxx","command":commandDst}//关闭目的Pod标准输出和错误输出
- dstConn,err:=NewWsConn(host,pathDst,paraDst,headers)
- iferr!=nil{
- log.Printf("目的Pod连接出错,%s",err.Error())
- }
- wsSrc:=WsConn{
- Conn:srcConn,
- }
- wsDst:=WsConn{
- Conn:dstConn,
- }
- defersrcConn.Close()
- deferdstConn.Close()
- srcStdOutCh:=make(chan[]byte,2048)
- dstStdOutCh:=make(chan[]byte)
- deferclose(srcStdOutCh)
- deferclose(dstStdOutCh)
- //接收源容器标准输出到数据通道中
- gofunc(){
- i:=1
- for{
- log.Printf("第%d次,从源容器读取标准输出",i)
- i++
- //定义匿名过滤器回调方法,对源容器标准输出中不需要的数据进行过滤
- err:=wsSrc.Receive("源容器",srcStdOutCh,func(inputstring)bool{
- ifinput=="cat/var/log/mysql/slow.log"{
- returnfalse
- //}elseifmatch,_:=regexp.MatchString("root@(.+)#",input);match{
- //returnfalse
- //}elseifmatch,_:=regexp.MatchString("cat/(.+).log",input);match{
- //returnfalse
- //}elseifmatch,_:=regexp.MatchString("cat/tmp/(.+)",input);match{
- //returnfalse
- }elseifmatch,_:=regexp.MatchString("tar:Removingleading(.+)",input);match{
- returnfalse
- }elseiflen(input)==0{//过滤空消息
- //log.Printf("读取到标准错误输出")
- returnfalse
- }
- returntrue
- })
- iferr!=nil{
- log.Printf("读取源容器标准输出失败")
- //signalExit<-true
- break
- }
- //time.Sleep(time.Microsecond*100)
- }
- }()
- /*注意,这里不能开启并发协程去读取目的容器的标准输出,如果开启可能会与发送数据的协程抢锁,从而阻塞向目的容器发送数据*/
- ////从目的容器获取标准输出到数据通道中
- //gofunc(){
- ////i:=0
- //for{
- ////该过滤器直接返回true,仅占位
- //err:=wsDst.Receive("目的容器",dstStdOutCh,func(inputstring)bool{
- //returntrue
- //})
- //iferr!=nil{
- //log.Printf("从目的容器读取数据失败")
- //break
- //}
- ////wsDst.Send()
- //time.Sleep(time.Microsecond*100000)
- //}
- ////log.Printf("从目的容器读取数据,第%d次循环",i)
- ////i++
- //}()
- ////从数据通道中读取,目的容器的标准输出,并打印
- //gofunc(){
- //BreakPrintDstPodStdout:
- //for{
- //select{
- //casedata:=<-dstStdOutCh:
- //log.Printf("目的容器标准输出:%s",string(data))
- ////time.Sleep(time.Microsecond*200)
- //case<-signalStopPrintDstStdout:
- //log.Printf("收到信号,停止打印目的容器标准输出")
- ////close(dataOutput)
- ////close(dataCh)
- ////signalStopRead<-true
- ////log.Printf("发送停止读信号")
- ////close(dataOutput)
- ////close(dataCh)
- //breakBreakPrintDstPodStdout
- //}
- ////time.Sleep(time.Microsecond*100)
- //}
- //}()
- //从源容器标准输出的数据通道获取数据,然后发送给目的容器标准输入
- //定义超时时间
- timeOutSecond:=3
- timer:=time.NewTimer(time.Second*time.Duration(timeOutSecond))
- Break2Main:
- for{
- select{
- casedata:=<-srcStdOutCh:
- wsDst.SendWithFilter("向目的容器发送",string(data))
- //time.Sleep(time.Millisecond*200)
- timer.Reset(time.Second*time.Duration(timeOutSecond))
- case<-timer.C:
- //time.Sleep(time.Second*5)
- log.Printf("================源容器标准输出,没有新的数据,获取超时,停止向目的容器发送数据================")
- //log.Printf("发送信号:停止打印目的容器标准输出")
- //signalStopPrintDstStdout<-true
- log.Printf("发送信号:停止从源容器读取数据")
- wsSrc.Conn.Close()
- //log.Printf("发送信号:停止从目的容器读取数据")
- //wsDst.Conn.Close()
- log.Printf("发送信号:主函数可以退出了")
- signalExit<-true
- log.Printf("所有信号发送完毕")
- log.Printf("==================跳出循环=================")
- breakBreak2Main
- }
- //time.Sleep(time.Microsecond*1000)
- }
- //signalStopRead<-true
- <-signalExit//阻塞通道,直到收到一个信号
- //signalStopRead<-true
- log.Printf("主函数收到信号,准备退出")
- //close(dataCh)
- //time.Sleep(time.Second)
- //close(dataOutput)
- //time.Sleep(time.Second)
- //select{}
- }
cp_test.go
- packagecpFilePod2Pod
- import(
- "log"
- "testing"
- )
- //gotest-race-test.runTestCpPod2Pod切到该目录执行该测试
- funcTestCpPod2Pod(t*testing.T){
- log.Printf("开始测试")
- CpPod2Pod()
- }
- 参考结果:
- 源容器:
- root@xxx-mysql-0:/var/log/mysql#md5sumslow.log
- 16577613b6ea957ecb5d9d5e976d9c50slow.log
- 目的容器:
- root@xxx-75bdcdb8cf-hq9wf:/tmp/var/log/mysql#md5sumslow.log
- 16577613b6ea957ecb5d9d5e976d9c50slow.log
参考文档
Kubernetes exec API串接分析:https://www.cnblogs.com/a00ium/p/10905279.html
kubernetes-client-go-实现-kubectl-copy:https://ica10888.com/2019/08/31/kubernetes-client-go-%E5%AE%9E%E7%8E%B0-kubectl-copy.html
转载请说明出处
知优网 » K8S-利用Exec Websocket接口实现Pod间的文件拷贝
知优网 » K8S-利用Exec Websocket接口实现Pod间的文件拷贝