internal/dashboard/websocket.go

package dashboard

import (
	"context"
	"encoding/json"
	"honeypot/internal/types"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
	CheckOrigin: func(*http.Request) bool { return true },
}

func (s *Service) ServeWebSocket(w http.ResponseWriter, r *http.Request) {
	ServeWs(s.hub, w, r)
}

func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}

	hub.register <- conn

	go func() {
		defer func() { hub.unregister <- conn }()
		for {
			if _, _, err := conn.ReadMessage(); err != nil {
				return
			}
		}
	}()
}

type EventSink struct {
	hub    *Hub
	cache  []types.LogEvent
	mu     sync.Mutex
	ticker *time.Ticker
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

const (
	batchInterval = 333 * time.Millisecond
	maxBatchSize  = 1000
)

func NewEventSink(h *Hub) *EventSink {
	ctx, cancel := context.WithCancel(h.ctx)
	es := &EventSink{
		hub:    h,
		cache:  make([]types.LogEvent, 0),
		ticker: time.NewTicker(batchInterval),
		ctx:    ctx,
		cancel: cancel,
	}

	es.wg.Add(1)
	go es.batchLoop()

	return es
}

func (s *EventSink) EmitEvent(e types.LogEvent) {
	s.mu.Lock()
	s.cache = append(s.cache, e)
	cacheSize := len(s.cache)
	s.mu.Unlock()

	// If cache reaches max batch size, trigger immediate send
	if cacheSize >= maxBatchSize {
		s.sendBatch()
	}
}

func (s *EventSink) sendBatch() {
	s.mu.Lock()
	if len(s.cache) == 0 {
		s.mu.Unlock()
		return
	}

	// Copy up to maxBatchSize events
	batchSize := len(s.cache)
	if batchSize > maxBatchSize {
		batchSize = maxBatchSize
	}

	batch := make([]types.LogEvent, batchSize)
	copy(batch, s.cache[:batchSize])
	s.cache = s.cache[batchSize:]
	s.mu.Unlock()

	// Marshal batch as JSON array
	msg, err := json.Marshal(batch)
	if err != nil {
		log.Println("failed to marshal event batch:", err)
		return
	}

	// Send to hub broadcast channel
	select {
	case s.hub.broadcast <- msg:
	default:
		// Channel full, drop message
	}
}

func (s *EventSink) batchLoop() {
	defer s.wg.Done()
	defer s.ticker.Stop()

	for {
		select {
		case <-s.ctx.Done():
			// Flush remaining events on shutdown
			s.mu.Lock()
			if len(s.cache) > 0 {
				batch := make([]types.LogEvent, len(s.cache))
				copy(batch, s.cache)
				s.cache = s.cache[:0]
				s.mu.Unlock()

				msg, err := json.Marshal(batch)
				if err == nil {
					select {
					case s.hub.broadcast <- msg:
					default:
					}
				}
			} else {
				s.mu.Unlock()
			}
			return

		case <-s.ticker.C:
			s.sendBatch()
		}
	}
}

func (s *EventSink) Shutdown() {
	s.cancel()
	s.wg.Wait()
}