Add multiplexer

This commit is contained in:
mStar aka a person 2024-02-11 11:35:59 +01:00
parent 7f9be06e1b
commit a969a91746
2 changed files with 144 additions and 0 deletions

View file

@ -0,0 +1,43 @@
// Copyright (c) 2024 mStar
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package multiplexer
import "errors"
// A many to one multiplexer
// Yes, channels technically already are that, but there are a bunch of problems with using raw channels as multiplexer:
// If any of the senders tries to send to a closed channel, it explodes
// Thus, wrap it inside a struct that handles that case of a closed channel
type ManyToOne[T any] struct {
outbound chan T
closed bool
}
// NewManyToOne creates a new ManyToOne multiplexer
// The given channel will be where all messages will be sent to
func NewManyToOne[T any](receiver chan T) ManyToOne[T] {
return ManyToOne[T]{
outbound: receiver,
closed: false,
}
}
// Send a message to this many to one plexer
// If closed, the message won't get sent
func (m *ManyToOne[T]) Send(msg T) error {
if m.closed {
return errors.New("multiplexer has been closed")
}
m.outbound <- msg
return nil
}
// Closes the channel and marks the plexer as closed
func (m *ManyToOne[T]) Close() {
close(m.outbound)
m.closed = true
}

101
multiplexer/one-to-many.go Normal file
View file

@ -0,0 +1,101 @@
// Copyright (c) 2024 mStar
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package multiplexer
import (
"errors"
"sync"
)
type OneToMany[T any] struct {
inbound chan T
outbound map[string]chan T // Use map here to give names to outbound channels
lock sync.Mutex
closeChan chan any
closed bool
}
func NewOneToMany[T any]() OneToMany[T] {
return OneToMany[T]{
inbound: make(chan T),
outbound: make(map[string]chan T),
lock: sync.Mutex{},
closeChan: make(chan any),
closed: false,
}
}
// Get the channel to send things into
func (o *OneToMany[T]) GetSender() chan T {
return o.inbound
}
// Create a new receiver for the multiplexer to send messages to.
// Please do not close this manually, instead use the CloseReceiver func
func (o *OneToMany[T]) MakeReceiver(name string) (chan T, error) {
if o.closed {
return nil, errors.New("multiplexer has been closed")
}
rec := make(chan T)
// Only allow new receivers to be made
o.lock.Lock()
if _, ok := o.outbound[name]; ok {
return nil, errors.New("receiver with that name already exists")
}
o.lock.Unlock()
o.outbound[name] = rec
return rec, nil
}
// Closes a receiver channel with the given name and removes it from the multiplexer
func (o *OneToMany[T]) CloseReceiver(name string) {
if o.closed {
return
}
o.lock.Lock()
if val, ok := o.outbound[name]; ok {
close(val)
delete(o.outbound, name)
}
o.lock.Unlock()
}
// Start this one to many multiplexer
// intended to run as a goroutine (`go plexer.StartPlexer()`)
func (o *OneToMany[T]) StartPlexer() {
select {
// Message gotten from inbound channel
case msg := <-o.inbound:
o.lock.Lock()
// Send it to all outbound channels
for _, c := range o.outbound {
c <- msg
}
o.lock.Unlock()
// Told to close the plexer including sender
case <-o.closeChan:
o.lock.Lock()
// First close all outbound channels
// No need to send any signal there as readers will just stop
for _, c := range o.outbound {
close(c)
}
// Then close inbound, set closed and call it a day
close(o.inbound)
o.closed = true
o.lock.Unlock()
return
}
}
// Close the sender and all receiver channels, mark the plexer as closed and stop the distribution goroutine (all by sending one signal)
func (o *OneToMany[T]) CloseSender() {
o.closeChan <- 1
}