From 7a00af46de206b9d38f22c955e8820faedeedc31 Mon Sep 17 00:00:00 2001 From: Rikki Date: Tue, 15 Apr 2025 10:46:39 +0800 Subject: restructure project --- .github/workflows/release.yaml | 2 +- cmd/v2stat/daemon.go | 45 ++++++++ cmd/v2stat/main.go | 159 +++++++++++++++++++++++++ db.go | 257 ----------------------------------------- def.go | 66 +++++++++++ main.go | 207 --------------------------------- v2stat.go | 234 +++++++++++++++++++++++++++++++++++++ 7 files changed, 505 insertions(+), 465 deletions(-) create mode 100644 cmd/v2stat/daemon.go create mode 100644 cmd/v2stat/main.go delete mode 100644 db.go create mode 100644 def.go delete mode 100644 main.go create mode 100644 v2stat.go diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 2f15d75..e70b4c4 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -29,7 +29,7 @@ jobs: - name: Build binary for ${{ matrix.arch }} run: | echo "Building for GOARCH=${{ matrix.arch }}" - go build -v -o v2stat-${{ matrix.arch }} . + go build -v -o v2stat go.rikki.moe/v2stat/cmd/v2stat env: GOARCH: ${{ matrix.arch }} diff --git a/cmd/v2stat/daemon.go b/cmd/v2stat/daemon.go new file mode 100644 index 0000000..8c5e91f --- /dev/null +++ b/cmd/v2stat/daemon.go @@ -0,0 +1,45 @@ +package main + +import ( + "context" + "os" + "os/signal" + "syscall" + "time" + + "go.rikki.moe/v2stat" +) + +func runDaemon(v2s *v2stat.V2Stat) { + if err := v2s.InitDB(); err != nil { + logger.Fatalf("Failed to initialize database: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + defer signal.Stop(sigCh) + + ticker := time.NewTicker(time.Duration(*flagInterval) * time.Second) + defer ticker.Stop() + + for { + logger.Info("Recording stats...") + if err := v2s.RecordNow(ctx); err != nil { + logger.Errorf("Failed to record stats: %v", err) + } + + select { + case <-ticker.C: + continue + case <-sigCh: + logger.Info("Received shutdown signal, exiting.") + return + case <-ctx.Done(): + logger.Info("Context canceled, exiting.") + return + } + } +} diff --git a/cmd/v2stat/main.go b/cmd/v2stat/main.go new file mode 100644 index 0000000..ce0c3ed --- /dev/null +++ b/cmd/v2stat/main.go @@ -0,0 +1,159 @@ +package main + +import ( + "database/sql" + "flag" + "fmt" + "os" + + "github.com/jedib0t/go-pretty/table" + "github.com/jedib0t/go-pretty/text" + _ "github.com/mattn/go-sqlite3" + "github.com/sirupsen/logrus" + "google.golang.org/grpc" + + "go.rikki.moe/v2stat" +) + +var ( + flagInterval = flag.Int("interval", 300, "Interval in seconds to record stats") + flagDatabase = flag.String("db", "", "Path to SQLite database") + flagServer = flag.String("server", "127.0.0.1:8080", "V2Ray API server address") + flagLogLevel = flag.String("log-level", "info", "Log level (debug, info, warn, error, fatal, panic)") +) + +var DefaultDBPaths = []string{ + "v2stat.db", + "traffic.db", + "/var/lib/v2stat/traffic.db", + "/usr/local/share/v2stat/traffic.db", + "/opt/apps/v2stat/traffic.db", +} + +var logger *logrus.Logger + +func main() { + flag.Parse() + args := flag.Args() + + if len(args) < 1 { + fmt.Println("Usage: v2stat [args]") + fmt.Println("Available commands: daemon, query") + os.Exit(1) + } + + logger = setupLogger(*flagLogLevel) + db := setupDatabase(logger, *flagDatabase) + + v2s := v2stat.NewV2Stat(logger, db, nil) + defer v2s.Close() + + switch args[0] { + case "daemon": + conn, err := grpc.NewClient(*flagServer, grpc.WithInsecure()) + if err != nil { + logger.Fatalf("Failed to dial gRPC server: %v", err) + } + v2s.SetConn(conn) + runDaemon(v2s) + + case "query": + handleQuery(v2s, args[1:]) + + default: + fmt.Println("Unknown command:", args[0]) + fmt.Println("Available commands: daemon, query") + os.Exit(1) + } +} + +func setupLogger(levelStr string) *logrus.Logger { + level, err := logrus.ParseLevel(levelStr) + if err != nil { + logrus.Fatalf("Invalid log level: %v", err) + } + logger := logrus.New() + logger.SetLevel(level) + return logger +} + +func setupDatabase(logger *logrus.Logger, dbpath string) *sql.DB { + if dbpath == "" { + for _, path := range DefaultDBPaths { + if _, err := os.Stat(path); err == nil { + dbpath = path + break + } + } + if dbpath == "" { + logger.Fatal("No database path provided and no default database found.") + } + } + logger.Infof("Using database: %s", dbpath) + + db, err := sql.Open("sqlite3", dbpath) + if err != nil { + logger.Fatalf("Failed to open database: %v", err) + } + return db +} + +func handleQuery(v2s *v2stat.V2Stat, args []string) { + if len(args) == 0 { + fmt.Println("Usage: v2stat query ") + fmt.Println("Available connections:") + conns, err := v2s.QueryConn() + if err != nil { + logger.Fatalf("Failed to query connection: %v", err) + } + for _, c := range conns { + fmt.Printf("\t%s\n", c.String()) + } + return + } + + connStr := args[0] + conn, ok := v2stat.ParseConnInfo(connStr) + if !ok { + logger.Fatalf("Invalid connection format: %s", connStr) + } + + stats, err := v2s.QueryStatsHourly(&conn) + if err != nil { + logger.Fatalf("Failed to query stats: %v", err) + } + + printStatsTable(stats) +} + +func printStatsTable(stats []v2stat.TrafficStat) { + tb := table.NewWriter() + tb.SetOutputMirror(os.Stdout) + tb.AppendHeader(table.Row{"Time", "Downlink", "Uplink"}) + + var totalDown, totalUp int64 + + for _, stat := range stats { + totalDown += stat.Downlink + totalUp += stat.Uplink + tb.AppendRow(table.Row{stat.Time, sizeToHuman(stat.Downlink), sizeToHuman(stat.Uplink)}) + } + + tb.AppendFooter(table.Row{"Total", sizeToHuman(totalDown), sizeToHuman(totalUp)}) + style := table.StyleLight + style.Format.Footer = text.FormatDefault + tb.SetStyle(style) + tb.Render() +} + +func sizeToHuman(size int64) string { + units := []string{"B", "KiB", "MiB", "GiB", "TiB"} + value := float64(size) + for _, unit := range units { + if value < 1024 { + return fmt.Sprintf("%7.2f %s", value, unit) + } + value /= 1024 + } + return fmt.Sprintf("%7.2f %s", value, "PiB") +} diff --git a/db.go b/db.go deleted file mode 100644 index 8f7a5c7..0000000 --- a/db.go +++ /dev/null @@ -1,257 +0,0 @@ -package main - -import ( - "context" - "strings" - "time" - - "go.rikki.moe/v2stat/command" -) - -type TrafficDirection int - -const ( - DirectionDownlink TrafficDirection = iota - DirectionUplink -) - -type ConnectionType int - -const ( - ConnTypeUser ConnectionType = iota - ConnTypeInbound - ConnTypeOutbound -) - -type ConnInfo struct { - Type ConnectionType `json:"type"` - Name string `json:"name"` -} - -type TrafficStat struct { - Time string `json:"time"` - Downlink int64 `json:"downlink"` - Uplink int64 `json:"uplink"` -} - -func (ci *ConnInfo) String() string { - switch ci.Type { - case ConnTypeUser: - return "user:" + ci.Name - case ConnTypeInbound: - return "inbound:" + ci.Name - case ConnTypeOutbound: - return "outbound:" + ci.Name - default: - return "unknown:" + ci.Name - } -} - -func ParseConnInfo(s string) (ConnInfo, bool) { - parts := strings.Split(s, ":") - if len(parts) != 2 { - return ConnInfo{}, false - } - var connType ConnectionType - switch parts[0] { - case "user": - connType = ConnTypeUser - case "inbound": - connType = ConnTypeInbound - case "outbound": - connType = ConnTypeOutbound - default: - return ConnInfo{}, false - } - return ConnInfo{ - Type: connType, - Name: parts[1], - }, true -} - - -func (v *V2Stat) InitDB() error { - stmts := []string{ - `CREATE TABLE IF NOT EXISTS conn ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - type INTEGER NOT NULL, - name TEXT NOT NULL, - UNIQUE (type, name) - );`, - `CREATE TABLE IF NOT EXISTS stats ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - conn_id INTEGER NOT NULL, - timestamp INTEGER NOT NULL, - traffic INTEGER NOT NULL, - direction INTEGER NOT NULL, - FOREIGN KEY (conn_id) REFERENCES conn (id) - ON DELETE CASCADE - ON UPDATE CASCADE - );`, - `CREATE INDEX IF NOT EXISTS idx_conn_id ON stats (conn_id);`, - `CREATE INDEX IF NOT EXISTS idx_timestamp ON stats (timestamp);`, - } - - for _, stmt := range stmts { - if _, err := v.db.Exec(stmt); err != nil { - return err - } - } - return nil -} - -func (v *V2Stat) RecordNow(ctx context.Context) error { - resp, err := v.stat.QueryStats(ctx, &command.QueryStatsRequest{ - Reset_: true, - }) - if err != nil { - v.logger.Errorf("Failed to query stats: %v", err) - return err - } - - tx, err := v.db.Begin() - if err != nil { - v.logger.Errorf("Failed to begin transaction: %v", err) - return err - } - defer tx.Rollback() // safe to call even if already committed - - insertConnStmt, err := tx.Prepare(`INSERT OR IGNORE INTO conn (type, name) VALUES (?, ?)`) - if err != nil { - v.logger.Errorf("Failed to prepare conn insert statement: %v", err) - return err - } - defer insertConnStmt.Close() - - selectConnIDStmt, err := tx.Prepare(`SELECT id FROM conn WHERE type = ? AND name = ?`) - if err != nil { - v.logger.Errorf("Failed to prepare conn select statement: %v", err) - return err - } - defer selectConnIDStmt.Close() - - insertStatsStmt, err := tx.Prepare(` - INSERT INTO stats (conn_id, timestamp, traffic, direction) - VALUES (?, ?, ?, ?) - `) - if err != nil { - v.logger.Errorf("Failed to prepare stats insert statement: %v", err) - return err - } - defer insertStatsStmt.Close() - - for _, stat := range resp.Stat { - connType, connName, direction, ok := parseStatKey(stat.Name) - if !ok { - v.logger.Warnf("Skipping unrecognized stat key: %s", stat.Name) - continue - } - - if _, err := insertConnStmt.Exec(connType, connName); err != nil { - v.logger.Errorf("Failed to insert conn: %v", err) - continue - } - - var connID int - err = selectConnIDStmt.QueryRow(connType, connName).Scan(&connID) - if err != nil { - v.logger.Errorf("Failed to retrieve conn_id: %v", err) - continue - } - - timeNow := time.Now().Unix() - if _, err := insertStatsStmt.Exec(connID, timeNow, stat.Value, direction); err != nil { - v.logger.Errorf("Failed to insert stats: %v", err) - continue - } - - v.logger.Infof("Inserted stats: conn_id=%d, timestamp=%d, traffic=%d, direction=%d", connID, timeNow, stat.Value, direction) - } - - if err := tx.Commit(); err != nil { - v.logger.Errorf("Failed to commit transaction: %v", err) - return err - } - return nil -} - -func parseStatKey(key string) (connType ConnectionType, connName string, direction TrafficDirection, ok bool) { - parts := strings.Split(key, ">>>") - if len(parts) != 4 || parts[2] != "traffic" { - return 0, "", 0, false - } - - switch parts[0] { - case "user": - connType = ConnTypeUser - case "inbound": - connType = ConnTypeInbound - case "outbound": - connType = ConnTypeOutbound - default: - return 0, "", 0, false - } - - connName = parts[1] - - switch parts[3] { - case "downlink": - direction = DirectionDownlink - case "uplink": - direction = DirectionUplink - default: - return 0, "", 0, false - } - - return connType, connName, direction, true -} - -func (v *V2Stat) QueryConn() ([]ConnInfo, error) { - rows, err := v.db.Query("SELECT type, name FROM conn") - if err != nil { - return nil, err - } - defer rows.Close() - - var conns []ConnInfo - for rows.Next() { - var conn ConnInfo - if err := rows.Scan(&conn.Type, &conn.Name); err != nil { - return nil, err - } - conns = append(conns, conn) - } - return conns, nil -} - -func (v *V2Stat) QueryStatsHourly(conn *ConnInfo) ([]TrafficStat, error) { - rows, err := v.db.Query(` - SELECT - strftime('%Y-%m-%d %H:00:00', datetime(s.timestamp, 'unixepoch', '+8 hours')) AS time, - SUM(CASE WHEN s.direction = 0 THEN s.traffic ELSE 0 END) AS downlink, - SUM(CASE WHEN s.direction = 1 THEN s.traffic ELSE 0 END) AS uplink - FROM stats s - JOIN conn c ON s.conn_id = c.id - WHERE c.type = ? AND c.name = ? - GROUP BY time - ORDER BY time; -`, conn.Type, conn.Name) - if err != nil { - return nil, err - } - defer rows.Close() - - var stats []TrafficStat - for rows.Next() { - var s TrafficStat - if err := rows.Scan(&s.Time, &s.Downlink, &s.Uplink); err != nil { - panic(err) - } - stats = append(stats, s) - } - - if err := rows.Err(); err != nil { - return nil, err - } - return stats, nil -} diff --git a/def.go b/def.go new file mode 100644 index 0000000..e9951ad --- /dev/null +++ b/def.go @@ -0,0 +1,66 @@ +package v2stat + +import ( + "strings" +) + +type TrafficDirection int + +const ( + DirectionDownlink TrafficDirection = iota + DirectionUplink +) + +type ConnectionType int + +const ( + ConnTypeUser ConnectionType = iota + ConnTypeInbound + ConnTypeOutbound +) + +type ConnInfo struct { + Type ConnectionType `json:"type"` + Name string `json:"name"` +} + +type TrafficStat struct { + Time string `json:"time"` + Downlink int64 `json:"downlink"` + Uplink int64 `json:"uplink"` +} + +func (ci *ConnInfo) String() string { + switch ci.Type { + case ConnTypeUser: + return "user:" + ci.Name + case ConnTypeInbound: + return "inbound:" + ci.Name + case ConnTypeOutbound: + return "outbound:" + ci.Name + default: + return "unknown:" + ci.Name + } +} + +func ParseConnInfo(s string) (ConnInfo, bool) { + parts := strings.Split(s, ":") + if len(parts) != 2 { + return ConnInfo{}, false + } + var connType ConnectionType + switch parts[0] { + case "user": + connType = ConnTypeUser + case "inbound": + connType = ConnTypeInbound + case "outbound": + connType = ConnTypeOutbound + default: + return ConnInfo{}, false + } + return ConnInfo{ + Type: connType, + Name: parts[1], + }, true +} \ No newline at end of file diff --git a/main.go b/main.go deleted file mode 100644 index 7451840..0000000 --- a/main.go +++ /dev/null @@ -1,207 +0,0 @@ -package main - -import ( - "context" - "database/sql" - "flag" - "fmt" - "os" - "os/signal" - "syscall" - "time" - - "github.com/jedib0t/go-pretty/table" - "github.com/jedib0t/go-pretty/text" - _ "github.com/mattn/go-sqlite3" - "github.com/sirupsen/logrus" - "google.golang.org/grpc" - - "go.rikki.moe/v2stat/command" -) - -var ( - flagInterval = flag.Int("interval", 300, "Interval in seconds to record stats") - flagDatabase = flag.String("db", "", "Path to SQLite database") - flagServer = flag.String("server", "127.0.0.1:8080", "V2Ray API server address") - flagLogLevel = flag.String("log-level", "info", "Log level (debug, info, warn, error, fatal, panic)") -) - -var DefaultDBPaths = []string{ - "v2stat.db", - "traffic.db", - "/var/lib/v2stat/traffic.db", - "/usr/local/share/v2stat/traffic.db", - "/opt/apps/v2stat/traffic.db", -} - -type V2Stat struct { - logger *logrus.Logger - db *sql.DB - stat command.StatsServiceClient -} - -func main() { - flag.Parse() - args := flag.Args() - - if len(args) < 1 { - fmt.Println("Usage: v2stat [args]") - fmt.Println("Available commands: daemon, query") - os.Exit(1) - } - - logger := setupLogger(*flagLogLevel) - db := setupDatabase(logger, *flagDatabase) - defer db.Close() - - v2stat := &V2Stat{ - logger: logger, - db: db, - } - - switch args[0] { - case "daemon": - conn, err := grpc.NewClient(*flagServer, grpc.WithInsecure()) - if err != nil { - logger.Fatalf("Failed to dial gRPC server: %v", err) - } - defer conn.Close() - v2stat.stat = command.NewStatsServiceClient(conn) - runServer(v2stat) - - case "query": - handleQuery(v2stat, args[1:]) - - default: - fmt.Println("Unknown command:", args[0]) - fmt.Println("Available commands: daemon, query") - os.Exit(1) - } -} - -func setupLogger(levelStr string) *logrus.Logger { - level, err := logrus.ParseLevel(levelStr) - if err != nil { - logrus.Fatalf("Invalid log level: %v", err) - } - logger := logrus.New() - logger.SetLevel(level) - return logger -} - -func setupDatabase(logger *logrus.Logger, dbpath string) *sql.DB { - if dbpath == "" { - for _, path := range DefaultDBPaths { - if _, err := os.Stat(path); err == nil { - dbpath = path - break - } - } - if dbpath == "" { - logger.Fatal("No database path provided and no default database found.") - } - } - logger.Infof("Using database: %s", dbpath) - - db, err := sql.Open("sqlite3", dbpath) - if err != nil { - logger.Fatalf("Failed to open database: %v", err) - } - return db -} - -func runServer(v2stat *V2Stat) { - logger := v2stat.logger - - if err := v2stat.InitDB(); err != nil { - logger.Fatalf("Failed to initialize database: %v", err) - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) - defer signal.Stop(sigCh) - - ticker := time.NewTicker(time.Duration(*flagInterval) * time.Second) - defer ticker.Stop() - - for { - logger.Info("Recording stats...") - if err := v2stat.RecordNow(ctx); err != nil { - logger.Errorf("Failed to record stats: %v", err) - } - - select { - case <-ticker.C: - continue - case <-sigCh: - logger.Info("Received shutdown signal, exiting.") - return - case <-ctx.Done(): - logger.Info("Context canceled, exiting.") - return - } - } -} - -func handleQuery(v2stat *V2Stat, args []string) { - if len(args) == 0 { - fmt.Println("Usage: v2stat query ") - fmt.Println("Available connections:") - conns, err := v2stat.QueryConn() - if err != nil { - v2stat.logger.Fatalf("Failed to query connection: %v", err) - } - for _, c := range conns { - fmt.Printf("\t%s\n", c.String()) - } - return - } - - connStr := args[0] - conn, ok := ParseConnInfo(connStr) - if !ok { - v2stat.logger.Fatalf("Invalid connection format: %s", connStr) - } - - stats, err := v2stat.QueryStatsHourly(&conn) - if err != nil { - v2stat.logger.Fatalf("Failed to query stats: %v", err) - } - - printStatsTable(stats) -} - -func printStatsTable(stats []TrafficStat) { - tb := table.NewWriter() - tb.SetOutputMirror(os.Stdout) - tb.AppendHeader(table.Row{"Time", "Downlink", "Uplink"}) - - var totalDown, totalUp int64 - - for _, stat := range stats { - totalDown += stat.Downlink - totalUp += stat.Uplink - tb.AppendRow(table.Row{stat.Time, sizeToHuman(stat.Downlink), sizeToHuman(stat.Uplink)}) - } - - tb.AppendFooter(table.Row{"Total", sizeToHuman(totalDown), sizeToHuman(totalUp)}) - style := table.StyleLight - style.Format.Footer = text.FormatDefault - tb.SetStyle(style) - tb.Render() -} - -func sizeToHuman(size int64) string { - units := []string{"B", "KiB", "MiB", "GiB", "TiB"} - value := float64(size) - for _, unit := range units { - if value < 1024 { - return fmt.Sprintf("%7.2f %s", value, unit) - } - value /= 1024 - } - return fmt.Sprintf("%7.2f %s", value, "PiB") -} \ No newline at end of file diff --git a/v2stat.go b/v2stat.go new file mode 100644 index 0000000..49b37a8 --- /dev/null +++ b/v2stat.go @@ -0,0 +1,234 @@ +package v2stat + +import ( + "context" + "database/sql" + "strings" + "time" + + "github.com/sirupsen/logrus" + "go.rikki.moe/v2stat/command" + "google.golang.org/grpc" +) + +type V2Stat struct { + logger *logrus.Logger + db *sql.DB + conn *grpc.ClientConn + stat command.StatsServiceClient +} + +func NewV2Stat(logger *logrus.Logger, db *sql.DB, conn *grpc.ClientConn) *V2Stat { + return &V2Stat{ + logger: logger, + db: db, + conn: conn, + stat: command.NewStatsServiceClient(conn), + } +} + +func (v *V2Stat) Close() { + if v.db != nil { + v.db.Close() + } + if v.conn != nil { + v.conn.Close() + } +} + +func (v *V2Stat) SetConn(conn *grpc.ClientConn) { + if v.conn != nil { + v.conn.Close() + } + v.conn = conn + v.stat = command.NewStatsServiceClient(conn) +} + + +func (v *V2Stat) InitDB() error { + stmts := []string{ + `CREATE TABLE IF NOT EXISTS conn ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + type INTEGER NOT NULL, + name TEXT NOT NULL, + UNIQUE (type, name) + );`, + `CREATE TABLE IF NOT EXISTS stats ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + conn_id INTEGER NOT NULL, + timestamp INTEGER NOT NULL, + traffic INTEGER NOT NULL, + direction INTEGER NOT NULL, + FOREIGN KEY (conn_id) REFERENCES conn (id) + ON DELETE CASCADE + ON UPDATE CASCADE + );`, + `CREATE INDEX IF NOT EXISTS idx_conn_id ON stats (conn_id);`, + `CREATE INDEX IF NOT EXISTS idx_timestamp ON stats (timestamp);`, + } + + for _, stmt := range stmts { + if _, err := v.db.Exec(stmt); err != nil { + return err + } + } + return nil +} + +func (v *V2Stat) RecordNow(ctx context.Context) error { + resp, err := v.stat.QueryStats(ctx, &command.QueryStatsRequest{ + Reset_: true, + }) + if err != nil { + v.logger.Errorf("Failed to query stats: %v", err) + return err + } + + tx, err := v.db.Begin() + if err != nil { + v.logger.Errorf("Failed to begin transaction: %v", err) + return err + } + defer tx.Rollback() // safe to call even if already committed + + insertConnStmt, err := tx.Prepare(`INSERT OR IGNORE INTO conn (type, name) VALUES (?, ?)`) + if err != nil { + v.logger.Errorf("Failed to prepare conn insert statement: %v", err) + return err + } + defer insertConnStmt.Close() + + selectConnIDStmt, err := tx.Prepare(`SELECT id FROM conn WHERE type = ? AND name = ?`) + if err != nil { + v.logger.Errorf("Failed to prepare conn select statement: %v", err) + return err + } + defer selectConnIDStmt.Close() + + insertStatsStmt, err := tx.Prepare(` + INSERT INTO stats (conn_id, timestamp, traffic, direction) + VALUES (?, ?, ?, ?) + `) + if err != nil { + v.logger.Errorf("Failed to prepare stats insert statement: %v", err) + return err + } + defer insertStatsStmt.Close() + + for _, stat := range resp.Stat { + connType, connName, direction, ok := parseStatKey(stat.Name) + if !ok { + v.logger.Warnf("Skipping unrecognized stat key: %s", stat.Name) + continue + } + + if _, err := insertConnStmt.Exec(connType, connName); err != nil { + v.logger.Errorf("Failed to insert conn: %v", err) + continue + } + + var connID int + err = selectConnIDStmt.QueryRow(connType, connName).Scan(&connID) + if err != nil { + v.logger.Errorf("Failed to retrieve conn_id: %v", err) + continue + } + + timeNow := time.Now().Unix() + if _, err := insertStatsStmt.Exec(connID, timeNow, stat.Value, direction); err != nil { + v.logger.Errorf("Failed to insert stats: %v", err) + continue + } + + v.logger.Infof("Inserted stats: conn_id=%d, timestamp=%d, traffic=%d, direction=%d", connID, timeNow, stat.Value, direction) + } + + if err := tx.Commit(); err != nil { + v.logger.Errorf("Failed to commit transaction: %v", err) + return err + } + return nil +} + +func parseStatKey(key string) (connType ConnectionType, connName string, direction TrafficDirection, ok bool) { + parts := strings.Split(key, ">>>") + if len(parts) != 4 || parts[2] != "traffic" { + return 0, "", 0, false + } + + switch parts[0] { + case "user": + connType = ConnTypeUser + case "inbound": + connType = ConnTypeInbound + case "outbound": + connType = ConnTypeOutbound + default: + return 0, "", 0, false + } + + connName = parts[1] + + switch parts[3] { + case "downlink": + direction = DirectionDownlink + case "uplink": + direction = DirectionUplink + default: + return 0, "", 0, false + } + + return connType, connName, direction, true +} + +func (v *V2Stat) QueryConn() ([]ConnInfo, error) { + rows, err := v.db.Query("SELECT type, name FROM conn") + if err != nil { + return nil, err + } + defer rows.Close() + + var conns []ConnInfo + for rows.Next() { + var conn ConnInfo + if err := rows.Scan(&conn.Type, &conn.Name); err != nil { + return nil, err + } + conns = append(conns, conn) + } + return conns, nil +} + +func (v *V2Stat) QueryStatsHourly(conn *ConnInfo) ([]TrafficStat, error) { + rows, err := v.db.Query(` + SELECT + strftime('%Y-%m-%d %H:00:00', datetime(s.timestamp, 'unixepoch', '+8 hours')) AS time, + SUM(CASE WHEN s.direction = 0 THEN s.traffic ELSE 0 END) AS downlink, + SUM(CASE WHEN s.direction = 1 THEN s.traffic ELSE 0 END) AS uplink + FROM stats s + JOIN conn c ON s.conn_id = c.id + WHERE c.type = ? AND c.name = ? + GROUP BY time + ORDER BY time; +`, conn.Type, conn.Name) + if err != nil { + return nil, err + } + defer rows.Close() + + var stats []TrafficStat + for rows.Next() { + var s TrafficStat + if err := rows.Scan(&s.Time, &s.Downlink, &s.Uplink); err != nil { + panic(err) + } + stats = append(stats, s) + } + + if err := rows.Err(); err != nil { + return nil, err + } + return stats, nil +} + + -- cgit v1.2.3