本文通过K8S的exec websocket接口结合tar将文件树压缩为数据流以及解压还原, 实现两个Pod间拷贝文件的功能.

 K8S-利用Exec Websocket接口实现Pod间的文件拷贝 K8S Exec Websocket 第1张

需求

想想咱们遇到以下问题一般怎么解决?

新建了一个Pod, 想把另外一个Pod中的文件拷贝到新Pod中进行分析, 怎么实现呢?

如何在项目中, 如何像kubectl cp拷贝文件一样, 实现Pod间文件拷贝呢?

新Pod与实例Pod共享pvc? 或者封装一个带认证上下文的kubectl执行命令行?

简介

本文通过K8S的Exec Websocket接口结合tar将文件树压缩为数据流以及解压还原, 实现两个Pod间拷贝文件的功能.

关于exec接口请参考:https://www.cnblogs.com/a00ium/p/10905279.html

请看图

 K8S-利用Exec Websocket接口实现Pod间的文件拷贝 K8S Exec Websocket 第2张

流程说明

  • 首先初始化信号通道, 用于协程间的信号通知, 收到信号的协程执行暂停/退出循环/关闭通道等操作
  • 初始化数据通道srcStdOutCh, 类型为字节数组[]byte, 用于将源Pod的标准输出放入通道, 发送给目的Pod标准输入的数据就是从该数据通道中读取
  • 拼接exec接口的访问地址(集群连接,token), tar压缩命令, 标准输入/输出,tty, pod名,容器名等参数. tar czf - /var/log/xxx.log 表示将该文件树结构压缩为数据流
  • 调用websocket的Dialer方法与源Pod容器建立websocket连接, 并开启协程将标准输出写入数据通道srcStdOutCh
  • 参考源pod exec接口, 拼接目的Pod exec访问连接, tar xzf - -C /tmp表示从标准输入读取数据流, 并解压成文件树结构(注意:解压后包含文件目录树结构)
  • 与目的Pod建立wss连接, 开启协程从数据通道srcStdOutCh中读取源Pod标准输出, 并写入目的Pod的标准输入, 如果从数据通道读取超时,则表示数据已经传输完毕, 此时停止向目的容器输入数据, 并发送通知信号, 通知主协程可以退出,关闭源Pod的wss连接

注意事项

  • wesocket连上源Pod时, 标准输出中会输出空数据, tar命令输出等干扰数据, 所以接收数据的时候需要传入一个过滤器回调函数, 用于数据过滤
  • 向目的容器发送数据时, 需要将源容器收到的第一个字节删除, 一般为1, 表示标准输出标识, 发送给目的容器是不需要该字节的
  • 发送数据时, 需要设置第一个字节为0, 表示发送到标准输入

参考代码

cp.go

  1. /*
  2. 总结:1.不带缓冲的通道需要先读后写2.websocketReadMessage方法是阻塞读取的,如果要中断读取,关闭连接,捕获错误即可
  3. */
  4. packagecpFilePod2Pod
  5. import(
  6. "crypto/tls"
  7. "errors"
  8. "fmt"
  9. "log"
  10. "net/url"
  11. "regexp"
  12. "strings"
  13. "sync"
  14. "time"
  15. "github.com/gorilla/websocket"
  16. )
  17. //定义过滤器回调函数
  18. typefilterCallbackfunc(inputstring)bool
  19. //带有互斥锁的Websocket连接对象
  20. typeWsConnstruct{
  21. Conn*websocket.Conn
  22. musync.Mutex
  23. }
  24. //发送字符串,自动添加换行符
  25. func(self*WsConn)Send(senderstring,strstring){
  26. self.mu.Lock()
  27. deferself.mu.Unlock()
  28. //利用k8sexecwebsocket接口发送数据时,第一个字节需要设置为0,表示将数据发送到标准输入
  29. data:=[]byte{0}
  30. data=append(data,[]byte(str+"\n")...)
  31. err:=self.Conn.WriteMessage(websocket.BinaryMessage,data)//发送二进制数据类型
  32. iferr!=nil{
  33. log.Printf("发送错误,%s",err.Error())
  34. }
  35. log.Printf("%s,数据:%s,字节:%+v",sender,str,[]byte(str+"\n"))
  36. }
  37. //发送字符串,不添加换行符,内部做字节过滤,等操作
  38. func(self*WsConn)SendWithFilter(senderstring,strstring){
  39. self.mu.Lock()
  40. deferself.mu.Unlock()
  41. //log.Printf("向目的容器发送数据:%s",str)
  42. str=strings.ReplaceAll(str,"\r\n","\n")///r=13,/n=10,windows换行符转Linux换行符
  43. //去掉第一个字节(标准输出1,byte:[01...]),因为从源容器输出的字节中,第一位标识了标准输出1,给目的容器发送字节时,需要去除该标志
  44. //当WebSocket建立连接后,发送数据时需要在字节Buffer第一个字节设置为stdin(buf[0]=0),而接受数据时,需要判断第一个字节,stdout(buf[0]=1)或stderr(buf[0]=2)
  45. strByte:=append([]byte(str)[:0],[]byte(str)[1:]...)
  46. data:=[]byte{0}
  47. data=append(data,strByte...)
  48. err:=self.Conn.WriteMessage(websocket.BinaryMessage,data)
  49. log.Printf("向目的容器标准输入发送数据:\n%s,字节数:%d,字节:%+v",string(data),len(data),data)
  50. iferr!=nil{
  51. log.Printf("发送错误,%s",err.Error())
  52. }
  53. }
  54. //从连接中获取数据流,并写入字节数组通道中,内部执行过滤器(回调函数)
  55. func(self*WsConn)Receive(receiverstring,chchan[]byte,filterfilterCallback)error{
  56. self.mu.Lock()
  57. deferself.mu.Unlock()
  58. msgType,msgByte,err:=self.Conn.ReadMessage()//阻塞读取,类型为2表示二进制数据,1表示文本,-1表示连接已关闭:websocket:close1000(normal)
  59. log.Printf("%s,读取到数据:%s,类型:%d,字节数:%d,字节:%+v",receiver,string(msgByte),msgType,len(msgByte),msgByte)
  60. iferr!=nil{
  61. log.Printf("%s,读取出错,%s",receiver,err.Error())
  62. returnerr
  63. }
  64. iffilter(string(msgByte))&&len(msgByte)>1{
  65. ch<-msgByte
  66. }else{
  67. log.Printf("%s,数据不满足,直接丢弃数据,字符:%s,字节数:%d,字节:%v",receiver,string(msgByte),len(msgByte),msgByte)
  68. }
  69. returnnil
  70. }
  71. funcNewWsConn(hoststring,pathstring,paramsmap[string]string,headersmap[string][]string)(*websocket.Conn,error){
  72. paramArray:=[]string{}
  73. fork,v:=rangeparams{
  74. paramArray=append(paramArray,fmt.Sprintf("%s=%s",k,v))
  75. }
  76. u:=url.URL{Scheme:"wss",Host:host,Path:path,RawQuery:strings.Join(paramArray,"&")}
  77. log.Printf("API:%s",u.String())
  78. dialer:=websocket.Dialer{TLSClientConfig:&tls.Config{RootCAs:nil,InsecureSkipVerify:true}}
  79. conn,_,err:=dialer.Dial(u.String(),headers)
  80. iferr!=nil{
  81. returnnil,errors.New(fmt.Sprintf("连接错误:%s",err.Error()))
  82. }
  83. returnconn,nil
  84. }
  85. //核心:tar-cf-将具有文件夹结构的数据转换成数据流,通过tar-xf-将数据流转换成linux文件系统
  86. funcCpPod2Pod(){
  87. //通知主函数可以退出的信号通道
  88. signalExit:=make(chanbool,1)
  89. deferclose(signalExit)
  90. //下发不要给目的容器发送数据的信号
  91. signalStopDstSend:=make(chanbool,1)
  92. deferclose(signalStopDstSend)
  93. //下发不要从源容器读取数据的信号
  94. signalStopSrcRead:=make(chanbool,1)
  95. deferclose(signalStopSrcRead)
  96. //下发不要从目的容器读取数据的信号
  97. signalStopDstRead:=make(chanbool,1)
  98. deferclose(signalStopDstRead)
  99. //下发不要打印目的容器的输出数据
  100. signalStopPrintDstStdout:=make(chanbool,1)
  101. deferclose(signalStopPrintDstStdout)
  102. //连接pod
  103. host:="172.16.xxx.xxx:6443"
  104. token:="xxx"
  105. headers:=map[string][]string{"authorization":{fmt.Sprintf("Bearer%s",token)}}
  106. pathSrc:="/api/v1/namespaces/xxx/pods/xxx/exec"
  107. commandSrc:="tar&command=czf&command=-&command=/var/log/mysql/slow.log"//tarczf-sourceFile
  108. paraSrc:=map[string]string{"stdout":"1","stdin":"0","stderr":"1","tty":"0","container":"xxx","command":commandSrc}
  109. srcConn,err:=NewWsConn(host,pathSrc,paraSrc,headers)
  110. iferr!=nil{
  111. log.Printf("源Pod连接出错,%s",err.Error())
  112. }
  113. pathDst:="/api/v1/namespaces/xxx/pods/xxx/exec"
  114. commandDst:="tar&command=xzf&command=-&command=-C&command=/tmp"//tarxzf--C/tmp
  115. //paraDst:=map[string]string{"stdout":"1","stdin":"1","stderr":"1","tty":"0","container":"xxx","command":commandDst}
  116. paraDst:=map[string]string{"stdout":"0","stdin":"1","stderr":"0","tty":"0","container":"xxx","command":commandDst}//关闭目的Pod标准输出和错误输出
  117. dstConn,err:=NewWsConn(host,pathDst,paraDst,headers)
  118. iferr!=nil{
  119. log.Printf("目的Pod连接出错,%s",err.Error())
  120. }
  121. wsSrc:=WsConn{
  122. Conn:srcConn,
  123. }
  124. wsDst:=WsConn{
  125. Conn:dstConn,
  126. }
  127. defersrcConn.Close()
  128. deferdstConn.Close()
  129. srcStdOutCh:=make(chan[]byte,2048)
  130. dstStdOutCh:=make(chan[]byte)
  131. deferclose(srcStdOutCh)
  132. deferclose(dstStdOutCh)
  133. //接收源容器标准输出到数据通道中
  134. gofunc(){
  135. i:=1
  136. for{
  137. log.Printf("第%d次,从源容器读取标准输出",i)
  138. i++
  139. //定义匿名过滤器回调方法,对源容器标准输出中不需要的数据进行过滤
  140. err:=wsSrc.Receive("源容器",srcStdOutCh,func(inputstring)bool{
  141. ifinput=="cat/var/log/mysql/slow.log"{
  142. returnfalse
  143. //}elseifmatch,_:=regexp.MatchString("root@(.+)#",input);match{
  144. //returnfalse
  145. //}elseifmatch,_:=regexp.MatchString("cat/(.+).log",input);match{
  146. //returnfalse
  147. //}elseifmatch,_:=regexp.MatchString("cat/tmp/(.+)",input);match{
  148. //returnfalse
  149. }elseifmatch,_:=regexp.MatchString("tar:Removingleading(.+)",input);match{
  150. returnfalse
  151. }elseiflen(input)==0{//过滤空消息
  152. //log.Printf("读取到标准错误输出")
  153. returnfalse
  154. }
  155. returntrue
  156. })
  157. iferr!=nil{
  158. log.Printf("读取源容器标准输出失败")
  159. //signalExit<-true
  160. break
  161. }
  162. //time.Sleep(time.Microsecond*100)
  163. }
  164. }()
  165. /*注意,这里不能开启并发协程去读取目的容器的标准输出,如果开启可能会与发送数据的协程抢锁,从而阻塞向目的容器发送数据*/
  166. ////从目的容器获取标准输出到数据通道中
  167. //gofunc(){
  168. ////i:=0
  169. //for{
  170. ////该过滤器直接返回true,仅占位
  171. //err:=wsDst.Receive("目的容器",dstStdOutCh,func(inputstring)bool{
  172. //returntrue
  173. //})
  174. //iferr!=nil{
  175. //log.Printf("从目的容器读取数据失败")
  176. //break
  177. //}
  178. ////wsDst.Send()
  179. //time.Sleep(time.Microsecond*100000)
  180. //}
  181. ////log.Printf("从目的容器读取数据,第%d次循环",i)
  182. ////i++
  183. //}()
  184. ////从数据通道中读取,目的容器的标准输出,并打印
  185. //gofunc(){
  186. //BreakPrintDstPodStdout:
  187. //for{
  188. //select{
  189. //casedata:=<-dstStdOutCh:
  190. //log.Printf("目的容器标准输出:%s",string(data))
  191. ////time.Sleep(time.Microsecond*200)
  192. //case<-signalStopPrintDstStdout:
  193. //log.Printf("收到信号,停止打印目的容器标准输出")
  194. ////close(dataOutput)
  195. ////close(dataCh)
  196. ////signalStopRead<-true
  197. ////log.Printf("发送停止读信号")
  198. ////close(dataOutput)
  199. ////close(dataCh)
  200. //breakBreakPrintDstPodStdout
  201. //}
  202. ////time.Sleep(time.Microsecond*100)
  203. //}
  204. //}()
  205. //从源容器标准输出的数据通道获取数据,然后发送给目的容器标准输入
  206. //定义超时时间
  207. timeOutSecond:=3
  208. timer:=time.NewTimer(time.Second*time.Duration(timeOutSecond))
  209. Break2Main:
  210. for{
  211. select{
  212. casedata:=<-srcStdOutCh:
  213. wsDst.SendWithFilter("向目的容器发送",string(data))
  214. //time.Sleep(time.Millisecond*200)
  215. timer.Reset(time.Second*time.Duration(timeOutSecond))
  216. case<-timer.C:
  217. //time.Sleep(time.Second*5)
  218. log.Printf("================源容器标准输出,没有新的数据,获取超时,停止向目的容器发送数据================")
  219. //log.Printf("发送信号:停止打印目的容器标准输出")
  220. //signalStopPrintDstStdout<-true
  221. log.Printf("发送信号:停止从源容器读取数据")
  222. wsSrc.Conn.Close()
  223. //log.Printf("发送信号:停止从目的容器读取数据")
  224. //wsDst.Conn.Close()
  225. log.Printf("发送信号:主函数可以退出了")
  226. signalExit<-true
  227. log.Printf("所有信号发送完毕")
  228. log.Printf("==================跳出循环=================")
  229. breakBreak2Main
  230. }
  231. //time.Sleep(time.Microsecond*1000)
  232. }
  233. //signalStopRead<-true
  234. <-signalExit//阻塞通道,直到收到一个信号
  235. //signalStopRead<-true
  236. log.Printf("主函数收到信号,准备退出")
  237. //close(dataCh)
  238. //time.Sleep(time.Second)
  239. //close(dataOutput)
  240. //time.Sleep(time.Second)
  241. //select{}
  242. }

cp_test.go

  1. packagecpFilePod2Pod
  2. import(
  3. "log"
  4. "testing"
  5. )
  6. //gotest-race-test.runTestCpPod2Pod切到该目录执行该测试
  7. funcTestCpPod2Pod(t*testing.T){
  8. log.Printf("开始测试")
  9. CpPod2Pod()
  10. }

  1. 参考结果:
  2. 源容器:
  3. root@xxx-mysql-0:/var/log/mysql#md5sumslow.log
  4. 16577613b6ea957ecb5d9d5e976d9c50slow.log
  5. 目的容器:
  6. root@xxx-75bdcdb8cf-hq9wf:/tmp/var/log/mysql#md5sumslow.log
  7. 16577613b6ea957ecb5d9d5e976d9c50slow.log

参考文档

Kubernetes exec API串接分析:https://www.cnblogs.com/a00ium/p/10905279.html

kubernetes-client-go-实现-kubectl-copy:https://ica10888.com/2019/08/31/kubernetes-client-go-%E5%AE%9E%E7%8E%B0-kubectl-copy.html

 K8S-利用Exec Websocket接口实现Pod间的文件拷贝 K8S Exec Websocket 第3张

转载请说明出处
知优网 » K8S-利用Exec Websocket接口实现Pod间的文件拷贝

发表评论

您需要后才能发表评论