140 lines
2.6 KiB
Go
140 lines
2.6 KiB
Go
package notifier
|
|
|
|
|
|
import (
|
|
// "compress/gzip"
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"strings"
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
type Session struct {
|
|
user string
|
|
id string
|
|
w gin.ResponseWriter
|
|
blockChan chan int
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
flusher http.Flusher
|
|
writer io.Writer
|
|
}
|
|
|
|
func NewSession(c *gin.Context) (*Session, error) {
|
|
user := c.Param("plan")
|
|
clientID := uuid.New().String()
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
session := Session{
|
|
user: user,
|
|
id: clientID,
|
|
blockChan: make(chan int),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
|
|
useGzip := false
|
|
if strings.Contains(c.GetHeader("Accept-Encoding"), "gzip") {
|
|
useGzip = true
|
|
}
|
|
|
|
err := session.init(useGzip, c.Writer)
|
|
return &session, err
|
|
}
|
|
|
|
func (s *Session) WriteString(str string) error {
|
|
_, err := s.writer.Write([]byte(str))
|
|
if err == nil {
|
|
s.flusher.Flush()
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (s *Session) Write(msg Message) error {
|
|
if s.writer == nil {
|
|
return errors.New("conn is nil")
|
|
}
|
|
data, err := msg.SSEBytes()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
n, err := s.writer.Write(msg.SSEIDBytes())
|
|
if err != nil {
|
|
log.Println("msg type write failed ", s.id, n, err)
|
|
return err
|
|
}
|
|
n, err = s.writer.Write(data)
|
|
if err != nil {
|
|
log.Println("msg data write failed ", s.id, n, err)
|
|
return err
|
|
}
|
|
s.flusher.Flush()
|
|
return err
|
|
}
|
|
|
|
func (s *Session) init(gz bool, writer gin.ResponseWriter) error {
|
|
// header := writer.Header()
|
|
// header.Set("Content-Type", "text/event-stream")
|
|
// header.Set("Cache-Control", "no-cache")
|
|
// header.Set("Connection", "keep-alive")
|
|
// header.Set("Access-Control-Allow-Origin", "*")
|
|
// header.Set("Session-ID", s.id)
|
|
// if gz {
|
|
// header.Set("Content-Encoding", "gzip")
|
|
// }
|
|
// writer.WriteHeader(http.StatusOK)
|
|
// writer.Flush()
|
|
s.w = writer
|
|
flusher, ok := writer.(http.Flusher)
|
|
if !ok {
|
|
return errors.New("flusher not support")
|
|
}
|
|
s.flusher = flusher
|
|
w, ok := writer.(io.Writer)
|
|
if !ok {
|
|
return errors.New("writer not support")
|
|
}
|
|
s.writer = w
|
|
// if gz {
|
|
// s.writer = gzip.NewWriter(s.writer)
|
|
// }
|
|
// err := s.Write(Message{Type: MessageTypeOpened, Data: s.id, To: s.id})
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Session) ID() string {
|
|
return s.id
|
|
}
|
|
|
|
func (s *Session) User() string {
|
|
return s.user
|
|
}
|
|
|
|
func (s *Session) Run(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-s.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Session) Flush() {
|
|
if s.flusher != nil {
|
|
s.flusher.Flush()
|
|
}
|
|
}
|
|
|
|
func (s *Session) Close() {
|
|
defer s.cancel()
|
|
}
|
|
|
|
func (s *Session) Messages() <-chan Message {
|
|
return nil
|
|
} |