generated from bing/readnotes
104 lines
1.8 KiB
Go
104 lines
1.8 KiB
Go
package notify
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
)
|
|
|
|
type Notifier struct {
|
|
Consumers map[string]*Consumer
|
|
userClients map[string][]string
|
|
lock sync.Mutex
|
|
consumerChan chan *Consumer
|
|
messageChan chan Message
|
|
}
|
|
|
|
func NewNotifier()*Notifier{
|
|
return &Notifier{
|
|
Consumers: make(map[string]*Consumer),
|
|
lock: sync.Mutex{},
|
|
consumerChan: make(chan *Consumer),
|
|
messageChan: make(chan Message),
|
|
userClients: map[string][]string{},
|
|
}
|
|
}
|
|
|
|
func (n *Notifier)AddConsumer(c *gin.Context)error{
|
|
consumer, err := NewConsumer(c)
|
|
if err != nil{
|
|
return err
|
|
}
|
|
go func () {
|
|
n.consumerChan <- consumer
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func (n *Notifier)recieveMessage(ctx context.Context){
|
|
for {
|
|
select{
|
|
case <- ctx.Done():
|
|
return
|
|
case msg := <- n.messageChan:
|
|
n.notify(msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (n *Notifier)notify(msg Message){
|
|
n.lock.Lock()
|
|
defer n.lock.Unlock()
|
|
clts, ok := n.userClients[msg.User];
|
|
if !ok || len(clts) == 0{
|
|
return
|
|
}
|
|
newClts := []string{}
|
|
for _, clt := range clts{
|
|
client, ok := n.Consumers[clt]
|
|
if ok {
|
|
err := client.Write(msg.Message)
|
|
if err != nil {
|
|
delete(n.Consumers, clt)
|
|
}else{
|
|
newClts = append(newClts, clt)
|
|
}
|
|
}
|
|
}
|
|
n.userClients[msg.User] = newClts
|
|
}
|
|
|
|
func (n *Notifier)recieveConsumer(ctx context.Context){
|
|
for {
|
|
select{
|
|
case <- ctx.Done():
|
|
return
|
|
case c := <- n.consumerChan:
|
|
n.addConsumer(c)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (n *Notifier)addConsumer(c *Consumer){
|
|
n.lock.Lock()
|
|
defer n.lock.Unlock()
|
|
n.Consumers[c.ClientID] = c
|
|
us := []string{}
|
|
if u, ok := n.userClients[c.UserID]; ok {
|
|
us=append(us, u...)
|
|
}
|
|
us=append(us, c.ClientID)
|
|
n.userClients[c.UserID] = us
|
|
}
|
|
|
|
func (n *Notifier)Run(ctx context.Context){
|
|
go n.recieveConsumer(ctx)
|
|
n.recieveMessage(ctx)
|
|
}
|
|
|
|
func (n *Notifier)SendMessage(user, msg string){
|
|
go func () {
|
|
n.messageChan <- Message{User: user, Message: msg}
|
|
}()
|
|
} |