// cmd/import-logs/main.go

package main

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"honeypot/internal/database"
	"honeypot/internal/utils"

	"github.com/duckdb/duckdb-go/v2"
)

// main imports honeypot JSON log files matching a glob pattern into a DuckDB
// database, then restarts the id sequence so future inserts continue after
// the highest imported id.
func main() {
	if len(os.Args) != 3 {
		fmt.Fprintf(os.Stderr, "Usage: %s <database_file> <log_pattern>\n", os.Args[0])
		fmt.Fprintf(os.Stderr, "Example: %s honeypot.db \"/var/log/honeypot.log*\"\n", os.Args[0])
		os.Exit(1)
	}

	// All real work happens in run so that its deferred cleanup (db.Close)
	// executes even on failure. The original code called log.Fatalf inside
	// main after `defer db.Close()`; log.Fatalf calls os.Exit, which skips
	// deferred calls and leaks the database handle.
	if err := run(os.Args[1], os.Args[2]); err != nil {
		log.Fatal(err)
	}
}

// run opens the database, imports every log file matching logPattern, and
// restarts the id sequence past the current maximum id. It returns an error
// instead of exiting so deferred cleanup runs on every path.
func run(databaseFile, logPattern string) error {
	db := database.NewDatabase(databaseFile)
	defer db.Close()

	// Ensure tables exist before importing anything.
	if err := db.CreateTables(); err != nil {
		return fmt.Errorf("failed to create database tables: %w", err)
	}

	// Find all log files using the pattern.
	fmt.Println("Finding log files...")
	logFiles, err := FindLogFiles(logPattern)
	if err != nil {
		return fmt.Errorf("failed to find log files: %w", err)
	}

	if len(logFiles) == 0 {
		fmt.Println("No log files found")
		return nil
	}

	fmt.Printf("Found %d log file(s)\n", len(logFiles))

	// Import each log file; a failure in one file is logged and does not
	// abort the remaining imports.
	for _, lf := range logFiles {
		fmt.Printf("Importing log file: %s\n", lf.Path)
		if err := importLogFile(db, lf); err != nil {
			log.Printf("failed to import log file %s: %v", lf.Path, err)
			continue
		}
		fmt.Printf("Successfully imported: %s\n", lf.Path)
	}

	fmt.Printf("Import complete. Processed %d log file(s)\n", len(logFiles))

	// Get COUNT(*) and MAX(id) from honeypot_events for the summary and to
	// learn where the id sequence must resume.
	var count, maxID int
	err = db.DB.QueryRow("SELECT COUNT(*), COALESCE(MAX(id), 0) FROM honeypot_events").Scan(&count, &maxID)
	if err != nil {
		return fmt.Errorf("failed to get stats: %w", err)
	}

	// Restart the sequence to start after the max id.
	// Since DuckDB doesn't support ALTER SEQUENCE RESTART yet and has strict
	// dependencies, we use DROP SEQUENCE ... CASCADE which allows dropping
	// even if the table depends on it.
	_, err = db.DB.Exec(fmt.Sprintf(`
		DROP SEQUENCE IF EXISTS id_sequence CASCADE;
		CREATE SEQUENCE id_sequence START WITH %d;
	`, maxID+1))
	if err != nil {
		return fmt.Errorf("failed to restart sequence: %w", err)
	}
	fmt.Printf("Total events in database: %d (Max ID: %d)\n", count, maxID)
	return nil
}

// importLogFile bulk-loads all honeypot events from a single log file into
// the honeypot_events table using DuckDB's appender interface. Row ids are
// assigned explicitly, continuing from the current MAX(id), because the
// appender does not support DEFAULT (sequence-generated) values.
func importLogFile(db *database.Database, lf LogFileInfo) error {
	entries, err := ReadLogFile(lf.Path)
	if err != nil {
		return err
	}

	if len(entries) == 0 {
		return nil
	}

	// Get the current max ID to generate new IDs
	// (appender doesn't support DEFAULT values, so we must provide explicit IDs)
	var maxID int
	row := db.DB.QueryRow("SELECT COALESCE(MAX(id), 0) FROM honeypot_events")
	if err := row.Scan(&maxID); err != nil {
		return fmt.Errorf("failed to get max id: %w", err)
	}
	nextID := maxID + 1

	// Use the appender for fast bulk inserts
	conn, err := db.Connector.Connect(context.Background())
	if err != nil {
		return fmt.Errorf("failed to connect for appender: %w", err)
	}
	defer conn.Close()

	appender, err := duckdb.NewAppenderFromConn(conn, "", "honeypot_events")
	if err != nil {
		return fmt.Errorf("failed to create appender: %w", err)
	}
	defer appender.Close()

	for _, entry := range entries {
		// Parse time string to time.Time for proper TIMESTAMPTZ handling.
		// RFC3339Nano's fractional-second part (".999999999") is optional
		// when parsing, so this single parse also accepts plain RFC3339
		// timestamps — a separate RFC3339 fallback would be dead code.
		parsedTime, err := time.Parse(time.RFC3339Nano, entry.Time)
		if err != nil {
			// Unparseable timestamp: fall back to the import time rather
			// than dropping the event.
			parsedTime = time.Now()
		}
		parsedTime = parsedTime.UTC() // Ensure UTC normalization

		// Best-effort IP-to-integer conversion; 0 on failure.
		remoteIPInt, _ := utils.IPToInt(entry.RemoteAddr)

		err = appender.AppendRow(
			nextID,
			parsedTime,
			entry.Type,
			entry.Event,
			entry.RemoteAddr,
			uint64(remoteIPInt),
			entry.RemotePort,
			entry.DstPort,
			entry.Fields,
		)
		if err != nil {
			return fmt.Errorf("failed to append row: %w", err)
		}
		nextID++
	}

	// Flush to ensure all data is written
	if err := appender.Flush(); err != nil {
		return fmt.Errorf("failed to flush appender: %w", err)
	}

	return nil
}

// setDefaultDstPort fills in the well-known destination port for the entry's
// protocol type when the log line did not record one (DstPort == 0).
// Unknown types are left untouched.
func setDefaultDstPort(entry *LogEntry) {
	if entry.DstPort != 0 {
		return // port already present; nothing to do
	}
	switch entry.Type {
	case "telnet":
		entry.DstPort = 23
	case "ssh":
		entry.DstPort = 22
	case "http":
		entry.DstPort = 80
	}
}

// ReadLogFile reads log entries from a single log file.
func ReadLogFile(filename string) ([]LogEntry, error) {
	file, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	var entries []LogEntry
	scanner := bufio.NewScanner(file)
	// Increase buffer size to 1MB to handle large log lines
	buf := make([]byte, 1024*1024)
	scanner.Buffer(buf, 1024*1024)

	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" {
			continue
		}

		var entry LogEntry
		if err := json.Unmarshal([]byte(line), &entry); err != nil {
			// Skip invalid JSON lines
			continue
		}

		// Only include honeypot_event messages
		if entry.Msg != "honeypot_event" {
			continue
		}

		// Extract fields that might be at the top level
		// We need to re-parse to handle fields that might be at root level
		var raw map[string]interface{}
		if err := json.Unmarshal([]byte(line), &raw); err == nil {
			// Move fields that aren't standard into the Fields map
			entry.Fields = make(map[string]interface{})
			skipFields := map[string]bool{
				"time":        true,
				"level":       true,
				"msg":         true,
				"type":        true,
				"event":       true,
				"remote_addr": true,
				"remote_port": true,
				"dst_port":    true,
			}

			for k, v := range raw {
				if !skipFields[k] {
					entry.Fields[k] = v
				}
			}
		}

		// Set default dst_port based on type if missing
		setDefaultDstPort(&entry)

		entries = append(entries, entry)
	}

	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("error reading log file: %w", err)
	}

	return entries, nil
}

// FindLogFiles finds all log files matching the given pattern and returns their metadata.
func FindLogFiles(pattern string) ([]LogFileInfo, error) {
	matches, err := filepath.Glob(pattern)
	if err != nil {
		return nil, fmt.Errorf("failed to expand glob pattern: %w", err)
	}

	var files []LogFileInfo
	for _, match := range matches {
		info, err := os.Stat(match)
		if err != nil {
			continue
		}

		if info.IsDir() {
			continue
		}

		files = append(files, LogFileInfo{
			Filename: info.Name(),
			Path:     match,
			Size:     info.Size(),
			Modified: info.ModTime(),
		})
	}

	// Sort by modified time (oldest first) to process logs in chronological order
	sort.Slice(files, func(i, j int) bool {
		return files[i].Modified.Before(files[j].Modified)
	})

	return files, nil
}

// LogFile pairs a log file path with a maximum size.
// NOTE(review): not referenced anywhere in this file — possibly used by
// another file in the package, or dead code. TODO confirm before removing.
type LogFile struct {
	Path    string
	MaxSize int64
}

// LogFileInfo represents metadata about a log file, as gathered by
// FindLogFiles from os.Stat.
type LogFileInfo struct {
	Filename string    `json:"filename"` // base name of the file
	Path     string    `json:"path"`     // path as matched by the glob pattern
	Size     int64     `json:"size"`     // file size in bytes
	Modified time.Time `json:"modified"` // last modification time; used for chronological sorting
}

// LogEntry represents a single log entry from the log file.
// Entries are decoded from one JSON object per line; only entries with
// Msg == "honeypot_event" are imported (see ReadLogFile).
type LogEntry struct {
	Time       string                 `json:"time"`                  // event timestamp; parsed as RFC3339/RFC3339Nano at import time
	Level      string                 `json:"level"`                 // log level string
	Msg        string                 `json:"msg"`                   // message tag; must be "honeypot_event" to be imported
	Type       string                 `json:"type,omitempty"`        // protocol type (e.g. "ssh", "telnet", "http")
	Event      string                 `json:"event,omitempty"`       // event name
	RemoteAddr string                 `json:"remote_addr,omitempty"` // remote IP address as a string
	RemotePort uint16                 `json:"remote_port,omitempty"` // remote TCP port
	DstPort    uint16                 `json:"dst_port,omitempty"`    // destination port; defaulted by type when 0
	Fields     map[string]interface{} `json:"fields,omitempty"`      // all non-standard top-level keys from the log line
}