packagedatabaseimport("database/sql""encoding/json""errors""fmt""honeypot/internal/types""honeypot/internal/utils""log""net/url""os""slices""strconv""strings""time""github.com/duckdb/duckdb-go/v2")typeDatabasestruct{DB*sql.DBConnector*duckdb.ConnectorPathstringIPInfoLastRuntime.Time}// NewDatabase creates a new duckdb database connection.// Returns nil if the database file is not set.funcNewDatabase(databaseFilestring)*Database{ifdatabaseFile==""{returnnil}dbExists:=falseifdatabaseFile!=""{dbExists=trueif_,err:=os.Stat(databaseFile);errors.Is(err,os.ErrNotExist){dbExists=falsefmt.Println("Database file does not exist, creating new database")}else{fmt.Println("Database file exists, using existing database")}}// Create a connector for appender supportconnector,err:=duckdb.NewConnector(databaseFile,nil)iferr!=nil{log.Fatalf("failed to create duckdb connector: %v",err)}db:=sql.OpenDB(connector)database:=&Database{DB:db,Connector:connector,Path:databaseFile}if!dbExists{fmt.Println("Creating database tables")iferr:=database.CreateTables();err!=nil{log.Printf("failed to create database tables: %v",err)}}returndatabase}func(db*Database)CreateTables()error{_,err:=db.DB.Exec(` CREATE SEQUENCE IF NOT EXISTS id_sequence START 1;
CREATE TABLE IF NOT EXISTS honeypot_events (
id INTEGER PRIMARY KEY DEFAULT nextval('id_sequence'),
time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
type TEXT NOT NULL,
event TEXT NOT NULL,
remote_addr TEXT NOT NULL,
remote_ip_int UBIGINT,
remote_port USMALLINT NOT NULL,
dst_port USMALLINT NOT NULL,
fields JSON
);
CREATE INDEX IF NOT EXISTS idx_events_type ON honeypot_events (type);
CREATE INDEX IF NOT EXISTS idx_events_event ON honeypot_events (event);
CREATE INDEX IF NOT EXISTS idx_events_remote_ip_int ON honeypot_events (remote_ip_int);
CREATE INDEX IF NOT EXISTS idx_events_dst_port ON honeypot_events (dst_port);
CREATE INDEX IF NOT EXISTS idx_events_time ON honeypot_events (time);
CREATE SEQUENCE IF NOT EXISTS blocklist_id_sequence START 1;
CREATE TABLE IF NOT EXISTS blocklist (
id INTEGER PRIMARY KEY DEFAULT nextval('blocklist_id_sequence'),
address TEXT NOT NULL,
timestamp TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
expires TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP + INTERVAL '3 HOURS',
reason TEXT NOT NULL,
);
CREATE TABLE IF NOT EXISTS ips (
ip TEXT PRIMARY KEY,
ip_int UBIGINT,
country TEXT,
asn INTEGER,
asn_org TEXT,
city TEXT,
latitude DOUBLE,
longitude DOUBLE,
fqdn TEXT,
domain TEXT,
last_updated TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_ips_ip_int ON ips (ip_int);
CREATE INDEX IF NOT EXISTS idx_ips_country ON ips (country);
CREATE INDEX IF NOT EXISTS idx_ips_asn ON ips (asn);
`)returnerr}// Close closes the database connection.func(db*Database)Close()error{varerrs[]errorifdb.DB!=nil{iferr:=db.DB.Close();err!=nil{errs=append(errs,err)}}ifdb.Connector!=nil{iferr:=db.Connector.Close();err!=nil{errs=append(errs,err)}}iflen(errs)>0{returnerrors.Join(errs...)}returnnil}// Checkpoint flushes the write-ahead log to the database file.func(db*Database)Checkpoint()error{ifdb.DB==nil{returnerrors.New("database is not connected")}_,err:=db.DB.Exec("FORCE CHECKPOINT")returnerr}func(db*Database)InsertEvent(event*types.LogEvent)error{ifdb.DB==nil{returnerrors.New("database is not connected")}varfieldsanyifevent.Fields!=nil{varerrerrorfields,err=json.Marshal(event.Fields)iferr!=nil{returnerr}}remoteIPInt,err:=utils.IPToInt(event.RemoteAddr)iferr!=nil{returnerr}_,err=db.DB.Exec(` INSERT INTO honeypot_events (id, time, type, event, remote_addr, remote_ip_int, remote_port, dst_port, fields)
VALUES (NEXTVAL('id_sequence'), ?, ?, ?, ?, ?, ?, ?, ?)
`,event.Time,event.Type,event.Event,event.RemoteAddr,remoteIPInt,event.RemotePort,event.DstPort,fields)returnerr}typeDatabaseStatsstruct{RowsEventsint`json:"rows_events"`RowsBlocklistint`json:"rows_blocklist"`RowsIpsint`json:"rows_ips"`RowsUnresolvedIPsint`json:"rows_unresolved_ips"`DatabaseSizeint64`json:"database_size"`WalSizeint64`json:"wal_size"`}func(db*Database)GetDatabaseStats()(DatabaseStats,error){varstatsDatabaseStatsifdb.DB==nil{returnstats,errors.New("database is not connected")}// Get row countserr:=db.DB.QueryRow(` SELECT
(SELECT COUNT(*) FROM honeypot_events),
(SELECT COUNT(*) FROM blocklist),
(SELECT COUNT(*) FROM ips),
(SELECT COUNT(DISTINCT remote_addr) FROM honeypot_events WHERE remote_addr NOT IN (SELECT ip FROM ips WHERE last_updated > ?))
`,time.Now().Add(-72*time.Hour)).Scan(&stats.RowsEvents,&stats.RowsBlocklist,&stats.RowsIps,&stats.RowsUnresolvedIPs,)iferr!=nil{returnstats,err}// Get file sizeifdb.Path!=""{ifinfo,err:=os.Stat(db.Path);err==nil{stats.DatabaseSize=info.Size()}// Also add WAL sizewalPath:=db.Path+".wal"ifinfo,err:=os.Stat(walPath);err==nil{stats.WalSize=info.Size()stats.DatabaseSize+=stats.WalSize}}returnstats,nil}func(db*Database)InsertBlocklist(address,reasonstring,durationtime.Duration)error{expires:=time.Now().Add(duration)_,err:=db.DB.Exec("INSERT INTO blocklist (address, expires, reason) VALUES (?, ?, ?)",address,expires,reason)returnerr}typePortCountstruct{Portint`json:"port"`Countint`json:"count"`}typeJSONMapmap[string]anyfunc(m*JSONMap)Scan(valueany)error{ifvalue==nil{*m=nilreturnnil}switchv:=value.(type){case[]byte:returnjson.Unmarshal(v,m)casestring:returnjson.Unmarshal([]byte(v),m)casemap[string]any:*m=JSONMap(v)returnnildefault:returnfmt.Errorf("unexpected type for JSONMap: %T",value)}}typeEventstruct{IDint`json:"id,omitempty"`Timetime.Time`json:"time,omitzero"`Typestring`json:"type,omitempty"`Eventstring`json:"event,omitempty"`RemoteAddrstring`json:"remote_addr,omitempty"`RemotePortuint16`json:"remote_port,omitempty"`DstPortuint16`json:"dst_port,omitempty"`FieldsJSONMap`json:"fields,omitempty"`Countrystring`json:"country,omitempty"`Citystring`json:"city,omitempty"`Latitudefloat64`json:"latitude,omitempty"`Longitudefloat64`json:"longitude,omitempty"`}typeQueryResponsestruct{Querystring`json:"query"`WhereArgs[]any`json:"where_args"`Events[]Event`json:"events"`Totalint`json:"total"`// total number of events matching the query without paginationQueryTimestring`json:"query_time"`}typeEventQuerystruct{LimitintOffsetintOrderDirectionstringRemoteAddrs[]stringRemotePorts[]stringDstPorts[]stringEvent[]stringIDs[]stringTimeStarttime.TimeTimeEndtime.TimeType[]stringFieldFiltersmap[string][]stringFieldExists[]stringColumns[]stringASNs[]stringCountries[]stringCities[]stringDomains[]stringFQDNs[]string}typeeventFieldintconst(fieldIDeventField=iotafieldTimeeventField=iota+1fieldTypefieldEventfieldRemoteAddrfieldRemotePortfieldDstPortfieldFieldsfieldCountryfieldCityfieldLatitudefieldLongitude)funcbuildFieldPlan(qEventQuery)([]eventField,error){hasGeo:=HasGeoFields(q)allowedColumns:=[]string{"id","time","type","event","remote_addr","remote_port","dst_port","fields","country","city","latitude","longitude"}validColumns:=make([]string,0,len(q.Columns))for_,col:=rangeq.Columns{ifslices.Contains(allowedColumns,col){validColumns=append(validColumns,col)}}iflen(validColumns)==0{// return all fieldsifhasGeo{return[]eventField{fieldID,fieldTime,fieldType,fieldEvent,fieldRemoteAddr,fieldRemotePort,fieldDstPort,fieldFields,fieldCountry,fieldCity,fieldLatitude,fieldLongitude},nil}return[]eventField{fieldID,fieldTime,fieldType,fieldEvent,fieldRemoteAddr,fieldRemotePort,fieldDstPort,fieldFields},nil}plan:=make([]eventField,len(validColumns))fori,col:=rangevalidColumns{switchcol{case"id":plan[i]=fieldIDcase"time":plan[i]=fieldTimecase"type":plan[i]=fieldTypecase"event":plan[i]=fieldEventcase"remote_addr":plan[i]=fieldRemoteAddrcase"remote_port":plan[i]=fieldRemotePortcase"dst_port":plan[i]=fieldDstPortcase"fields":plan[i]=fieldFieldscase"country":plan[i]=fieldCountrycase"city":plan[i]=fieldCitycase"latitude":plan[i]=fieldLatitudecase"longitude":plan[i]=fieldLongitudedefault:returnnil,fmt.Errorf("unexpected column: %s",col)}}returnplan,nil}funcscanWithPlan(rows*sql.Rows,plan[]eventField,event*Event,dests[]any)error{fori,f:=rangeplan{switchf{casefieldID:dests[i]=&event.IDcasefieldTime:dests[i]=&event.TimecasefieldType:dests[i]=&event.TypecasefieldEvent:dests[i]=&event.EventcasefieldRemoteAddr:dests[i]=&event.RemoteAddrcasefieldRemotePort:dests[i]=&event.RemotePortcasefieldDstPort:dests[i]=&event.DstPortcasefieldFields:dests[i]=&event.FieldscasefieldCountry:dests[i]=&event.CountrycasefieldCity:dests[i]=&event.CitycasefieldLatitude:dests[i]=&event.LatitudecasefieldLongitude:dests[i]=&event.Longitude}}returnrows.Scan(dests...)}typeQueryMetastruct{QuerystringWhereArgs[]anyTotalintQueryTimetime.Duration}func(db*Database)QueryEventsMeta(qurl.Values)(QueryMeta,error){start:=time.Now()query,err:=db.parseEventQuery(q)iferr!=nil{returnQueryMeta{},err}totalQuery,totalWhereArgs:=buildQueryString(query,true)rows,err:=db.DB.Query(totalQuery,totalWhereArgs...)iferr!=nil{returnQueryMeta{},err}deferrows.Close()vartotalintifrows.Next(){iferr:=rows.Scan(&total);err!=nil{returnQueryMeta{},err}}queryString,whereArgs:=buildQueryString(query,false)returnQueryMeta{Query:queryString,WhereArgs:whereArgs,Total:total,QueryTime:time.Since(start),},nil}func(db*Database)StreamEvents(qurl.Values,handlefunc(Event)error,)error{query,err:=db.parseEventQuery(q)iferr!=nil{returnerr}queryString,whereArgs:=buildQueryString(query,false)rows,err:=db.DB.Query(queryString,whereArgs...)iferr!=nil{returnerr}deferrows.Close()plan,err:=buildFieldPlan(query)iferr!=nil{returnerr}dests:=make([]any,len(plan))forrows.Next(){vareventEventiferr:=scanWithPlan(rows,plan,&event,dests);err!=nil{returnerr}iferr:=handle(event);err!=nil{returnerr}}returnrows.Err()}func(db*Database)ExportEvents(qurl.Values,handlefunc(Event)error)error{query,err:=db.parseEventQuery(q)iferr!=nil{returnerr}hasGeo:=HasGeoFields(query)queryString:=buildSelectClause(query,false,hasGeo)whereClauses,whereArgs:=buildWhereClauses(query,hasGeo)iflen(whereClauses)>0{queryString+=" WHERE "+strings.Join(whereClauses," AND ")}ifquery.OrderDirection!=""{queryString+=" ORDER BY time "+query.OrderDirection}rows,err:=db.DB.Query(queryString,whereArgs...)iferr!=nil{returnerr}deferrows.Close()plan,err:=buildFieldPlan(query)iferr!=nil{returnerr}dests:=make([]any,len(plan))forrows.Next(){vareventEventiferr:=scanWithPlan(rows,plan,&event,dests);err!=nil{returnerr}iferr:=handle(event);err!=nil{returnerr}}returnrows.Err()}func(db*Database)parseEventQuery(queryurl.Values)(EventQuery,error){limitInt,err:=getIntWithDefault(query,"limit",100)iferr!=nil{returnEventQuery{},err}offset,err:=getIntWithDefault(query,"offset",0)iferr!=nil{returnEventQuery{},err}orderDirection:=query.Get("order_direction")iforderDirection==""{orderDirection="desc"}iforderDirection!="asc"&&orderDirection!="desc"{returnEventQuery{},fmt.Errorf("invalid order_direction: %s",orderDirection)}timeStartStr:=query.Get("time_start")timeStart,err:=time.Parse(time.RFC3339,timeStartStr)iferr!=nil{timeStart,err=time.Parse(time.DateTime,timeStartStr)iferr!=nil{timeStart=time.Time{}}}timeEndStr:=query.Get("time_end")timeEnd,err:=time.Parse(time.RFC3339,timeEndStr)iferr!=nil{timeEnd,err=time.Parse(time.DateTime,timeEndStr)iferr!=nil{timeEnd=time.Time{}}}// Helper to handle both multiple parameters and comma-separated values// for fields where commas are not valid characters in the value itself.csvToSlice:=func(keystring)[]string{varres[]stringfor_,val:=rangequery[key]{parts:=strings.Split(val,",")for_,p:=rangeparts{p=strings.TrimSpace(p)ifp!=""{res=append(res,p)}}}returnres}eventQuery:=EventQuery{Limit:limitInt,Offset:offset,OrderDirection:orderDirection,TimeStart:timeStart,TimeEnd:timeEnd,FieldFilters:make(map[string][]string),FieldExists:[]string{},IDs:csvToSlice("id"),RemoteAddrs:csvToSlice("remote_addr"),RemotePorts:csvToSlice("remote_port"),DstPorts:csvToSlice("dst_port"),Type:csvToSlice("type"),Event:csvToSlice("event"),Columns:csvToSlice("columns"),ASNs:csvToSlice("asn"),Countries:csvToSlice("country"),Cities:csvToSlice("city"),Domains:csvToSlice("domain"),FQDNs:csvToSlice("fqdn"),}forkey,values:=rangequery{ifstrings.HasPrefix(key,"f:"){field:=strings.TrimPrefix(key,"f:")eventQuery.FieldFilters[field]=append(eventQuery.FieldFilters[field],values...)}elseifstrings.HasPrefix(key,"fe:"){field:=strings.TrimPrefix(key,"fe:")eventQuery.FieldExists=append(eventQuery.FieldExists,field)}}returneventQuery,nil}// getIntWithDefault parses the integer value from a URL value's key, using defaultVal if not set or empty.funcgetIntWithDefault(queryurl.Values,keystring,defaultValint)(int,error){val:=query.Get(key)ifval==""{returndefaultVal,nil}n,err:=strconv.Atoi(val)iferr!=nil{return0,err}returnn,nil}