packagemainimport("bufio""context""encoding/json""fmt""log""os""path/filepath""sort""strings""time""honeypot/internal/database""honeypot/internal/utils""github.com/duckdb/duckdb-go/v2")funcmain(){iflen(os.Args)!=3{fmt.Fprintf(os.Stderr,"Usage: %s <database_file> <log_pattern>\n",os.Args[0])fmt.Fprintf(os.Stderr,"Example: %s honeypot.db \"/var/log/honeypot.log*\"\n",os.Args[0])os.Exit(1)}databaseFile:=os.Args[1]logPattern:=os.Args[2]// Create database connectiondb:=database.NewDatabase(databaseFile)deferdb.Close()// Ensure tables existiferr:=db.CreateTables();err!=nil{log.Fatalf("failed to create database tables: %v",err)}// Find all log files using the patternfmt.Println("Finding log files...")logFiles,err:=FindLogFiles(logPattern)iferr!=nil{log.Fatalf("failed to find log files: %v",err)}iflen(logFiles)==0{fmt.Println("No log files found")return}fmt.Printf("Found %d log file(s)\n",len(logFiles))// Import each log filefor_,lf:=rangelogFiles{fmt.Printf("Importing log file: %s\n",lf.Path)iferr:=importLogFile(db,lf);err!=nil{log.Printf("failed to import log file %s: %v",lf.Path,err)continue}fmt.Printf("Successfully imported: %s\n",lf.Path)}fmt.Printf("Import complete. Processed %d log file(s)\n",len(logFiles))// get COUNT(*) and MAX(id) from honeypot_eventsvarcount,maxIDinterr=db.DB.QueryRow("SELECT COUNT(*), COALESCE(MAX(id), 0) FROM honeypot_events").Scan(&count,&maxID)iferr!=nil{log.Fatalf("failed to get stats: %v",err)}// restart sequence to start after the max id// Since DuckDB doesn't support ALTER SEQUENCE RESTART yet and has strict dependencies,// we use DROP SEQUENCE ... CASCADE which allows dropping even if the table depends on it._,err=db.DB.Exec(fmt.Sprintf(` DROP SEQUENCE IF EXISTS id_sequence CASCADE;
CREATE SEQUENCE id_sequence START WITH %d;
`,maxID+1))iferr!=nil{log.Fatalf("failed to restart sequence: %v",err)}fmt.Printf("Total events in database: %d (Max ID: %d)\n",count,maxID)}funcimportLogFile(db*database.Database,lfLogFileInfo)error{entries,err:=ReadLogFile(lf.Path)iferr!=nil{returnerr}iflen(entries)==0{returnnil}// Get the current max ID to generate new IDs// (appender doesn't support DEFAULT values, so we must provide explicit IDs)varmaxIDintrow:=db.DB.QueryRow("SELECT COALESCE(MAX(id), 0) FROM honeypot_events")iferr:=row.Scan(&maxID);err!=nil{returnfmt.Errorf("failed to get max id: %w",err)}nextID:=maxID+1// Use the appender for fast bulk insertsconn,err:=db.Connector.Connect(context.Background())iferr!=nil{returnfmt.Errorf("failed to connect for appender: %w",err)}deferconn.Close()appender,err:=duckdb.NewAppenderFromConn(conn,"","honeypot_events")iferr!=nil{returnfmt.Errorf("failed to create appender: %w",err)}deferappender.Close()for_,entry:=rangeentries{// Parse time string to time.Time for proper TIMESTAMPTZ handlingparsedTime,err:=time.Parse(time.RFC3339Nano,entry.Time)iferr!=nil{// Try other common formatsparsedTime,err=time.Parse(time.RFC3339,entry.Time)iferr!=nil{// Use current time as fallbackparsedTime=time.Now()}}parsedTime=parsedTime.UTC()// Ensure UTC normalizationremoteIPInt,_:=utils.IPToInt(entry.RemoteAddr)err=appender.AppendRow(nextID,parsedTime,entry.Type,entry.Event,entry.RemoteAddr,uint64(remoteIPInt),entry.RemotePort,entry.DstPort,entry.Fields,)iferr!=nil{returnfmt.Errorf("failed to append row: %w",err)}nextID++}// Flush to ensure all data is writteniferr:=appender.Flush();err!=nil{returnfmt.Errorf("failed to flush appender: %w",err)}returnnil}// setDefaultDstPort sets the default destination port based on the entry type// if dst_port is missing (0).funcsetDefaultDstPort(entry*LogEntry){ifentry.DstPort==0{switchentry.Type{case"telnet":entry.DstPort=23case"ssh":entry.DstPort=22case"http":entry.DstPort=80}}}// ReadLogFile reads log entries from a single log 
file.funcReadLogFile(filenamestring)([]LogEntry,error){file,err:=os.Open(filename)iferr!=nil{returnnil,err}deferfile.Close()varentries[]LogEntryscanner:=bufio.NewScanner(file)// Increase buffer size to 1MB to handle large log linesbuf:=make([]byte,1024*1024)scanner.Buffer(buf,1024*1024)forscanner.Scan(){line:=strings.TrimSpace(scanner.Text())ifline==""{continue}varentryLogEntryiferr:=json.Unmarshal([]byte(line),&entry);err!=nil{// Skip invalid JSON linescontinue}// Only include honeypot_event messagesifentry.Msg!="honeypot_event"{continue}// Extract fields that might be at the top level// We need to re-parse to handle fields that might be at root levelvarrawmap[string]interface{}iferr:=json.Unmarshal([]byte(line),&raw);err==nil{// Move fields that aren't standard into the Fields mapentry.Fields=make(map[string]interface{})skipFields:=map[string]bool{"time":true,"level":true,"msg":true,"type":true,"event":true,"remote_addr":true,"remote_port":true,"dst_port":true,}fork,v:=rangeraw{if!skipFields[k]{entry.Fields[k]=v}}}// Set default dst_port based on type if missingsetDefaultDstPort(&entry)entries=append(entries,entry)}iferr:=scanner.Err();err!=nil{returnnil,fmt.Errorf("error reading log file: %w",err)}returnentries,nil}// FindLogFiles finds all log files matching the given pattern and returns their metadata.funcFindLogFiles(patternstring)([]LogFileInfo,error){matches,err:=filepath.Glob(pattern)iferr!=nil{returnnil,fmt.Errorf("failed to expand glob pattern: %w",err)}varfiles[]LogFileInfofor_,match:=rangematches{info,err:=os.Stat(match)iferr!=nil{continue}ifinfo.IsDir(){continue}files=append(files,LogFileInfo{Filename:info.Name(),Path:match,Size:info.Size(),Modified:info.ModTime(),})}// Sort by modified time (oldest first) to process logs in chronological ordersort.Slice(files,func(i,jint)bool{returnfiles[i].Modified.Before(files[j].Modified)})returnfiles,nil}typeLogFilestruct{PathstringMaxSizeint64}// LogFileInfo represents metadata about a log 
// LogFileInfo represents metadata about a log file discovered on disk.
type LogFileInfo struct {
	Filename string    `json:"filename"` // base name of the file
	Path     string    `json:"path"`     // path as matched by the glob pattern
	Size     int64     `json:"size"`     // file size in bytes at discovery time
	Modified time.Time `json:"modified"` // last-modification timestamp
}

// LogEntry represents a single log entry from the log file.
type LogEntry struct {
	Time       string                 `json:"time"`  // timestamp string; parsed as RFC 3339 by the importer
	Level      string                 `json:"level"` // log level as written by the logger
	Msg        string                 `json:"msg"`   // only "honeypot_event" messages are imported
	Type       string                 `json:"type,omitempty"`        // protocol type, e.g. "ssh"/"telnet"/"http"
	Event      string                 `json:"event,omitempty"`       // event name within the type
	RemoteAddr string                 `json:"remote_addr,omitempty"` // remote IP address as text
	RemotePort uint16                 `json:"remote_port,omitempty"`
	DstPort    uint16                 `json:"dst_port,omitempty"` // 0 means "use the type's default port"
	Fields     map[string]interface{} `json:"fields,omitempty"`   // non-standard top-level keys collected at read time
}