packagedashboardimport("context""encoding/json""honeypot/internal/types""log""net/http""sync""time""github.com/gorilla/websocket")varupgrader=websocket.Upgrader{CheckOrigin:func(*http.Request)bool{returntrue},}func(s*Service)ServeWebSocket(whttp.ResponseWriter,r*http.Request){ServeWs(s.hub,w,r)}funcServeWs(hub*Hub,whttp.ResponseWriter,r*http.Request){conn,err:=upgrader.Upgrade(w,r,nil)iferr!=nil{log.Println(err)return}hub.register<-conngofunc(){deferfunc(){hub.unregister<-conn}()for{if_,_,err:=conn.ReadMessage();err!=nil{return}}}()}typeEventSinkstruct{hub*Hubcache[]types.LogEventmusync.Mutexticker*time.Tickerctxcontext.Contextcancelcontext.CancelFuncwgsync.WaitGroup}const(batchInterval=333*time.MillisecondmaxBatchSize=1000)funcNewEventSink(h*Hub)*EventSink{ctx,cancel:=context.WithCancel(h.ctx)es:=&EventSink{hub:h,cache:make([]types.LogEvent,0),ticker:time.NewTicker(batchInterval),ctx:ctx,cancel:cancel,}es.wg.Add(1)goes.batchLoop()returnes}func(s*EventSink)EmitEvent(etypes.LogEvent){s.mu.Lock()s.cache=append(s.cache,e)cacheSize:=len(s.cache)s.mu.Unlock()// If cache reaches max batch size, trigger immediate sendifcacheSize>=maxBatchSize{s.sendBatch()}}func(s*EventSink)sendBatch(){s.mu.Lock()iflen(s.cache)==0{s.mu.Unlock()return}// Copy up to maxBatchSize eventsbatchSize:=len(s.cache)ifbatchSize>maxBatchSize{batchSize=maxBatchSize}batch:=make([]types.LogEvent,batchSize)copy(batch,s.cache[:batchSize])s.cache=s.cache[batchSize:]s.mu.Unlock()// Marshal batch as JSON arraymsg,err:=json.Marshal(batch)iferr!=nil{log.Println("failed to marshal event batch:",err)return}// Send to hub broadcast channelselect{cases.hub.broadcast<-msg:default:// Channel full, drop message}}func(s*EventSink)batchLoop(){defers.wg.Done()defers.ticker.Stop()for{select{case<-s.ctx.Done():// Flush remaining events on shutdowns.mu.Lock()iflen(s.cache)>0{batch:=make([]types.LogEvent,len(s.cache))copy(batch,s.cache)s.cache=s.cache[:0]s.mu.Unlock()msg,err:=json.Marshal(batch)iferr==nil{select{cases.hub.broadcast<-msg:default:}}}else{s.mu.Unlock()}returncase<-s.ticker.C:s.sendBatch()}}}func(s*EventSink)Shutdown(){s.cancel()s.wg.Wait()}