diff --git a/multiplexer/many-to-one.go b/multiplexer/many-to-one.go deleted file mode 100644 index 6ea931a..0000000 --- a/multiplexer/many-to-one.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright (c) 2024 mStar -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package multiplexer - -import "errors" - -// A many to one multiplexer -// Yes, channels technically already are that, but there are a bunch of problems with using raw channels as multiplexer: -// If any of the senders tries to send to a closed channel, it explodes -// Thus, wrap it inside a struct that handles that case of a closed channel -type ManyToOne[T any] struct { - outbound chan T - closed bool -} - -// NewManyToOne creates a new ManyToOne multiplexer -// The given channel will be where all messages will be sent to -func NewManyToOne[T any](receiver chan T) ManyToOne[T] { - return ManyToOne[T]{ - outbound: receiver, - closed: false, - } -} - -// Send a message to this many to one plexer -// If closed, the message won't get sent -func (m *ManyToOne[T]) Send(msg T) error { - if m.closed { - return errors.New("multiplexer has been closed") - } - m.outbound <- msg - return nil -} - -// Closes the channel and marks the plexer as closed -func (m *ManyToOne[T]) Close() { - close(m.outbound) - m.closed = true -} diff --git a/multiplexer/one-to-many.go b/multiplexer/one-to-many.go deleted file mode 100644 index 95dff4c..0000000 --- a/multiplexer/one-to-many.go +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright (c) 2024 mStar -// -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package multiplexer - -import ( - "errors" - "sync" -) - -type OneToMany[T any] struct { - inbound chan T - outbound map[string]chan T // Use map here to give names to outbound channels - lock sync.Mutex - closeChan chan any - closed bool -} - -func NewOneToMany[T any]() OneToMany[T] { - return OneToMany[T]{ - inbound: make(chan T), - outbound: make(map[string]chan T), - lock: sync.Mutex{}, - closeChan: make(chan any), - closed: false, - } -} - -// Get the channel to send things into -func (o *OneToMany[T]) GetSender() chan T { - return o.inbound -} - -// Create a new receiver for the multiplexer to send messages to. -// Please do not close this manually, instead use the CloseReceiver func -func (o *OneToMany[T]) MakeReceiver(name string) (chan T, error) { - if o.closed { - return nil, errors.New("multiplexer has been closed") - } - rec := make(chan T) - - // Only allow new receivers to be made - o.lock.Lock() - if _, ok := o.outbound[name]; ok { - return nil, errors.New("receiver with that name already exists") - } - o.lock.Unlock() - - o.outbound[name] = rec - - return rec, nil -} - -// Closes a receiver channel with the given name and removes it from the multiplexer -func (o *OneToMany[T]) CloseReceiver(name string) { - if o.closed { - return - } - o.lock.Lock() - if val, ok := o.outbound[name]; ok { - close(val) - delete(o.outbound, name) - } - o.lock.Unlock() -} - -// Start this one to many multiplexer -// intended to run as a goroutine (`go plexer.StartPlexer()`) -func (o *OneToMany[T]) StartPlexer() { - select { - // Message gotten from inbound channel - case msg := <-o.inbound: - o.lock.Lock() - // Send it to all outbound channels - for _, c := range o.outbound { - c <- msg - } - o.lock.Unlock() - // Told to close the plexer including sender - case <-o.closeChan: - o.lock.Lock() - // First close all outbound channels - // No need to send any signal there as readers will just stop - for _, c := range o.outbound { - close(c) - } - // Then close inbound, set closed and call it a day - close(o.inbound) - o.closed = true - o.lock.Unlock() - return - } -} - -// Close the sender and all receiver channels, mark the plexer as closed and stop the distribution goroutine (all by sending one signal) -func (o *OneToMany[T]) CloseSender() { - o.closeChan <- 1 -}