travebing/backend/notifier/notifier.go

119 lines
2.5 KiB
Go
Raw Normal View History

2024-11-19 16:58:21 +08:00
package notifier
import (
"context"
"fmt"
"log"
"sync"
)
const (
DefaultLengthMessageQueue = 5
DefaultLengthSessionQueue = 5
)
type Notifier struct {
clients *Clients
lock sync.RWMutex
sessionChan chan *Session
messageChan chan Message
ctx context.Context
cancel context.CancelFunc
}
func New() *Notifier {
ctx, cancel := context.WithCancel(context.TODO())
c := Notifier{
clients: NewClients(),
lock: sync.RWMutex{},
sessionChan: make(chan *Session, DefaultLengthSessionQueue),
messageChan: make(chan Message, DefaultLengthMessageQueue),
ctx: ctx,
cancel: cancel,
}
return &c
}
func (c *Notifier) recieveSession(ctx context.Context) {
for {
select {
case <-c.ctx.Done():
return
case <-ctx.Done():
return
case session := <-c.sessionChan:
log.Println("recieve a session of user", session.User(), session.ID())
c.clients.SetSession(session.User(), session)
err := session.Write(Message{Type: MessageTypeOpened, Data: fmt.Sprintf(`{"session": "%s"}`, session.ID()), To: session.ID()})
if err != nil {
log.Println("send session info", err)
return
}
}
}
}
func (c *Notifier) recieveMessage(ctx context.Context) {
for {
select {
case <-c.ctx.Done():
return
case <-ctx.Done():
return
case msg := <-c.messageChan:
// log.Println("oxnotifier recieve a message:", msg)
c.notify(msg)
}
}
}
func (c *Notifier) writeMessage(s *Session, msg Message) {
err := s.Write(msg)
if err != nil {
log.Println("write msg to ", s.ID(), "failed:", err, " close it")
s.Close()
c.DeleteSession(s.User(), s.ID())
}
}
func (c *Notifier) notify(msg Message) error {
sessions, err := c.clients.Get(msg.To)
if err != nil {
return err
}
for _, s := range sessions.List() {
go c.writeMessage(s, msg)
}
return nil
}
func (c *Notifier) Run(ctx context.Context) error {
log.Println("starting notifier")
go c.recieveSession(ctx)
c.recieveMessage(ctx)
return nil
}
func (c *Notifier)Send(data interface{}, to string){
c.SendMessage(c.ctx, Message{Data: data, To: to , Type: MessageTypeSystem})
}
func (c *Notifier) SendMessage(ctx context.Context, msg Message) (string, error) {
go func() { c.messageChan <- msg }()
return "", nil
}
func (c *Notifier) AddSession(session *Session) {
go func() { c.sessionChan <- session }()
session.Run(c.ctx)
}
func (c *Notifier) DeleteSession(user, id string) {
log.Println("delete session of user", user, id)
c.clients.DeleteSession(user, id)
}