blog/go/notify/notify/notify.go

104 lines
1.8 KiB
Go

package notify
import (
"context"
"sync"
"github.com/gin-gonic/gin"
)
// Notifier delivers messages to the consumers registered for a user.
// New consumers and outgoing messages arrive over consumerChan and
// messageChan; lock guards the two maps.
type Notifier struct {
// Consumers maps a client ID to its consumer.
// NOTE(review): the field is exported, but the methods in this file only
// touch it while holding lock — confirm nothing outside accesses it directly.
Consumers map[string]*Consumer
// userClients maps a user ID to the client IDs registered for that user.
userClients map[string][]string
// lock is held by the methods in this file while they read or modify
// Consumers and userClients.
lock sync.Mutex
// consumerChan hands consumers created by AddConsumer to recieveConsumer.
consumerChan chan *Consumer
// messageChan hands messages queued by SendMessage to recieveMessage.
messageChan chan Message
}
// NewNotifier returns a Notifier with empty consumer and user maps, an
// unlocked mutex and unbuffered hand-off channels.
func NewNotifier() *Notifier {
	notifier := &Notifier{
		Consumers:    map[string]*Consumer{},
		userClients:  make(map[string][]string),
		lock:         sync.Mutex{},
		consumerChan: make(chan *Consumer),
		messageChan:  make(chan Message),
	}
	return notifier
}
// AddConsumer builds a Consumer from the gin request context and queues it
// for registration.
//
// It returns the error from NewConsumer, if any. On success the consumer is
// sent on consumerChan from a separate goroutine, so this call does not wait
// for the registration to be applied. That goroutine stays blocked until
// something receives from consumerChan (recieveConsumer, started by Run,
// does this).
func (n *Notifier) AddConsumer(c *gin.Context) error {
	consumer, err := NewConsumer(c)
	if err != nil {
		return err
	}
	go func(pending *Consumer) {
		n.consumerChan <- pending
	}(consumer)
	return nil
}
// recieveMessage delivers queued messages until ctx is cancelled.
// Each message read from messageChan is passed to notify; the method returns
// once ctx's Done channel is closed.
func (n *Notifier) recieveMessage(ctx context.Context) {
	// Successive calls to Done return the same channel, so fetch it once.
	done := ctx.Done()
	for {
		select {
		case msg := <-n.messageChan:
			n.notify(msg)
		case <-done:
			return
		}
	}
}
// notify writes msg.Message to every consumer registered for msg.User.
//
// Client IDs that no longer have a consumer, or whose Write returns an
// error, are dropped from the user's list; a consumer whose Write failed is
// also removed from Consumers. Write errors are otherwise ignored, so
// delivery is best-effort. When no client remains, the user's entry is
// deleted rather than kept as an empty slice, so userClients does not
// accumulate keys for users whose clients are all gone.
//
// The lock is held for the whole delivery, including the Write calls.
func (n *Notifier) notify(msg Message) {
	n.lock.Lock()
	defer n.lock.Unlock()

	// A missing key yields a nil slice, so one length check covers both the
	// "unknown user" and "no clients" cases.
	clients := n.userClients[msg.User]
	if len(clients) == 0 {
		return
	}

	alive := make([]string, 0, len(clients))
	for _, id := range clients {
		consumer, ok := n.Consumers[id]
		if !ok {
			// Stale ID: its consumer was already removed.
			continue
		}
		if err := consumer.Write(msg.Message); err != nil {
			// A failed write is treated as a dead client.
			delete(n.Consumers, id)
			continue
		}
		alive = append(alive, id)
	}

	if len(alive) == 0 {
		delete(n.userClients, msg.User)
		return
	}
	n.userClients[msg.User] = alive
}
// recieveConsumer registers queued consumers until ctx is cancelled.
// Each consumer read from consumerChan is passed to addConsumer; the method
// returns once ctx's Done channel is closed.
func (n *Notifier) recieveConsumer(ctx context.Context) {
	// Successive calls to Done return the same channel, so fetch it once.
	done := ctx.Done()
	for {
		select {
		case pending := <-n.consumerChan:
			n.addConsumer(pending)
		case <-done:
			return
		}
	}
}
// addConsumer stores c under its client ID and appends that ID to the list
// of clients recorded for c.UserID. The user's list is rebuilt as a fresh
// slice rather than extended in place.
func (n *Notifier) addConsumer(c *Consumer) {
	n.lock.Lock()
	defer n.lock.Unlock()

	n.Consumers[c.ClientID] = c

	// A user without an entry yields a nil slice, which appends as empty.
	existing := n.userClients[c.UserID]
	ids := make([]string, 0, len(existing)+1)
	ids = append(ids, existing...)
	ids = append(ids, c.ClientID)
	n.userClients[c.UserID] = ids
}
// Run starts the consumer-registration loop in its own goroutine and then
// processes messages on the calling goroutine. It returns when the message
// loop exits, which happens once ctx is cancelled.
func (n *Notifier) Run(ctx context.Context) {
	go n.recieveConsumer(ctx)
	n.recieveMessage(ctx)
}
// SendMessage queues msg for delivery to user without blocking the caller.
//
// The message is sent on messageChan from its own goroutine, which stays
// blocked until a receiver (recieveMessage, run by Run) takes it. Because
// every call uses a separate goroutine, nothing here establishes a delivery
// order between calls.
func (n *Notifier) SendMessage(user, msg string) {
	queued := Message{User: user, Message: msg}
	go func() {
		n.messageChan <- queued
	}()
}