diff options
| author | Rikki <i@rikki.moe> | 2025-05-12 15:50:39 +0800 |
|---|---|---|
| committer | Rikki <i@rikki.moe> | 2025-05-12 15:50:39 +0800 |
| commit | ce8dbb7686bd1d9729b0396d7557db8126fe5cae (patch) | |
| tree | d57872feae02e8b4e2c798580b2b015414b7f419 /cmd/v2stat | |
| parent | 01d8c1c87f545eb1c14221f7a3e5820fc5055496 (diff) | |
Diffstat (limited to 'cmd/v2stat')
| -rw-r--r-- | cmd/v2stat/daemon.go | 45 | ||||
| -rw-r--r-- | cmd/v2stat/main.go | 192 |
2 files changed, 73 insertions, 164 deletions
diff --git a/cmd/v2stat/daemon.go b/cmd/v2stat/daemon.go deleted file mode 100644 index 8c5e91f..0000000 --- a/cmd/v2stat/daemon.go +++ /dev/null @@ -1,45 +0,0 @@ -package main - -import ( - "context" - "os" - "os/signal" - "syscall" - "time" - - "go.rikki.moe/v2stat" -) - -func runDaemon(v2s *v2stat.V2Stat) { - if err := v2s.InitDB(); err != nil { - logger.Fatalf("Failed to initialize database: %v", err) - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) - defer signal.Stop(sigCh) - - ticker := time.NewTicker(time.Duration(*flagInterval) * time.Second) - defer ticker.Stop() - - for { - logger.Info("Recording stats...") - if err := v2s.RecordNow(ctx); err != nil { - logger.Errorf("Failed to record stats: %v", err) - } - - select { - case <-ticker.C: - continue - case <-sigCh: - logger.Info("Received shutdown signal, exiting.") - return - case <-ctx.Done(): - logger.Info("Context canceled, exiting.") - return - } - } -} diff --git a/cmd/v2stat/main.go b/cmd/v2stat/main.go index ce0c3ed..962f54c 100644 --- a/cmd/v2stat/main.go +++ b/cmd/v2stat/main.go @@ -1,159 +1,113 @@ package main import ( - "database/sql" + "context" "flag" - "fmt" "os" + "os/signal" + "syscall" + "time" - "github.com/jedib0t/go-pretty/table" - "github.com/jedib0t/go-pretty/text" - _ "github.com/mattn/go-sqlite3" + "github.com/influxdata/influxdb-client-go/v2" "github.com/sirupsen/logrus" "google.golang.org/grpc" - "go.rikki.moe/v2stat" + "go.rikki.moe/v2stat/command" ) var ( - flagInterval = flag.Int("interval", 300, "Interval in seconds to record stats") - flagDatabase = flag.String("db", "", "Path to SQLite database") - flagServer = flag.String("server", "127.0.0.1:8080", "V2Ray API server address") - flagLogLevel = flag.String("log-level", "info", "Log level (debug, info, warn, error, fatal, panic)") + flagServerName = flag.String("name", "", "Name of the server") + flagInterval = flag.Int("interval", 300, "Interval in seconds to record stats") + flagInflux = flag.String("influx", "", "URL to InfluxDB database") + flagInfluxToken = flag.String("token", "", "InfluxDB token") + flagOrg = flag.String("org", "", "InfluxDB organization") + flagBucket = flag.String("bucket", "", "InfluxDB bucket") + flagServer = flag.String("server", "127.0.0.1:8080", "V2Ray API server address") + flagLogLevel = flag.String("log-level", "info", "Log level (debug, info, warn, error, fatal, panic)") ) -var DefaultDBPaths = []string{ - "v2stat.db", - "traffic.db", - "/var/lib/v2stat/traffic.db", - "/usr/local/share/v2stat/traffic.db", - "/opt/apps/v2stat/traffic.db", -} - var logger *logrus.Logger func main() { flag.Parse() - args := flag.Args() - - if len(args) < 1 { - fmt.Println("Usage: v2stat <command> [args]") - fmt.Println("Available commands: daemon, query") - os.Exit(1) - } logger = setupLogger(*flagLogLevel) - db := setupDatabase(logger, *flagDatabase) - - v2s := v2stat.NewV2Stat(logger, db, nil) - defer v2s.Close() - switch args[0] { - case "daemon": - conn, err := grpc.NewClient(*flagServer, grpc.WithInsecure()) + // Use hostname as default server name if not provided + servername := *flagServerName + if servername == "" { + hostname, err := os.Hostname() if err != nil { - logger.Fatalf("Failed to dial gRPC server: %v", err) + logger.Fatalf("Failed to get hostname: %v", err) } - v2s.SetConn(conn) - runDaemon(v2s) - - case "query": - handleQuery(v2s, args[1:]) - - default: - fmt.Println("Unknown command:", args[0]) - fmt.Println("Available commands: daemon, query") - os.Exit(1) + servername = hostname } -} + logger.Infof("Using server name: %s", servername) -func setupLogger(levelStr string) *logrus.Logger { - level, err := logrus.ParseLevel(levelStr) + // Set up gRPC connection to V2Ray API server + conn, err := grpc.NewClient(*flagServer, grpc.WithInsecure()) if err != nil { - logrus.Fatalf("Invalid log level: %v", err) + logger.Fatalf("Failed to connect to V2Ray API server: %v", err) } - logger := logrus.New() - logger.SetLevel(level) - return logger -} - -func setupDatabase(logger *logrus.Logger, dbpath string) *sql.DB { - if dbpath == "" { - for _, path := range DefaultDBPaths { - if _, err := os.Stat(path); err == nil { - dbpath = path - break - } - } - if dbpath == "" { - logger.Fatal("No database path provided and no default database found.") - } + defer conn.Close() + client := command.NewStatsServiceClient(conn) + if client == nil { + logger.Fatalf("Failed to create V2Ray API client") } - logger.Infof("Using database: %s", dbpath) - db, err := sql.Open("sqlite3", dbpath) - if err != nil { - logger.Fatalf("Failed to open database: %v", err) - } - return db -} + // Set up InfluxDB client + influxClient := influxdb2.NewClient(*flagInflux, *flagInfluxToken) + defer influxClient.Close() -func handleQuery(v2s *v2stat.V2Stat, args []string) { - if len(args) == 0 { - fmt.Println("Usage: v2stat query <connection_name>") - fmt.Println("Available connections:") - conns, err := v2s.QueryConn() - if err != nil { - logger.Fatalf("Failed to query connection: %v", err) - } - for _, c := range conns { - fmt.Printf("\t%s\n", c.String()) - } - return - } + // Create a new point batch + bp := influxClient.WriteAPIBlocking(*flagOrg, *flagBucket) - connStr := args[0] - conn, ok := v2stat.ParseConnInfo(connStr) - if !ok { - logger.Fatalf("Invalid connection format: %s", connStr) - } + ticker := time.NewTicker(time.Duration(*flagInterval) * time.Second) + defer ticker.Stop() + killsig := make(chan os.Signal, 1) + signal.Notify(killsig, syscall.SIGINT, syscall.SIGTERM) - stats, err := v2s.QueryStatsHourly(&conn) - if err != nil { - logger.Fatalf("Failed to query stats: %v", err) - } - - printStatsTable(stats) -} - -func printStatsTable(stats []v2stat.TrafficStat) { - tb := table.NewWriter() - tb.SetOutputMirror(os.Stdout) - tb.AppendHeader(table.Row{"Time", "Downlink", "Uplink"}) + for { + now := time.Now() + stats, err := client.QueryStats(context.Background(), &command.QueryStatsRequest{ + Reset_: true, + }) + if err != nil { + logger.Errorf("Failed to get stats: %v", err) + goto LOOP_FINAL + } - var totalDown, totalUp int64 + // Write stats to InfluxDB + for _, stat := range stats.Stat { + point := influxdb2.NewPoint( + "v2ray_stats", + map[string]string{"server": servername, "stat": stat.Name}, + map[string]interface{}{"value": stat.Value}, + now, + ) + if err := bp.WritePoint(context.Background(), point); err != nil { + logger.Errorf("Failed to write point to InfluxDB: %v", err) + } + } - for _, stat := range stats { - totalDown += stat.Downlink - totalUp += stat.Uplink - tb.AppendRow(table.Row{stat.Time, sizeToHuman(stat.Downlink), sizeToHuman(stat.Uplink)}) +LOOP_FINAL: + select { + case <-ticker.C: + continue + case sig := <-killsig: + logger.Infof("Received signal: %s", sig) + return + } } - tb.AppendFooter(table.Row{"Total", sizeToHuman(totalDown), sizeToHuman(totalUp)}) - style := table.StyleLight - style.Format.Footer = text.FormatDefault - tb.SetStyle(style) - tb.Render() } -func sizeToHuman(size int64) string { - units := []string{"B", "KiB", "MiB", "GiB", "TiB"} - value := float64(size) - for _, unit := range units { - if value < 1024 { - return fmt.Sprintf("%7.2f %s", value, unit) - } - value /= 1024 +func setupLogger(levelStr string) *logrus.Logger { + level, err := logrus.ParseLevel(levelStr) + if err != nil { + logrus.Fatalf("Invalid log level: %v", err) } - return fmt.Sprintf("%7.2f %s", value, "PiB") + logger := logrus.New() + logger.SetLevel(level) + return logger } |
