
是的,它已经长满了。 我相信,一个关心郊区尊严的正直公民,比如你自己,会同意我妻子的观点,即我“确实应该为此做点什么”。 这周我有很多时间,所以很自然地,我利用这段时间忽略了花坛并编写了一个马赛克生成器。 这样,我就可以给你这个我不那么可爱的花坛的可爱马赛克:

该图像包含由 894 个源图像组成的 14,490 个图块,这需要大量处理。 幸运的是,其中大部分工作可以同时完成。 事实上,这个程序的核心是两个非常标准的处理管道,它们是如此普通,以至于我用它们作为借口来谈论 Go 并发模式的主力:扇出、扇入。 它用途广泛,几乎可以用于任何并发处理。
在我们开始之前,我需要从高层次上解释一下马赛克(mosaic )。 它有两个阶段:索引和交换。 索引为交换阶段构建平铺图像列表。 交换阶段为输出图像中的每个图块寻找替换。 索引必须在交换开始之前完成,但每个阶段可以同时进行。 它看起来像这样:

生成器
并发构建块的Go 代码的往往看起来像这样:
func counter() <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 0; i < 100; i++ {
out <- i
}
}()
return out
}
该函数立即返回,并向返回的通道发出一个值流。 它们让我想起了 Python 中的生成器,所以我就这样称呼它们。 如果遵循两个规则,Go 中的并发性会容易得多:1)每个通道应该只有一个写入器,2)该写入器必须关闭通道。 该函数返回一个只读通道,防止任何其他写入。 并且 defer 会自动关闭通道。
Go 继续从关闭的通道发出值,直到所有内容都被消耗掉,并且 Go 的 for .. range 循环在通道结束时自动中断。 因此约定是像这样使用通道中的值:
for value := range count() {
// do something with value
}
在马赛克(mosaic)的索引器中,管道中的第一个生成器发出图像文件名:
func (idx *Index) findImages(ctx context.Context, path string) <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
err := filepath.WalkDir(path, func(path string, d fs.DirEntry, err error) error {
if err != nil {
log.Printf("%s: %s", path, err)
return nil
}
ext := strings.ToLower(filepath.Ext(path))
switch ext {
case ".jpg", ".jpeg", ".png", ".gif":
default:
return nil
}
select {
case ch <- path:
case <-ctx.Done():
return fs.SkipAll
}
return nil
})
if err != nil {
log.Fatalf("error walking path %s: %s", path, err)
}
}()
return ch
}
该模式中唯一的新问题是上下文,我稍后会讨论它。 这个函数也非常类似于交换管道的开始:tileize。
扇出
生成器本质上并不是并发的:读取器等待输入,写入器等待读取器。 我们可以通过向输出通道添加缓冲区来使其并发。 这可能适合你正在构建的任何东西,但它对我没有帮助。 我需要的是一个可以同时运行并分析图像的函数:
func (idx *Index) worker(ch <-chan string) <-chan imageColor {
out := make(chan imageColor)
go func() {
defer close(out)
for path := range ch {
img, err := loadImage(path)
if err != nil {
log.Printf("%s: %s", path, err)
continue
}
out <- imageColor{
Path: path,
Color: primaryColor(img, 0.01),
}
}
}()
return out
}
这是另一个生成器函数。 我们可以运行它的许多不同副本,并为每个副本提供相同的输入通道(这是上一步的输出):
colorChs := make([]<-chan imageColor, numberOfWorkers)
for i := range colorChs {
colorChs[i] = idx.worker(pathCh)
}
这就是“扇出、扇入”的“扇出”。 可能需要进行一些实验才能找到合适的 numberOfWorkers 值。 runtime.NumCPU() 是一个很好的初步猜测,特别是对于像这样的 CPU 密集型任务。
findImages 仍然会阻塞,但仅限于其中一个工作人员准备好获取另一个值。 如果输入通道为空或输出通道已满,工作人员也会阻塞。 目标是防止这些事情发生,以便数据流动和工作同时进行。
扇入
一片通道使用起来很麻烦。 您可能认为 select 会有所帮助,但这需要在代码中显式命名通道。 您可能也想尝试这样的事情:
for i := range sliceOChannels {
for value := range sliceOChannels[i] {
// Do something with value
}
}
但不要这样做。 在最好的情况下,您的通道已缓冲并且有足够的容量用于所有输出,因此您会浪费内存。 在最坏的情况下,您的程序会在处理来自工作人员 i 的值时阻塞工作人员 i+1..n。 这不是并发。 这是复杂的串行代码。
索引器需要将它们返回到单个输出流,因为它的“索引”是一个切片,不能同时修改(互斥体也可以工作,但它不太优雅并且仍然不是并发的)。 此通道合并是“扇入”:
func mergeColorChannels(chs ...<-chan imageColor) <-chan imageColor {
out := make(chan imageColor)
var wg sync.WaitGroup
wg.Add(len(chs))
for _, ch := range chs {
go func(ch <-chan imageColor) {
defer wg.Done()
for img := range ch {
out <- img
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
希望您认识生成器函数。 这个需要任意数量的输入通道,并且不是启动一个 goroutine,而是有 n+1 个 goroutine。 还记得我们的规则:每个频道应该有一名作家吗? 我们在这里通过让多个 goroutine 写入同一个输出通道来捏造它,但我们可以说我们的合并函数仍然是唯一的写入器。
WaitGroup 确保我们遵循第二条规则:作者必须关闭通道。 是的,这是乏味的代码。 但如果我们不关闭通道,消费者就不知道何时终止,程序就会挂起。
制作这么多 goroutine 似乎很浪费。 如果它们是线程或者进程(但愿不会),那么你是对的。 但 goroutine 很便宜,而且它们大多会阻塞等待输入。
索引管道的最后一步是一次读取一个值:
for found := range mergeColorChannels(colorChs...) {
idx.insert(found.Color, found.Path)
}
这具有防止我们的主索引函数返回的额外效果,直到所有内容都已处理完毕。
顺便说一句,合并是该算法是并发而不是并行的原因。 每个worker都可以独立工作,但最终必须等待轮到插入(insert)。 马赛克(mosaic )的交换阶段会跳过合并,因为每个工作线程都可以直接写入输出图像中的图块,因此它可能是并行的,具体取决于可用的 CPU 核心和调度。
并发是两排顾客从同一个收银员处点餐(排轮流点餐); 并行性是两排顾客从两个收银员处订购(每排都有自己的收银员)。
– 中国古代谚语(或者是 Stack Overflow 上的 chharvey?)
闭幕式
第一个生成器完成其工作并关闭其输出通道,这向下一步的工作人员发出信号以完成工作。 这将继续下去,直到最终的输出通道完成。 因此,最终,整个管道非常优雅地折叠起来。 您可能还记得我们管道中的第一个生成器采用了上下文:
func (idx *Index) findImages(ctx context.Context, path string) <-chan string {
ch := make(chan string)
go func() {
// ...
select {
case ch <- path:
case <-ctx.Done():
// Abort
return fs.SkipAll
}
// ...
}()
return ch
}
您可能已经注意到,其他生成器都没有获取上下文。 因为每件作品都足够小,不妨完成它。 要取消,我们要做的就是停止头部的输入流,并让管道正常关闭。
在马赛克(mosaic)中,上下文由中断处理程序关闭:
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
结果是,在交换阶段使用 Ctrl-C 中断马赛克(mosaic)将导致它完成正在进行的任何操作并写入部分完成的图像。 这些管道的结束方式是 Go 并发性的一个美丽特性,也是我最喜欢该语言的地方之一。