ctr attach流程

news/2025/2/27 11:20:14/文章来源:https://www.cnblogs.com/rincloud/p/18740583

      cio.WithStdio 即为opt 因为 func NewAttach(opts ...Opt)

       task, err := container.Task(ctx, cio.NewAttach(cio.WithStdio))

       if err != nil {
           return err
       }

 

 

 

1. cio.WithStdio 实现 即attch中的opt实现细节。

 

type Opt func(*Streams)

 

// WithStdio sets stream options to the standard input/output streams
func WithStdio(opt *Streams) {
   WithStreams(os.Stdin, os.Stdout, os.Stderr)(opt)

}

 

//把标准输入 输出 标准错误放到stream里 返回opt 这里opt 函数暂时未赋值

// WithStreams sets the stream options to the specified Reader and Writers
func WithStreams(stdin io.Reader, stdout, stderr io.Writer) Opt {
   return func(opt *Streams) {
       opt.Stdin = stdin
       opt.Stdout = stdout
       opt.Stderr = stderr
   }

}

 

 

 

 

2. NewAttach 实现

type Attach func(*FIFOSet) (IO, error)

 

// NewAttach attaches the existing io for a task to the provided io.Reader/Writers
func NewAttach(opts ...Opt) Attach {

   streams := &Streams{}

   //执行opt函数

   for _, opt := range opts {

    // 等价于

    // strean.Stdin = os.stdin
    // strean.Stdout = os.stdout
    // strean.Stderr = os.stderr



       opt(streams)
   }
   return func(fifos *FIFOSet) (IO, error) {
       if fifos == nil {
           return nil, fmt.Errorf("cannot attach, missing fifos")
       }
       if streams.Stdin == nil {
           fifos.Stdin = ""
       }
       if streams.Stdout == nil {
           fifos.Stdout = ""
       }
       if streams.Stderr == nil {
           fifos.Stderr = ""

       }

        //copy stream(当前的标准输入 输出 )流到 fifos

       return copyIO(fifos, streams)
   }

}

 

 

3. containerd Task实现

func (c *container) Task(ctx context.Context, attach cio.Attach) (Task, error) {
   return c.loadTask(ctx, attach)
}

 

func (c *container) loadTask(ctx context.Context, ioAttach cio.Attach) (Task, error) {
   response, err := c.client.TaskService().Get(ctx, &tasks.GetRequest{
       ContainerID: c.id,
   })
   if err != nil {
       err = errdefs.FromGRPC(err)
       if errdefs.IsNotFound(err) {
           return nil, fmt.Errorf("no running task found: %w", err)
       }
       return nil, err
   }
   var i cio.IO
   if ioAttach != nil && response.Process.Status != tasktypes.Status_UNKNOWN {
       // Do not attach IO for task in unknown state, because there
       // are no fifo paths anyway.
       if i, err = attachExistingIO(response, ioAttach); err != nil {
           return nil, err
       }
   }
   t := &task{
       client: c.client,
       io:     i,
       id:     response.Process.ID,
       pid:    response.Process.Pid,
       c:      c,
   }
   return t, nil
}

 

其中attachExistingIO 实现  

ioAttach 就是cio.NewAttach(cio.WithStdio)

// get the existing fifo paths from the task information stored by the daemon
func attachExistingIO(response *tasks.GetResponse, ioAttach cio.Attach) (cio.IO, error) {

   //获取容器process 的fifo

    fifoSet := loadFifos(response)

    //执行NewAttach 包含iocopy

   return ioAttach(fifoSet)
}

 

// loadFifos loads the containers fifos
func loadFifos(response *tasks.GetResponse) *cio.FIFOSet {
   fifos := []string{
       response.Process.Stdin,
       response.Process.Stdout,
       response.Process.Stderr,
   }
   closer := func() error {
       var (
           err  error
           dirs = map[string]struct{}{}
       )
       for _, f := range fifos {
           if isFifo, _ := fifo.IsFifo(f); isFifo {
               if rerr := os.Remove(f); err == nil {
                   err = rerr
               }
               dirs[filepath.Dir(f)] = struct{}{}
           }
       }
       for dir := range dirs {
           // we ignore errors here because we don't
           // want to remove the directory if it isn't
           // empty
           os.Remove(dir)
       }
       return err
   }

   return cio.NewFIFOSet(cio.Config{
       Stdin:    response.Process.Stdin,
       Stdout:   response.Process.Stdout,
       Stderr:   response.Process.Stderr,
       Terminal: response.Process.Terminal,
   }, closer)

}

 

 

 

 

container.Task的最后流程是在进行copyIO

 

实现如下

func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) {
   var ctx, cancel = context.WithCancel(context.Background())
   pipes, err := openFifos(ctx, fifos)
   if err != nil {
       cancel()
       return nil, err
   }

   if fifos.Stdin != "" {
       go func() {
           p := bufPool.Get().(*[]byte)
           defer bufPool.Put(p)
            //把 控制台的标准输入扔到到容器的标准输入
           io.CopyBuffer(pipes.Stdin, ioset.Stdin, *p)
           pipes.Stdin.Close()
       }()
   }

   var wg = &sync.WaitGroup{}
   if fifos.Stdout != "" {
       wg.Add(1)
       go func() {
           p := bufPool.Get().(*[]byte)
           defer bufPool.Put(p)
            //容器的pipe的标准输出发送到 控制台的标准输出
           io.CopyBuffer(ioset.Stdout, pipes.Stdout, *p)
           pipes.Stdout.Close()
           wg.Done()
       }()
   }

   if !fifos.Terminal && fifos.Stderr != "" {
       wg.Add(1)
       go func() {
           p := bufPool.Get().(*[]byte)
           defer bufPool.Put(p)
            //容器的pipe的标准错误发送到 控制台的标准错误
           io.CopyBuffer(ioset.Stderr, pipes.Stderr, *p)
           pipes.Stderr.Close()
           wg.Done()
       }()
   }
   return &cio{
       config:  fifos.Config,
       wg:      wg,
       closers: append(pipes.closers(), fifos),
       cancel: func() {
           cancel()
           for _, c := range pipes.closers() {
               if c != nil {
                   c.Close()
               }
           }
       },
   }, nil
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/890545.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第02章 JDBC的新增修改删除

JDBC编程六步 JDBC编程的步骤是很固定的,通常包含以下六步:第一步:注册驱动作用一:将 JDBC 驱动程序从硬盘上的文件系统中加载到内存中。 作用二:使得 DriverManager 可以通过一个统一的接口来管理该驱动程序的所有连接操作。第二步:获取数据库连接获取java.sql.Connecti…

第01章 JDBC概述

什么是JDBC JDBC(Java DataBase Connectivity)就是Java数据库连接,说白了就是用Java语言来操作数据库。原来我们操作数据库是在控制台使用SQL语句来操作数据库,JDBC是用Java语言向数据库发送SQL语句。‍ JDBC原理 早期SUN公司的天才们想编写一套可以连接天下所有数据库的AP…

数组模拟单链表

题目代码 #include <iostream> #include <algorithm> #include <cstring> using namespace std; const int N = 100010; int head, idx, e[N], ne[N]; // 两个值,两个数组// head:第一个节点的下标(表示头结点的下标) // idx:已经存储了几个数(到了第几个数…

ES-8.17.2版本集群搭建

前提工作准备  Linux 3台 16GB运行内存 8核 50GB磁盘  JDK17 环境配置  elasticsearch-8.17.2-linux-x86_64.tar.gz 安装包2.集群规划  在 ES 集群中,不同节点可承担不同角色: **主节点(Master Node)**:负责集群管理、节点选举、索引元数据管理。建议至少配置 3 个…

第9章 shell编程

Linux系统结构 Linux操作系统是一种开放源代码的类UNIX操作系统,它的结构分为内核、Shell和应用程序三个层次。内核层内核是Linux系统的核心部分,它负责管理系统各种硬件设备、文件系统、内存管理和进程管理等核心任务。Linux内核设计了良好的模块化结构,可以动态地加载和卸…

第2章 磁盘与文件管理

磁盘管理 windows和Linux磁盘管理的区别 windows资源管理方式系统一般安装在C盘 C盘下的"Windows"目录是操作系统的核心 C盘下的"Program Files"目录下安装软件 C盘下的"用户"目录是所有的用户,包括超级管理员也在其中 windows操作系统分为C盘、…

第3章 系统命令

系统当前时间 date命令:切换用户 su 用户名sudo 命令:表示使用超级管理员身份执行该命令,如果你当前不是管理员,希望以管理员身份执行某个命令时,使用sudo,需要输入超级管理员的密码: ​​ echo命令 输出字符串 echo "Hello, world!"这将会输出 Hello, world!…

https://avoid.overfit.cn/post/bad10ed894bd43c086e3ef9de7478bea

特征选择作为机器学习工作流程中的关键环节,对模型性能具有决定性影响。Featurewiz是一个功能强大的特征选择库,具备以下核心能力:高度自动化的特征选择,仅需少量代码即可完成。 全面的特征工程功能,不仅能够选择特征,还能生成数百个衍生特征并自动筛选最优特征组合。 实…

Redis复习-五种数据类型

String String是Redis中最常见的数据存储类型: 1.其基本编码方式是RAW,基于简单动态字符串(SDS)实现,存储上限为512mb。 2.如果存储的SDS长度小于44字节,则会采用EMBSTR编码,此时object head与SDS是一段连续空间。申请内存时只需要调用一次内存分配函数,效率更高。 3.如…

cfWGBS揭示与年龄和肌萎缩侧索硬化相关cfDNA甲基化变化及组织/细胞溯源

大家好,这里是专注表观组学十余年,领跑多组学科研服务的易基因。 游离细胞DNA (Cell-free DNA,cfDNA)是血浆中游离的DNA片段,通常来源于正常细胞更新或病理状态下的细胞死亡。cfDNA已被广泛应用于癌症早期检测、胎儿遗传病诊断、器官移植评估等领域。然而,cfDNA在神经退行…

低代码加速智能制造,兰之天的选择是 NocoBase

兰之天借力 NocoBase,破解中小制造企业数字化转型困局,实现智能制造系统开发周期从数月压缩至数周。智能制造的挑战:数字化转型的必然趋势 在全球制造业加速迈向数字化、智能化的背景下,智能制造已成为提升企业竞争力的关键战略。根据财富商业洞察(Fortune Business Insig…

纷享销客CRM全面评测:纷享销客比销售易差异化对比

企业数字化转型热潮中,CRM是众多企业迈向数字化管理的里程碑。近年来,国产CRM在政策推动下成为大中型企业的首选,也有很多企业选择国产CRM替代国外供应商。国产CRM第一梯队中,纷享销客以其卓越的表现脱颖而出,稳坐头把交椅。IDC发布了最新数据报告《IDC China Semiannual …