goutils/multiplexer/one-to-many.go

102 lines
2.4 KiB
Go
Raw Normal View History

2024-02-11 10:35:59 +00:00
// Copyright (c) 2024 mStar
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package multiplexer
import (
"errors"
"sync"
)
// OneToMany is a multiplexer that copies every value arriving on a single
// inbound channel to each of its registered, named outbound channels.
type OneToMany[T any] struct {
	// inbound is the channel producers send values into.
	inbound chan T
	// closeChan carries the shutdown signal from CloseSender to StartPlexer.
	closeChan chan any

	// lock is the mutex the methods acquire before working on outbound.
	lock sync.Mutex
	// outbound holds the receiver channels, keyed by the name they were
	// registered under.
	outbound map[string]chan T
	// closed is set once StartPlexer has processed the shutdown signal.
	closed bool
}
// NewOneToMany returns an open multiplexer with unbuffered inbound and
// shutdown channels and no receivers registered yet.
func NewOneToMany[T any]() OneToMany[T] {
	sender := make(chan T)
	receivers := map[string]chan T{}
	stop := make(chan any)

	return OneToMany[T]{
		inbound:   sender,
		outbound:  receivers,
		lock:      sync.Mutex{},
		closeChan: stop,
		closed:    false,
	}
}
// GetSender returns the inbound channel; values sent on it are what
// StartPlexer forwards to the outbound channels.
func (o *OneToMany[T]) GetSender() chan T {
	sender := o.inbound
	return sender
}
// MakeReceiver registers a new unbuffered outbound channel under name and
// returns it. Do not close the returned channel yourself; use CloseReceiver.
//
// It returns an error, and a nil channel, when the multiplexer has already
// been closed or when a receiver with the same name is already registered.
func (o *OneToMany[T]) MakeReceiver(name string) (chan T, error) {
	// Hold the lock for the whole check-and-insert sequence. The deferred
	// unlock also covers the error returns, which previously left the mutex
	// locked, and keeps the map write inside the critical section.
	o.lock.Lock()
	defer o.lock.Unlock()

	if o.closed {
		return nil, errors.New("multiplexer has been closed")
	}
	if _, exists := o.outbound[name]; exists {
		return nil, errors.New("receiver with that name already exists")
	}

	rec := make(chan T)
	o.outbound[name] = rec
	return rec, nil
}
// CloseReceiver closes the receiver channel registered under name and removes
// it from the multiplexer. It does nothing when no receiver has that name or
// when the multiplexer has already been closed.
func (o *OneToMany[T]) CloseReceiver(name string) {
	o.lock.Lock()
	defer o.lock.Unlock()

	// Check closed only while holding the lock: StartPlexer closes the
	// receiver channels and sets closed under this same lock, so checking
	// beforehand could let this call close an already closed channel.
	if o.closed {
		return
	}

	rec, ok := o.outbound[name]
	if !ok {
		return
	}
	close(rec)
	delete(o.outbound, name)
}
// StartPlexer runs the distribution loop of the multiplexer. It is intended
// to run in its own goroutine (`go plexer.StartPlexer()`).
//
// Every value received on the inbound channel is sent to each registered
// receiver in turn. The lock is held while sending, so a receiver that is not
// being read from stalls delivery as well as the other methods that need the
// lock.
//
// The loop ends when a signal arrives on the close channel, or when the
// inbound channel is found closed. In both cases all receiver channels are
// closed and removed and the multiplexer is marked as closed.
func (o *OneToMany[T]) StartPlexer() {
	// Loop until told to stop; a bare select would handle one event only.
	for {
		select {
		// Message gotten from inbound channel
		case msg, ok := <-o.inbound:
			if !ok {
				// inbound was closed directly. Without this check the loop
				// would spin, broadcasting zero values forever. It is already
				// closed, so it must not be closed a second time.
				o.shutdown(false)
				return
			}
			o.lock.Lock()
			// Send it to all outbound channels
			for _, c := range o.outbound {
				c <- msg
			}
			o.lock.Unlock()
		// Told to close the plexer including sender
		case <-o.closeChan:
			o.shutdown(true)
			return
		}
	}
}

// shutdown closes and forgets every receiver channel, closes the inbound
// channel when closeInbound is set, and marks the multiplexer as closed.
func (o *OneToMany[T]) shutdown(closeInbound bool) {
	o.lock.Lock()
	defer o.lock.Unlock()

	// No need to send any signal to readers, they just see the close.
	// Removing the entries ensures no closed channel stays reachable.
	for name, c := range o.outbound {
		close(c)
		delete(o.outbound, name)
	}
	if closeInbound {
		close(o.inbound)
	}
	o.closed = true
}
// CloseSender asks the distribution goroutine to shut down: StartPlexer
// reacts to the signal by closing the sender and all receiver channels,
// marking the plexer as closed and returning.
//
// The call blocks until StartPlexer picks the signal up. Calling it again
// after the plexer has been closed returns immediately.
//
// NOTE(review): two concurrent first calls can both see the plexer as open;
// only one signal is consumed, so the other caller stays blocked. Call it
// from a single owner.
func (o *OneToMany[T]) CloseSender() {
	// Once closed, nothing receives on closeChan any more, so sending would
	// block forever. The lock is released before sending because StartPlexer
	// needs it to carry out the shutdown.
	o.lock.Lock()
	alreadyClosed := o.closed
	o.lock.Unlock()
	if alreadyClosed {
		return
	}

	o.closeChan <- 1
}