cio.WithStdio 即为opt 因为 func NewAttach(opts ...Opt)
task, err := container.Task(ctx, cio.NewAttach(cio.WithStdio))
if err != nil {
return err
}
1. cio.WithStdio 实现 即attch中的opt实现细节。
type Opt func(*Streams)
// WithStdio sets stream options to the standard input/output streams
func WithStdio(opt *Streams) {
WithStreams(os.Stdin, os.Stdout, os.Stderr)(opt)
}
//把标准输入 输出 标准错误放到stream里 返回opt 这里opt 函数暂时未赋值
// WithStreams sets the stream options to the specified Reader and Writers
func WithStreams(stdin io.Reader, stdout, stderr io.Writer) Opt {
return func(opt *Streams) {
opt.Stdin = stdin
opt.Stdout = stdout
opt.Stderr = stderr
}
}
2. NewAttach 实现
type Attach func(*FIFOSet) (IO, error)
// NewAttach attaches the existing io for a task to the provided io.Reader/Writers
func NewAttach(opts ...Opt) Attach {
streams := &Streams{}
//执行opt函数
for _, opt := range opts {
// 等价于
// strean.Stdin = os.stdin
// strean.Stdout = os.stdout
// strean.Stderr = os.stderr
opt(streams)
}
return func(fifos *FIFOSet) (IO, error) {
if fifos == nil {
return nil, fmt.Errorf("cannot attach, missing fifos")
}
if streams.Stdin == nil {
fifos.Stdin = ""
}
if streams.Stdout == nil {
fifos.Stdout = ""
}
if streams.Stderr == nil {
fifos.Stderr = ""
}
//copy stream(当前的标准输入 输出 )流到 fifos
return copyIO(fifos, streams)
}
}
3. containerd Task实现
func (c *container) Task(ctx context.Context, attach cio.Attach) (Task, error) {
return c.loadTask(ctx, attach)
}
func (c *container) loadTask(ctx context.Context, ioAttach cio.Attach) (Task, error) {
response, err := c.client.TaskService().Get(ctx, &tasks.GetRequest{
ContainerID: c.id,
})
if err != nil {
err = errdefs.FromGRPC(err)
if errdefs.IsNotFound(err) {
return nil, fmt.Errorf("no running task found: %w", err)
}
return nil, err
}
var i cio.IO
if ioAttach != nil && response.Process.Status != tasktypes.Status_UNKNOWN {
// Do not attach IO for task in unknown state, because there
// are no fifo paths anyway.
if i, err = attachExistingIO(response, ioAttach); err != nil {
return nil, err
}
}
t := &task{
client: c.client,
io: i,
id: response.Process.ID,
pid: response.Process.Pid,
c: c,
}
return t, nil
}
其中attachExistingIO 实现
ioAttach 就是cio.NewAttach(cio.WithStdio)
// get the existing fifo paths from the task information stored by the daemon
func attachExistingIO(response *tasks.GetResponse, ioAttach cio.Attach) (cio.IO, error) {
//获取容器process 的fifo
fifoSet := loadFifos(response)
//执行NewAttach 包含iocopy
return ioAttach(fifoSet)
}
// loadFifos loads the containers fifos
func loadFifos(response *tasks.GetResponse) *cio.FIFOSet {
fifos := []string{
response.Process.Stdin,
response.Process.Stdout,
response.Process.Stderr,
}
closer := func() error {
var (
err error
dirs = map[string]struct{}{}
)
for _, f := range fifos {
if isFifo, _ := fifo.IsFifo(f); isFifo {
if rerr := os.Remove(f); err == nil {
err = rerr
}
dirs[filepath.Dir(f)] = struct{}{}
}
}
for dir := range dirs {
// we ignore errors here because we don't
// want to remove the directory if it isn't
// empty
os.Remove(dir)
}
return err
}
return cio.NewFIFOSet(cio.Config{
Stdin: response.Process.Stdin,
Stdout: response.Process.Stdout,
Stderr: response.Process.Stderr,
Terminal: response.Process.Terminal,
}, closer)
}
container.Task的最后流程是在进行copyIO
实现如下
func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) {
var ctx, cancel = context.WithCancel(context.Background())
pipes, err := openFifos(ctx, fifos)
if err != nil {
cancel()
return nil, err
}
if fifos.Stdin != "" {
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
//把 控制台的标准输入扔到到容器的标准输入 io.CopyBuffer(pipes.Stdin, ioset.Stdin, *p)
pipes.Stdin.Close()
}()
}
var wg = &sync.WaitGroup{}
if fifos.Stdout != "" {
wg.Add(1)
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
//容器的pipe的标准输出发送到 控制台的标准输出
io.CopyBuffer(ioset.Stdout, pipes.Stdout, *p)
pipes.Stdout.Close()
wg.Done()
}()
}
if !fifos.Terminal && fifos.Stderr != "" {
wg.Add(1)
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
//容器的pipe的标准错误发送到 控制台的标准错误
io.CopyBuffer(ioset.Stderr, pipes.Stderr, *p)
pipes.Stderr.Close()
wg.Done()
}()
}
return &cio{
config: fifos.Config,
wg: wg,
closers: append(pipes.closers(), fifos),
cancel: func() {
cancel()
for _, c := range pipes.closers() {
if c != nil {
c.Close()
}
}
},
}, nil
}