diff options
| author | Rikki <i@rikki.moe> | 2025-04-13 20:47:02 +0800 |
|---|---|---|
| committer | Rikki <i@rikki.moe> | 2025-04-13 20:47:02 +0800 |
| commit | fb886134a635a632f128a48e891631de566b6baa (patch) | |
| tree | 403e44f980b0df2521f20a61e4db271311c406ba | |
| parent | 5f71e854fa722f79f031ab12248f15240d515caa (diff) | |
support query
| -rw-r--r-- | db.go | 108 | ||||
| -rw-r--r-- | go.mod | 10 | ||||
| -rw-r--r-- | go.sum | 18 | ||||
| -rw-r--r-- | main.go | 285 |
4 files changed, 323 insertions, 98 deletions
@@ -8,22 +8,68 @@ import ( "go.rikki.moe/v2stat/command" ) +type TrafficDirection int + const ( - DirectionDownlink = iota + DirectionDownlink TrafficDirection = iota DirectionUplink ) +type ConnectionType int + const ( - ConnTypeUser = iota + ConnTypeUser ConnectionType = iota ConnTypeInbound ConnTypeOutbound ) type ConnInfo struct { - Type int `json:"type"` - Name string `json:"name"` + Type ConnectionType `json:"type"` + Name string `json:"name"` +} + +type TrafficStat struct { + Time string `json:"time"` + Downlink int64 `json:"downlink"` + Uplink int64 `json:"uplink"` +} + +func (ci *ConnInfo) String() string { + switch ci.Type { + case ConnTypeUser: + return "user:" + ci.Name + case ConnTypeInbound: + return "inbound:" + ci.Name + case ConnTypeOutbound: + return "outbound:" + ci.Name + default: + return "unknown:" + ci.Name + } +} + +func ParseConnInfo(s string) (ConnInfo, bool) { + parts := strings.Split(s, ":") + if len(parts) != 2 { + return ConnInfo{}, false + } + var connType ConnectionType + switch parts[0] { + case "user": + connType = ConnTypeUser + case "inbound": + connType = ConnTypeInbound + case "outbound": + connType = ConnTypeOutbound + default: + return ConnInfo{}, false + } + return ConnInfo{ + Type: connType, + Name: parts[1], + }, true } + func (v *V2Stat) InitDB() error { stmts := []string{ `CREATE TABLE IF NOT EXISTS conn ( @@ -129,7 +175,7 @@ func (v *V2Stat) RecordNow(ctx context.Context) error { return nil } -func parseStatKey(key string) (connType int, connName string, direction int, ok bool) { +func parseStatKey(key string) (connType ConnectionType, connName string, direction TrafficDirection, ok bool) { parts := strings.Split(key, ">>>") if len(parts) != 4 || parts[2] != "traffic" { return 0, "", 0, false @@ -158,4 +204,54 @@ func parseStatKey(key string) (connType int, connName string, direction int, ok } return connType, connName, direction, true -}
\ No newline at end of file +} + +func (v *V2Stat) QueryConn() ([]ConnInfo, error) { + rows, err := v.db.Query("SELECT type, name FROM conn") + if err != nil { + return nil, err + } + defer rows.Close() + + var conns []ConnInfo + for rows.Next() { + var conn ConnInfo + if err := rows.Scan(&conn.Type, &conn.Name); err != nil { + return nil, err + } + conns = append(conns, conn) + } + return conns, nil +} + +func (v *V2Stat) QueryStatsHourly(conn *ConnInfo) ([]TrafficStat, error) { + rows, err := v.db.Query(` + SELECT + strftime('%Y-%m-%d %H:00:00', datetime(s.timestamp, 'unixepoch', '+8 hours')) AS time, + SUM(CASE WHEN s.direction = 0 THEN s.traffic ELSE 0 END) AS downlink, + SUM(CASE WHEN s.direction = 1 THEN s.traffic ELSE 0 END) AS uplink + FROM stats s + JOIN conn c ON s.conn_id = c.id + WHERE c.type = ? AND c.name = ? + GROUP BY time + ORDER BY time; +`, conn.Type, conn.Name) + if err != nil { + return nil, err + } + defer rows.Close() + + var stats []TrafficStat + for rows.Next() { + var s TrafficStat + if err := rows.Scan(&s.Time, &s.Downlink, &s.Uplink); err != nil { + panic(err) + } + stats = append(stats, s) + } + + if err := rows.Err(); err != nil { + return nil, err + } + return stats, nil +} @@ -7,21 +7,29 @@ require google.golang.org/grpc v1.71.1 require ( github.com/adrg/xdg v0.5.3 // indirect github.com/andybalholm/brotli v1.1.0 // indirect + github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect + github.com/go-openapi/errors v0.22.0 // indirect + github.com/go-openapi/strfmt v0.23.0 // indirect github.com/gofiber/fiber/v2 v2.52.6 // indirect github.com/gofiber/template v1.8.3 // indirect github.com/gofiber/template/html/v2 v2.1.3 // indirect github.com/gofiber/utils v1.1.0 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/jedib0t/go-pretty v4.3.0+incompatible // indirect + github.com/jedib0t/go-pretty/v6 v6.6.7 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect github.com/mattn/go-sqlite3 v1.14.27 // indirect - github.com/rivo/uniseg v0.2.0 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/oklog/ulid v1.3.1 // indirect + github.com/rivo/uniseg v0.4.7 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.51.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect + go.mongodb.org/mongo-driver v1.14.0 // indirect ) require ( @@ -2,8 +2,14 @@ github.com/adrg/xdg v0.5.3 h1:xRnxJXne7+oWDatRhR1JLnvuccuIeCoBu2rtuLqQB78= github.com/adrg/xdg v0.5.3/go.mod h1:nlTsY+NNiCBGCK2tpm09vRqfVzrc2fLmXGpBLF0zlTQ= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so= +github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-openapi/errors v0.22.0 h1:c4xY/OLxUBSTiepAg3j/MHuAv5mJhnf53LLMWFB+u/w= +github.com/go-openapi/errors v0.22.0/go.mod h1:J3DmZScxCDufmIMsdOuDHxJbdOGC0xtUynjIx092vXE= +github.com/go-openapi/strfmt v0.23.0 h1:nlUS6BCqcnAk0pyhi9Y+kdDVZdZMHfEKQiS4HaMgO/c= +github.com/go-openapi/strfmt v0.23.0/go.mod h1:NrtIpfKtWIygRkKVsxh7XQMDQW5HKQl6S5ik2elW+K4= github.com/gofiber/fiber/v2 v2.52.6 h1:Rfp+ILPiYSvvVuIPvxrBns+HJp8qGLDnLJawAu27XVI= github.com/gofiber/fiber/v2 v2.52.6/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw= github.com/gofiber/template v1.8.3 h1:hzHdvMwMo/T2kouz2pPCA0zGiLCeMnoGsQZBTSYgZxc= @@ -16,6 +22,10 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jedib0t/go-pretty v4.3.0+incompatible h1:CGs8AVhEKg/n9YbUenWmNStRW2PHJzaeDodcfvRAbIo= +github.com/jedib0t/go-pretty v4.3.0+incompatible/go.mod h1:XemHduiw8R651AF9Pt4FwCTKeG3oo7hrHJAoznj9nag= +github.com/jedib0t/go-pretty/v6 v6.6.7 h1:m+LbHpm0aIAPLzLbMfn8dc3Ht8MW7lsSO4MPItz/Uuo= +github.com/jedib0t/go-pretty/v6 v6.6.7/go.mod h1:YwC5CE4fJ1HFUDeivSV1r//AmANFHyqczZk+U6BDALU= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= @@ -27,9 +37,15 @@ github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6T github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-sqlite3 v1.14.27 h1:drZCnuvf37yPfs95E5jd9s3XhdVWLal+6BOK6qrv6IU= github.com/mattn/go-sqlite3 v1.14.27/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -42,6 +58,8 @@ github.com/valyala/fasthttp v1.51.0 h1:8b30A5JlZ6C7AS81RsWjYMQmrZG6feChmgAolCl1S github.com/valyala/fasthttp v1.51.0/go.mod h1:oI2XroL+lI7vdXyYoQk03bXBThfFl2cVdIA3Xl7cH8g= github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= +go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80= +go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c= golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= @@ -1,104 +1,207 @@ package main import ( - "context" - "database/sql" - "flag" - "os" - "os/signal" - "time" - - _ "github.com/mattn/go-sqlite3" - "github.com/sirupsen/logrus" - "google.golang.org/grpc" - - "go.rikki.moe/v2stat/command" + "context" + "database/sql" + "flag" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "github.com/jedib0t/go-pretty/table" + "github.com/jedib0t/go-pretty/text" + _ "github.com/mattn/go-sqlite3" + "github.com/sirupsen/logrus" + "google.golang.org/grpc" + + "go.rikki.moe/v2stat/command" ) var ( - flagInterval = flag.Int("interval", 300, "Interval in seconds to record stats") - flagDatabase = flag.String("db", "v2stat.db", "Path to SQLite database") - flagServer = flag.String("server", "127.0.0.1:8080", "V2Ray API server address") - flagLogLevel = flag.String("log-level", "info", "Log level (debug, info, warn, error, fatal, panic)") + flagInterval = flag.Int("interval", 300, "Interval in seconds to record stats") + flagDatabase = flag.String("db", "", "Path to SQLite database") + flagServer = flag.String("server", "127.0.0.1:8080", "V2Ray API server address") + flagLogLevel = flag.String("log-level", "info", "Log level (debug, info, warn, error, fatal, panic)") ) -// V2Stat holds references to the logger, database connection, and gRPC client. +var DefaultDBPaths = []string{ + "v2stat.db", + "traffic.db", + "/var/lib/v2stat/traffic.db", + "/usr/local/share/v2stat/traffic.db", + "/opt/apps/v2stat/traffic.db", +} + type V2Stat struct { - logger *logrus.Logger - db *sql.DB - stat command.StatsServiceClient + logger *logrus.Logger + db *sql.DB + stat command.StatsServiceClient } func main() { - flag.Parse() - - // Initialize logger - level, err := logrus.ParseLevel(*flagLogLevel) - if err != nil { - logrus.Fatalf("Invalid log level: %v", err) - } - logger := logrus.New() - logger.SetLevel(level) - - // Dial gRPC server - conn, err := grpc.NewClient(*flagServer, grpc.WithInsecure()) - if err != nil { - logger.Fatalf("Failed to dial gRPC server: %v", err) - } - defer conn.Close() - - statClient := command.NewStatsServiceClient(conn) - - // Open SQLite database - db, err := sql.Open("sqlite3", *flagDatabase) - if err != nil { - logger.Fatalf("Failed to open database: %v", err) - } - defer db.Close() - - // Create main struct - v2stat := &V2Stat{ - logger: logger, - db: db, - stat: statClient, - } - - // Initialize database schema - if err := v2stat.InitDB(); err != nil { - logger.Fatalf("Failed to initialize database: %v", err) - } - - // For graceful shutdown, create a context that cancels on SIGINT/SIGTERM - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, os.Interrupt, os.Kill) - - // Optional: Query stats once with a reset (as in your original code) - if _, err := v2stat.stat.QueryStats(ctx, &command.QueryStatsRequest{Reset_: true}); err != nil { - logger.Errorf("Failed to query stats: %v", err) - } - - ticker := time.NewTicker(time.Duration(*flagInterval) * time.Second) - defer ticker.Stop() - // Main loop - for { - logger.Info("Recording stats...") - if err := v2stat.RecordNow(ctx); err != nil { - logger.Errorf("Failed to record stats: %v", err) - } - - // Wait for next ticker or shutdown signal - select { - case <-ticker.C: - // just continue the loop and record again - case <-sigCh: - logger.Info("Received shutdown signal, exiting.") - return - case <-ctx.Done(): - logger.Info("Context canceled, exiting.") - return - } - } + flag.Parse() + args := flag.Args() + + if len(args) < 1 { + fmt.Println("Usage: v2stat <command> [args]") + fmt.Println("Available commands: daemon, query") + os.Exit(1) + } + + logger := setupLogger(*flagLogLevel) + db := setupDatabase(logger, *flagDatabase) + defer db.Close() + + v2stat := &V2Stat{ + logger: logger, + db: db, + } + + switch args[0] { + case "daemon": + conn, err := grpc.NewClient(*flagServer, grpc.WithInsecure()) + if err != nil { + logger.Fatalf("Failed to dial gRPC server: %v", err) + } + defer conn.Close() + v2stat.stat = command.NewStatsServiceClient(conn) + runServer(v2stat) + + case "query": + handleQuery(v2stat, args[1:]) + + default: + fmt.Println("Unknown command:", args[0]) + fmt.Println("Available commands: daemon, query") + os.Exit(1) + } +} + +func setupLogger(levelStr string) *logrus.Logger { + level, err := logrus.ParseLevel(levelStr) + if err != nil { + logrus.Fatalf("Invalid log level: %v", err) + } + logger := logrus.New() + logger.SetLevel(level) + return logger +} + +func setupDatabase(logger *logrus.Logger, dbpath string) *sql.DB { + if dbpath == "" { + for _, path := range DefaultDBPaths { + if _, err := os.Stat(path); err == nil { + dbpath = path + break + } + } + if dbpath == "" { + logger.Fatal("No database path provided and no default database found.") + } + } + logger.Infof("Using database: %s", dbpath) + + db, err := sql.Open("sqlite3", dbpath) + if err != nil { + logger.Fatalf("Failed to open database: %v", err) + } + return db +} + +func runServer(v2stat *V2Stat) { + logger := v2stat.logger + + if err := v2stat.InitDB(); err != nil { + logger.Fatalf("Failed to initialize database: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + defer signal.Stop(sigCh) + + ticker := time.NewTicker(time.Duration(*flagInterval) * time.Second) + defer ticker.Stop() + + for { + logger.Info("Recording stats...") + if err := v2stat.RecordNow(ctx); err != nil { + logger.Errorf("Failed to record stats: %v", err) + } + + select { + case <-ticker.C: + continue + case <-sigCh: + logger.Info("Received shutdown signal, exiting.") + return + case <-ctx.Done(): + logger.Info("Context canceled, exiting.") + return + } + } } + +func handleQuery(v2stat *V2Stat, args []string) { + if len(args) == 0 { + fmt.Println("Usage: v2stat query <connection_name>") + fmt.Println("Available connections:") + conns, err := v2stat.QueryConn() + if err != nil { + v2stat.logger.Fatalf("Failed to query connection: %v", err) + } + for _, c := range conns { + fmt.Printf("\t%s\n", c.String()) + } + return + } + + connStr := args[0] + conn, ok := ParseConnInfo(connStr) + if !ok { + v2stat.logger.Fatalf("Invalid connection format: %s", connStr) + } + + stats, err := v2stat.QueryStatsHourly(&conn) + if err != nil { + v2stat.logger.Fatalf("Failed to query stats: %v", err) + } + + printStatsTable(stats) +} + +func printStatsTable(stats []TrafficStat) { + tb := table.NewWriter() + tb.SetOutputMirror(os.Stdout) + tb.AppendHeader(table.Row{"Time", "Downlink", "Uplink"}) + + var totalDown, totalUp int64 + + for _, stat := range stats { + totalDown += stat.Downlink + totalUp += stat.Uplink + tb.AppendRow(table.Row{stat.Time, sizeToHuman(stat.Downlink), sizeToHuman(stat.Uplink)}) + } + + tb.AppendFooter(table.Row{"Total", sizeToHuman(totalDown), sizeToHuman(totalUp)}) + style := table.StyleLight + style.Format.Footer = text.FormatDefault + tb.SetStyle(style) + tb.Render() +} + +func sizeToHuman(size int64) string { + units := []string{"B", "KiB", "MiB", "GiB", "TiB"} + value := float64(size) + for _, unit := range units { + if value < 1024 { + return fmt.Sprintf("%7.2f %s", value, unit) + } + value /= 1024 + } + return fmt.Sprintf("%7.2f %s", value, "PiB") +}
\ No newline at end of file |
