Go 并发:扇入扇出

0 评论
/ /
502 阅读
/
6660 字
27 2023-07
是的,它已经长满了。 我相信,一个关心郊区尊严的正直公民,比如你自己,会同意我妻子的观点,即我“确实应该为此做点什么”。 这周我有很多时间,所以很自然地,我利用这段时间忽略了花坛并编写了一个马赛克生成器。 这样,我就可以给你这个我不那么可爱的花坛的可爱马赛克:
 
该图像包含由 894 个源图像组成的 14,490 个图块,这需要大量处理。 幸运的是,其中大部分工作可以同时完成。 事实上,这个程序的核心是两个非常标准的处理管道,它们是如此普通,以至于我用它们作为借口来谈论 Go 并发模式的主力:扇出、扇入。 它用途广泛,几乎可以用于任何并发处理。
 
在我们开始之前,我需要从高层次上解释一下马赛克(mosaic )。 它有两个阶段:索引和交换。 索引为交换阶段构建平铺图像列表。 交换阶段为输出图像中的每个图块寻找替换。 索引必须在交换开始之前完成,但每个阶段可以同时进行。 它看起来像这样:
 
生成器
并发构建块的Go 代码的往往看起来像这样:
 
func counter() <-chan int {
	out := make(chan int)

	go func() {
		defer close(out)

		for i := 0; i < 100; i++ {
			out <- i
		}
	}()

	return out
}
 
该函数立即返回,并向返回的通道发出一个值流。 它们让我想起了 Python 中的生成器,所以我就这样称呼它们。 如果遵循两个规则,Go 中的并发性会容易得多:1)每个通道应该只有一个写入器,2)该写入器必须关闭通道。 该函数返回一个只读通道,防止任何其他写入。 并且 defer 会自动关闭通道。
 
Go 继续从关闭的通道发出值,直到所有内容都被消耗掉,并且 Go 的 for .. range 循环在通道结束时自动中断。 因此约定是像这样使用通道中的值:
 
for value := range count() {
    // do something with value
}
 
在马赛克(mosaic)的索引器中,管道中的第一个生成器发出图像文件名:
func (idx *Index) findImages(ctx context.Context, path string) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)

		err := filepath.WalkDir(path, func(path string, d fs.DirEntry, err error) error {
			if err != nil {
				log.Printf("%s: %s", path, err)
				return nil
			}

			ext := strings.ToLower(filepath.Ext(path))
			switch ext {
			case ".jpg", ".jpeg", ".png", ".gif":
			default:
				return nil
			}

			select {
			case ch <- path:
			case <-ctx.Done():
				return fs.SkipAll
			}

			return nil
		})
		if err != nil {
			log.Fatalf("error walking path %s: %s", path, err)
		}
	}()
	return ch
}
 
该模式中唯一的新问题是上下文,我稍后会讨论它。 这个函数也非常类似于交换管道的开始:tileize。
 
扇出
生成器本质上并不是并发的:读取器等待输入,写入器等待读取器。 我们可以通过向输出通道添加缓冲区来使其并发。 这可能适合你正在构建的任何东西,但它对我没有帮助。 我需要的是一个可以同时运行并分析图像的函数:
 
func (idx *Index) worker(ch <-chan string) <-chan imageColor {
    out := make(chan imageColor)
    go func() {
        defer close(out)
        for path := range ch {
            img, err := loadImage(path)
            if err != nil {
                log.Printf("%s: %s", path, err)
                continue
            }

            out <- imageColor{
                Path:  path,
                Color: primaryColor(img, 0.01),
            }
       }
    }()

    return out
}
 
这是另一个生成器函数。 我们可以运行它的许多不同副本,并为每个副本提供相同的输入通道(这是上一步的输出):
 
colorChs := make([]<-chan imageColor, numberOfWorkers)
for i := range colorChs {
    colorChs[i] = idx.worker(pathCh)
}
 
 
这就是“扇出、扇入”的“扇出”。 可能需要进行一些实验才能找到合适的 numberOfWorkers 值。 runtime.NumCPU() 是一个很好的初步猜测,特别是对于像这样的 CPU 密集型任务。
 
findImages 仍然会阻塞,但仅限于其中一个工作人员准备好获取另一个值。 如果输入通道为空或输出通道已满,工作人员也会阻塞。 目标是防止这些事情发生,以便数据流动和工作同时进行。
 
扇入
一片通道使用起来很麻烦。 您可能认为 select 会有所帮助,但这需要在代码中显式命名通道。 您可能也想尝试这样的事情:
 
for i := range sliceOChannels {
    for value := range sliceOChannels[i] {
    // Do something with value
    }
}
 
但不要这样做。 在最好的情况下,您的通道已缓冲并且有足够的容量用于所有输出,因此您会浪费内存。 在最坏的情况下,您的程序会在处理来自工作人员 i 的值时阻塞工作人员 i+1..n。 这不是并发。 这是复杂的串行代码。
 
索引器需要将它们返回到单个输出流,因为它的“索引”是一个切片,不能同时修改(互斥体也可以工作,但它不太优雅并且仍然不是并发的)。 此通道合并是“扇入”:
 
func mergeColorChannels(chs ...<-chan imageColor) <-chan imageColor {
	out := make(chan imageColor)

	var wg sync.WaitGroup
	wg.Add(len(chs))
	for _, ch := range chs {
		go func(ch <-chan imageColor) {
			defer wg.Done()
			for img := range ch {
				out <- img
			}
		}(ch)
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}
 
希望您认识生成器函数。 这个需要任意数量的输入通道,并且不是启动一个 goroutine,而是有 n+1 个 goroutine。 还记得我们的规则:每个频道应该有一名作家吗? 我们在这里通过让多个 goroutine 写入同一个输出通道来捏造它,但我们可以说我们的合并函数仍然是唯一的写入器。
 
WaitGroup 确保我们遵循第二条规则:作者必须关闭通道。 是的,这是乏味的代码。 但如果我们不关闭通道,消费者就不知道何时终止,程序就会挂起。
 
制作这么多 goroutine 似乎很浪费。 如果它们是线程或者进程(但愿不会),那么你是对的。 但 goroutine 很便宜,而且它们大多会阻塞等待输入。
 
索引管道的最后一步是一次读取一个值:
 
for found := range mergeColorChannels(colorChs...) {
    idx.insert(found.Color, found.Path)
}
 
这具有防止我们的主索引函数返回的额外效果,直到所有内容都已处理完毕。
 
顺便说一句,合并是该算法是并发而不是并行的原因。 每个worker都可以独立工作,但最终必须等待轮到插入(insert)。 马赛克(mosaic )的交换阶段会跳过合并,因为每个工作线程都可以直接写入输出图像中的图块,因此它可能是并行的,具体取决于可用的 CPU 核心和调度。
 
并发是两排顾客从同一个收银员处点餐(排轮流点餐); 并行性是两排顾客从两个收银员处订购(每排都有自己的收银员)。
 
– 中国古代谚语(或者是 Stack Overflow 上的 chharvey?)
 
 
闭幕式
第一个生成器完成其工作并关闭其输出通道,这向下一步的工作人员发出信号以完成工作。 这将继续下去,直到最终的输出通道完成。 因此,最终,整个管道非常优雅地折叠起来。 您可能还记得我们管道中的第一个生成器采用了上下文:
func (idx *Index) findImages(ctx context.Context, path string) <-chan string {
    ch := make(chan string)
    go func() {
         // ...
        select {
            case ch <- path:
            case <-ctx.Done():
            // Abort
            return fs.SkipAll
        }
        // ...
    }()
    return ch
}
 
您可能已经注意到,其他生成器都没有获取上下文。 因为每件作品都足够小,不妨完成它。 要取消,我们要做的就是停止头部的输入流,并让管道正常关闭。
 
在马赛克(mosaic)中,上下文由中断处理程序关闭:
 
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
 
结果是,在交换阶段使用 Ctrl-C 中断马赛克(mosaic)将导致它完成正在进行的任何操作并写入部分完成的图像。 这些管道的结束方式是 Go 并发性的一个美丽特性,也是我最喜欢该语言的地方之一。