diff --git a/multiplexer/many-to-one.go b/multiplexer/many-to-one.go new file mode 100644 index 0000000..6ea931a --- /dev/null +++ b/multiplexer/many-to-one.go @@ -0,0 +1,43 @@ +// Copyright (c) 2024 mStar +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package multiplexer + +import "errors" + +// A many to one multiplexer +// Yes, channels technically already are that, but there are a bunch of problems with using raw channels as multiplexer: +// If any of the senders tries to send to a closed channel, it explodes +// Thus, wrap it inside a struct that handles that case of a closed channel +type ManyToOne[T any] struct { + outbound chan T + closed bool +} + +// NewManyToOne creates a new ManyToOne multiplexer +// The given channel will be where all messages will be sent to +func NewManyToOne[T any](receiver chan T) ManyToOne[T] { + return ManyToOne[T]{ + outbound: receiver, + closed: false, + } +} + +// Send a message to this many to one plexer +// If closed, the message won't get sent +func (m *ManyToOne[T]) Send(msg T) error { + if m.closed { + return errors.New("multiplexer has been closed") + } + m.outbound <- msg + return nil +} + +// Closes the channel and marks the plexer as closed +func (m *ManyToOne[T]) Close() { + close(m.outbound) + m.closed = true +} diff --git a/multiplexer/one-to-many.go b/multiplexer/one-to-many.go new file mode 100644 index 0000000..95dff4c --- /dev/null +++ b/multiplexer/one-to-many.go @@ -0,0 +1,101 @@ +// Copyright (c) 2024 mStar +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package multiplexer + +import ( + "errors" + "sync" +) + +type OneToMany[T any] struct { + inbound chan T + outbound map[string]chan T // Use map here to give names to outbound channels + lock sync.Mutex + closeChan chan any + closed bool +} + +func NewOneToMany[T any]() OneToMany[T] { + return OneToMany[T]{ + inbound: make(chan T), + outbound: make(map[string]chan T), + lock: sync.Mutex{}, + closeChan: make(chan any), + closed: false, + } +} + +// Get the channel to send things into +func (o *OneToMany[T]) GetSender() chan T { + return o.inbound +} + +// Create a new receiver for the multiplexer to send messages to. +// Please do not close this manually, instead use the CloseReceiver func +func (o *OneToMany[T]) MakeReceiver(name string) (chan T, error) { + if o.closed { + return nil, errors.New("multiplexer has been closed") + } + rec := make(chan T) + + // Only allow new receivers to be made + o.lock.Lock() + if _, ok := o.outbound[name]; ok { + return nil, errors.New("receiver with that name already exists") + } + o.lock.Unlock() + + o.outbound[name] = rec + + return rec, nil +} + +// Closes a receiver channel with the given name and removes it from the multiplexer +func (o *OneToMany[T]) CloseReceiver(name string) { + if o.closed { + return + } + o.lock.Lock() + if val, ok := o.outbound[name]; ok { + close(val) + delete(o.outbound, name) + } + o.lock.Unlock() +} + +// Start this one to many multiplexer +// intended to run as a goroutine (`go plexer.StartPlexer()`) +func (o *OneToMany[T]) StartPlexer() { + select { + // Message gotten from inbound channel + case msg := <-o.inbound: + o.lock.Lock() + // Send it to all outbound channels + for _, c := range o.outbound { + c <- msg + } + o.lock.Unlock() + // Told to close the plexer including sender + case <-o.closeChan: + o.lock.Lock() + // First close all outbound channels + // No need to send any signal there as readers will just stop + for _, c := range o.outbound { + close(c) + } + // Then close inbound, set closed and call it a day + close(o.inbound) + o.closed = true + o.lock.Unlock() + return + } +} + +// Close the sender and all receiver channels, mark the plexer as closed and stop the distribution goroutine (all by sending one signal) +func (o *OneToMany[T]) CloseSender() { + o.closeChan <- 1 +}