💬 Что из себя представляет паттерн Fan-In (Мультиплексор), применяемый в облачной разработке?



Паттерн Fan-In мультиплексирует несколько входных каналов в один выходной канал.



В сервисах, имеющих несколько рабочих процессов, генерирующих результаты, бывает полезно объединить эти результаты в один общий поток. В таких случаях используется шаблон Мультиплексор, позволяющий читать данные из нескольких входных каналов и передавать их в один целевой канал.



📌 Компоненты:

Sources — набор из одного или нескольких входных каналов одного типа, которые может принять Funnel.

Destination — выходной канал того же типа, что и каналы в наборе Sources. Создается функцией Funnel.

Funnel — принимает набор Sources и сразу же возвращает Destination. Любые данные, поступающие из каналов в Sources, передаются в Destination.



📌 Реализация:

Funnel — функция, которая принимает до N входных каналов (Sources). Для каждого входного канала в Sources функция Funnel запускает отдельную горутину, читающую значения из назначенного ей канала и передающую их в выходной канал, общий для всех горутин (Destination).



Функция Funnel принимает переменное число аргументов — ноль или более каналов некоторого типа (int в нашем примере):



func Funnel(sources ...<-chan int) <-chan int {

dest := make(chan int) // Общий выходной канал



var wg sync.WaitGroup // Для автоматического закрытия dest, когда закроются все входящие каналы sources



wg.Add(len(sources)) // Установить размер WaitGroup



for _, ch := range sources { // Запуск горутины для каждого входного канала

go func(c <-chan int) {

defer wg.Done() // Уведомить WaitGroup, когда c закроется



for n := range c {

dest <- n

}

}(ch)

}



go func() { // Запустить горутину, которая закроет dest после закрытия всех входных каналов

wg.Wait()

close(dest)

}()



return dest

}





Для каждого входного канала Funnel запускает специальную горутину, которая читает значения из назначенного ей канала и пересылает их в dest, общий выходной канал для всех горутин.



📌 Использование функции Funnel:



func main() {

sources := make([]<-chan int, 0) // Создать пустой срез с каналами



for i := 0; i < 3; i++ {

ch := make(chan int)

sources = append(sources, ch) // Создать канал; добавить в срез sources

go func() { // Запустить горутину для каждого

defer close(ch) // Закрыть канал по завершении горутины

for i := 1; i <= 5; i++ {

ch <- i

time.Sleep(time.Second) }

}()

}



dest := Funnel(sources...)

for d := range dest {

fmt.Println(d)

}

}