💬 Что из себя представляет паттерн Fan-In (Мультиплексор), применяемый в облачной разработке?
Паттерн Fan-In мультиплексирует несколько входных каналов в один выходной канал.
В сервисах, имеющих несколько рабочих процессов, генерирующих результаты, бывает полезно объединить эти результаты в один общий поток. В таких случаях используется шаблон Мультиплексор, позволяющий читать данные из нескольких входных каналов и передавать их в один целевой канал.
📌 Компоненты:
◆
◆
◆
📌 Реализация:
Функция
Для каждого входного канала Funnel запускает специальную горутину, которая читает значения из назначенного ей канала и пересылает их в
📌 Использование функции
Паттерн Fan-In мультиплексирует несколько входных каналов в один выходной канал.
В сервисах, имеющих несколько рабочих процессов, генерирующих результаты, бывает полезно объединить эти результаты в один общий поток. В таких случаях используется шаблон Мультиплексор, позволяющий читать данные из нескольких входных каналов и передавать их в один целевой канал.
📌 Компоненты:
◆
Sources
— набор из одного или нескольких входных каналов одного типа, которые может принять Funnel
.◆
Destination
— выходной канал того же типа, что и каналы в наборе Sources
. Создается функцией Funnel
.◆
Funnel
— принимает набор Sources
и сразу же возвращает Destination
. Любые данные, поступающие из каналов в Sources
, передаются в Destination
.📌 Реализация:
Funnel
— функция, которая принимает до N
входных каналов (Sources
). Для каждого входного канала в Sources
функция Funnel
запускает отдельную горутину, читающую значения из назначенного ей канала и передающую их в выходной канал, общий для всех горутин (Destination
).Функция
Funnel
принимает переменное число аргументов — ноль или более каналов некоторого типа (int
в нашем примере): func Funnel(sources ...<-chan int) <-chan int {
dest := make(chan int) // Общий выходной канал
var wg sync.WaitGroup // Для автоматического закрытия dest, когда закроются все входящие каналы sources
wg.Add(len(sources)) // Установить размер WaitGroup
for _, ch := range sources { // Запуск горутины для каждого входного канала
go func(c <-chan int) {
defer wg.Done() // Уведомить WaitGroup, когда c закроется
for n := range c {
dest <- n
}
}(ch)
}
go func() { // Запустить горутину, которая закроет dest после закрытия всех входных каналов
wg.Wait()
close(dest)
}()
return dest
}
Для каждого входного канала Funnel запускает специальную горутину, которая читает значения из назначенного ей канала и пересылает их в
dest
, общий выходной канал для всех горутин.📌 Использование функции
Funnel
:func main() {
sources := make([]<-chan int, 0) // Создать пустой срез с каналами
for i := 0; i < 3; i++ {
ch := make(chan int)
sources = append(sources, ch) // Создать канал; добавить в срез sources
go func() { // Запустить горутину для каждого
defer close(ch) // Закрыть канал по завершении горутины
for i := 1; i <= 5; i++ {
ch <- i
time.Sleep(time.Second) }
}()
}
dest := Funnel(sources...)
for d := range dest {
fmt.Println(d)
}
}