101 lines
2.4 KiB
Go
101 lines
2.4 KiB
Go
// Copyright (c) 2024 mStar
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
package multiplexer
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
)
|
|
|
|
type OneToMany[T any] struct {
|
|
inbound chan T
|
|
outbound map[string]chan T // Use map here to give names to outbound channels
|
|
lock sync.Mutex
|
|
closeChan chan any
|
|
closed bool
|
|
}
|
|
|
|
func NewOneToMany[T any]() OneToMany[T] {
|
|
return OneToMany[T]{
|
|
inbound: make(chan T),
|
|
outbound: make(map[string]chan T),
|
|
lock: sync.Mutex{},
|
|
closeChan: make(chan any),
|
|
closed: false,
|
|
}
|
|
}
|
|
|
|
// Get the channel to send things into
|
|
func (o *OneToMany[T]) GetSender() chan T {
|
|
return o.inbound
|
|
}
|
|
|
|
// Create a new receiver for the multiplexer to send messages to.
|
|
// Please do not close this manually, instead use the CloseReceiver func
|
|
func (o *OneToMany[T]) MakeReceiver(name string) (chan T, error) {
|
|
if o.closed {
|
|
return nil, errors.New("multiplexer has been closed")
|
|
}
|
|
rec := make(chan T)
|
|
|
|
// Only allow new receivers to be made
|
|
o.lock.Lock()
|
|
if _, ok := o.outbound[name]; ok {
|
|
return nil, errors.New("receiver with that name already exists")
|
|
}
|
|
o.lock.Unlock()
|
|
|
|
o.outbound[name] = rec
|
|
|
|
return rec, nil
|
|
}
|
|
|
|
// Closes a receiver channel with the given name and removes it from the multiplexer
|
|
func (o *OneToMany[T]) CloseReceiver(name string) {
|
|
if o.closed {
|
|
return
|
|
}
|
|
o.lock.Lock()
|
|
if val, ok := o.outbound[name]; ok {
|
|
close(val)
|
|
delete(o.outbound, name)
|
|
}
|
|
o.lock.Unlock()
|
|
}
|
|
|
|
// Start this one to many multiplexer
|
|
// intended to run as a goroutine (`go plexer.StartPlexer()`)
|
|
func (o *OneToMany[T]) StartPlexer() {
|
|
select {
|
|
// Message gotten from inbound channel
|
|
case msg := <-o.inbound:
|
|
o.lock.Lock()
|
|
// Send it to all outbound channels
|
|
for _, c := range o.outbound {
|
|
c <- msg
|
|
}
|
|
o.lock.Unlock()
|
|
// Told to close the plexer including sender
|
|
case <-o.closeChan:
|
|
o.lock.Lock()
|
|
// First close all outbound channels
|
|
// No need to send any signal there as readers will just stop
|
|
for _, c := range o.outbound {
|
|
close(c)
|
|
}
|
|
// Then close inbound, set closed and call it a day
|
|
close(o.inbound)
|
|
o.closed = true
|
|
o.lock.Unlock()
|
|
return
|
|
}
|
|
}
|
|
|
|
// Close the sender and all receiver channels, mark the plexer as closed and stop the distribution goroutine (all by sending one signal)
|
|
func (o *OneToMany[T]) CloseSender() {
|
|
o.closeChan <- 1
|
|
}
|