packagedatabaseimport("context""database/sql""errors""fmt""honeypot/internal/geodb""honeypot/internal/utils""log""log/slog""strconv""strings""sync""time")// GetUniqueIPsFromEvents returns a list of unique remote addresses from the honeypot_events table.// If includeFresh is false, it only returns IPs that are either missing from the ips table// or have metadata older than the given duration.func(db*Database)GetUniqueIPsFromEvents(includeFreshbool,olderThantime.Duration)([]string,error){query:="SELECT DISTINCT remote_addr FROM honeypot_events"varargs[]anyif!includeFresh{query=` SELECT DISTINCT remote_addr
FROM honeypot_events
WHERE remote_addr NOT IN (
SELECT ip FROM ips WHERE last_updated > ?
)
`args=append(args,time.Now().Add(-olderThan))}query+=" ORDER BY remote_addr"rows,err:=db.DB.Query(query,args...)iferr!=nil{returnnil,err}deferrows.Close()varips[]stringforrows.Next(){varipstringiferr:=rows.Scan(&ip);err!=nil{returnnil,err}ips=append(ips,ip)}returnips,nil}// IPMetadata represents the metadata for an IP address.typeIPMetadatastruct{IPstring`json:"ip"`IPIntuint32`json:"ip_int"`Countrystring`json:"country"`ASNint`json:"asn"`ASNOrgstring`json:"asn_org"`Citystring`json:"city"`Latitudefloat64`json:"latitude"`Longitudefloat64`json:"longitude"`FQDNstring`json:"fqdn"`Domainstring`json:"domain"`LastUpdatedtime.Time`json:"last_updated"`}varupsertMusync.Mutex// UpsertIPMetadata inserts or updates IP metadata in the ips table.func(db*Database)UpsertIPMetadata(m*IPMetadata)error{upsertMu.Lock()deferupsertMu.Unlock()now:=time.Now()_,err:=db.DB.Exec(` INSERT INTO ips (
ip, ip_int, country, asn, asn_org, city,
latitude, longitude, fqdn, domain, last_updated
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (ip) DO UPDATE SET
ip_int = excluded.ip_int,
country = excluded.country,
asn = excluded.asn,
asn_org = excluded.asn_org,
city = excluded.city,
latitude = excluded.latitude,
longitude = excluded.longitude,
fqdn = excluded.fqdn,
domain = excluded.domain,
last_updated = excluded.last_updated
`,m.IP,m.IPInt,m.Country,m.ASN,m.ASNOrg,m.City,m.Latitude,m.Longitude,m.FQDN,m.Domain,now)returnerr}// GetIPMetadata retrieves IP metadata from the ips table for a list of IPs.func(db*Database)GetIPMetadata(ips[]string)(map[string]*IPMetadata,error){iflen(ips)==0{returnmake(map[string]*IPMetadata),nil}placeholders:=strings.TrimRight(strings.Repeat("?, ",len(ips)),", ")query:=fmt.Sprintf(` SELECT ip, ip_int, country, asn, asn_org, city,
latitude, longitude, fqdn, domain, last_updated
FROM ips WHERE ip IN (%s)
`,placeholders)args:=make([]any,len(ips))fori,ip:=rangeips{args[i]=ip}rows,err:=db.DB.Query(query,args...)iferr!=nil{returnnil,err}deferrows.Close()results:=make(map[string]*IPMetadata)forrows.Next(){varmIPMetadataerr:=rows.Scan(&m.IP,&m.IPInt,&m.Country,&m.ASN,&m.ASNOrg,&m.City,&m.Latitude,&m.Longitude,&m.FQDN,&m.Domain,&m.LastUpdated,)iferr!=nil{returnnil,err}results[m.IP]=&m}returnresults,nil}varisUpdatingIPInfoboolvarupdateMusync.Mutex// UpdateIPInfo updates the metadata for all unique IPs in the database.// It uses the provided GeoDB to look up metadata and upserts it into the database.// This function is thread-safe and will only run one update at a time.func(db*Database)UpdateIPInfo(ctxcontext.Context,geoDb*geodb.GeoDB,l*slog.Logger){updateMu.Lock()ifisUpdatingIPInfo{updateMu.Unlock()return}isUpdatingIPInfo=trueupdateMu.Unlock()deferfunc(){updateMu.Lock()isUpdatingIPInfo=falseupdateMu.Unlock()}()ips,err:=db.GetUniqueIPsFromEvents(false,3*24*time.Hour)iferr!=nil{l.Error("failed to get unique IPs from events","error",err)return}varwgsync.WaitGroupsem:=make(chanstruct{},50)ProcessingLoop:for_,ip:=rangeips{select{case<-ctx.Done():breakProcessingLoopcasesem<-struct{}{}:wg.Add(1)gofunc(ipstring){deferwg.Done()deferfunc(){<-sem}()meta,err:=geoDb.LookupMetadata(ctx,ip)iferr!=nil{return}dbMeta:=&IPMetadata{IP:meta.IP,IPInt:uint32(0),Country:meta.CountryCode,ASN:meta.ASN,ASNOrg:meta.ASNOrg,City:meta.City,Latitude:meta.Latitude,Longitude:meta.Longitude,FQDN:meta.FQDN,Domain:meta.Domain,}// Calculate IP Int if possibleipInt,err:=utils.IPToInt(ip)iferr==nil{dbMeta.IPInt=ipInt}iferr:=db.UpsertIPMetadata(dbMeta);err!=nil{l.Error("failed to upsert IP metadata","ip",ip,"error",err)}}(ip)}}wg.Wait()db.IPInfoLastRun=time.Now()}// GetGeoStats returns the top 50 remote addresses, top 20 ports, top 20 events for a given geo locationfunc(db*Database)GetGeoStats(geoTypestring,valuestring)(*HoneypotStats,error){ifdb.DB==nil{returnnil,errors.New("database is not connected")}stats:=&HoneypotStats{}where,args,err:=GetGeoWhere(geoType,value)iferr!=nil{returnnil,err}// Set title and metadatastats.Metadata=make(map[string]any)value=strings.TrimSpace(value)switchgeoType{case"asn":varorg,countrystring// Try to find any record for this ASN that has an organization namequery:="SELECT asn_org, country FROM ips WHERE asn = ? AND asn_org IS NOT NULL AND asn_org != '' LIMIT 1"err:=db.DB.QueryRow(query,value).Scan(&org,&country)iferr!=nil{// Try as integer if string comparison failedasnInt,_:=strconv.Atoi(value)db.DB.QueryRow(query,asnInt).Scan(&org,&country)}stats.Title="ASN "+valueiforg!=""{stats.Metadata["asn_org"]=org}ifcountry!=""{stats.Metadata["country"]=country}case"country":varcountrystringdb.DB.QueryRow("SELECT DISTINCT country FROM ips WHERE country = ? AND country IS NOT NULL LIMIT 1",value).Scan(&country)ifcountry!=""{stats.Title=country}else{stats.Title=value}stats.Metadata["country"]=valuecase"city":varcountrystringdb.DB.QueryRow("SELECT DISTINCT country FROM ips WHERE city = ? AND country IS NOT NULL LIMIT 1",value).Scan(&country)stats.Title=valueifcountry!=""{stats.Metadata["country"]=country}case"domain":stats.Title=valuecase"fqdn":stats.Title=value}// Get total events, first seen, last seenvarfirstSeen,lastSeensql.NullTimeerr=db.DB.QueryRow(fmt.Sprintf(` SELECT COUNT(*), MIN(time), MAX(time)
FROM honeypot_events
JOIN ips ON honeypot_events.remote_addr = ips.ip
WHERE %s
`,where),args...).Scan(&stats.TotalEvents,&firstSeen,&lastSeen)iferr!=nil{log.Printf("failed to get geo event overview: %v",err)}iffirstSeen.Valid{stats.FirstSeen=firstSeen.Time.Format(time.RFC3339Nano)}iflastSeen.Valid{stats.LastSeen=lastSeen.Time.Format(time.RFC3339Nano)}fields:=[][2]string{{"event","event"},{"remote_addr","remote_addr"},{"dst_port","dst_port"},{"type","type"},{"ips.fqdn","fqdn"},}// We need to use JOIN for geo statsfor_,fieldName:=rangefields{selectField:=fieldName[0]ifselectField!="fields"&&!strings.Contains(selectField,"ips."){selectField="honeypot_events."+selectField}func(){rows,err:=db.DB.Query(fmt.Sprintf(` SELECT %s as field, COUNT(*) as count
FROM honeypot_events
JOIN ips ON honeypot_events.remote_addr = ips.ip
WHERE %s AND field IS NOT NULL
GROUP BY field
ORDER BY count DESC
LIMIT ?
`,selectField,where),append(args,20)...)iferr!=nil{log.Printf("failed to query geo stats for field %s: %v",fieldName[1],err)return}deferrows.Close()forrows.Next(){varfcLabelCountiferr:=rows.Scan(&fc.Label,&fc.Count);err!=nil{log.Printf("failed to scan geo stats result for field %s: %v",fieldName[1],err)return}switchfieldName[1]{case"event":stats.EventTypes=append(stats.EventTypes,fc)case"remote_addr":stats.RemoteAddrs=append(stats.RemoteAddrs,fc)case"dst_port":stats.Ports=append(stats.Ports,fc)case"type":// We reuse EventTypes or add a new field if needed, but for now let's just use what's therecase"fqdn":stats.FQDNs=append(stats.FQDNs,fc)}}}()}returnstats,nil}// GetGeoWhere returns the where clause and arguments for a given geo type and valuefuncGetGeoWhere(geoTypestring,valuestring)(string,[]any,error){varwherestringvarargs[]anyswitchgeoType{case"asn":where="ips.asn = ?"case"country":where="ips.country = ?"case"city":where="ips.city = ?"case"domain":where="ips.domain = ?"case"fqdn":where="ips.fqdn = ?"default:return"",nil,fmt.Errorf("invalid geo type: %s",geoType)}args=[]any{value}returnwhere,args,nil}