119 lines
2.5 KiB
Go
119 lines
2.5 KiB
Go
package notifier
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
)
|
|
|
|
|
|
const (
|
|
DefaultLengthMessageQueue = 5
|
|
DefaultLengthSessionQueue = 5
|
|
)
|
|
|
|
type Notifier struct {
|
|
clients *Clients
|
|
lock sync.RWMutex
|
|
sessionChan chan *Session
|
|
messageChan chan Message
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
func New() *Notifier {
|
|
|
|
ctx, cancel := context.WithCancel(context.TODO())
|
|
c := Notifier{
|
|
clients: NewClients(),
|
|
lock: sync.RWMutex{},
|
|
sessionChan: make(chan *Session, DefaultLengthSessionQueue),
|
|
messageChan: make(chan Message, DefaultLengthMessageQueue),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
return &c
|
|
}
|
|
|
|
func (c *Notifier) recieveSession(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return
|
|
case <-ctx.Done():
|
|
return
|
|
case session := <-c.sessionChan:
|
|
log.Println("recieve a session of user", session.User(), session.ID())
|
|
c.clients.SetSession(session.User(), session)
|
|
err := session.Write(Message{Type: MessageTypeOpened, Data: fmt.Sprintf(`{"session": "%s"}`, session.ID()), To: session.ID()})
|
|
if err != nil {
|
|
log.Println("send session info", err)
|
|
return
|
|
}
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Notifier) recieveMessage(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return
|
|
case <-ctx.Done():
|
|
return
|
|
case msg := <-c.messageChan:
|
|
// log.Println("oxnotifier recieve a message:", msg)
|
|
c.notify(msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Notifier) writeMessage(s *Session, msg Message) {
|
|
err := s.Write(msg)
|
|
if err != nil {
|
|
log.Println("write msg to ", s.ID(), "failed:", err, " close it")
|
|
s.Close()
|
|
c.DeleteSession(s.User(), s.ID())
|
|
}
|
|
}
|
|
|
|
func (c *Notifier) notify(msg Message) error {
|
|
sessions, err := c.clients.Get(msg.To)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, s := range sessions.List() {
|
|
go c.writeMessage(s, msg)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Notifier) Run(ctx context.Context) error {
|
|
log.Println("starting notifier")
|
|
go c.recieveSession(ctx)
|
|
c.recieveMessage(ctx)
|
|
return nil
|
|
}
|
|
|
|
func (c *Notifier)Send(data interface{}, to string){
|
|
c.SendMessage(c.ctx, Message{Data: data, To: to , Type: MessageTypeSystem})
|
|
}
|
|
|
|
func (c *Notifier) SendMessage(ctx context.Context, msg Message) (string, error) {
|
|
go func() { c.messageChan <- msg }()
|
|
return "", nil
|
|
}
|
|
|
|
func (c *Notifier) AddSession(session *Session) {
|
|
go func() { c.sessionChan <- session }()
|
|
session.Run(c.ctx)
|
|
}
|
|
|
|
func (c *Notifier) DeleteSession(user, id string) {
|
|
log.Println("delete session of user", user, id)
|
|
c.clients.DeleteSession(user, id)
|
|
}
|
|
|