summaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorRikki <i@rikki.moe>2025-05-12 15:50:39 +0800
committerRikki <i@rikki.moe>2025-05-12 15:50:39 +0800
commitce8dbb7686bd1d9729b0396d7557db8126fe5cae (patch)
treed57872feae02e8b4e2c798580b2b015414b7f419 /cmd
parent01d8c1c87f545eb1c14221f7a3e5820fc5055496 (diff)
use influx db insteadHEADmaster
Diffstat (limited to 'cmd')
-rw-r--r--cmd/v2stat/daemon.go45
-rw-r--r--cmd/v2stat/main.go192
2 files changed, 73 insertions, 164 deletions
diff --git a/cmd/v2stat/daemon.go b/cmd/v2stat/daemon.go
deleted file mode 100644
index 8c5e91f..0000000
--- a/cmd/v2stat/daemon.go
+++ /dev/null
@@ -1,45 +0,0 @@
-package main
-
-import (
- "context"
- "os"
- "os/signal"
- "syscall"
- "time"
-
- "go.rikki.moe/v2stat"
-)
-
-func runDaemon(v2s *v2stat.V2Stat) {
- if err := v2s.InitDB(); err != nil {
- logger.Fatalf("Failed to initialize database: %v", err)
- }
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- sigCh := make(chan os.Signal, 1)
- signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
- defer signal.Stop(sigCh)
-
- ticker := time.NewTicker(time.Duration(*flagInterval) * time.Second)
- defer ticker.Stop()
-
- for {
- logger.Info("Recording stats...")
- if err := v2s.RecordNow(ctx); err != nil {
- logger.Errorf("Failed to record stats: %v", err)
- }
-
- select {
- case <-ticker.C:
- continue
- case <-sigCh:
- logger.Info("Received shutdown signal, exiting.")
- return
- case <-ctx.Done():
- logger.Info("Context canceled, exiting.")
- return
- }
- }
-}
diff --git a/cmd/v2stat/main.go b/cmd/v2stat/main.go
index ce0c3ed..962f54c 100644
--- a/cmd/v2stat/main.go
+++ b/cmd/v2stat/main.go
@@ -1,159 +1,113 @@
package main
import (
- "database/sql"
+ "context"
"flag"
- "fmt"
"os"
+ "os/signal"
+ "syscall"
+ "time"
- "github.com/jedib0t/go-pretty/table"
- "github.com/jedib0t/go-pretty/text"
- _ "github.com/mattn/go-sqlite3"
+ "github.com/influxdata/influxdb-client-go/v2"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
- "go.rikki.moe/v2stat"
+ "go.rikki.moe/v2stat/command"
)
var (
- flagInterval = flag.Int("interval", 300, "Interval in seconds to record stats")
- flagDatabase = flag.String("db", "", "Path to SQLite database")
- flagServer = flag.String("server", "127.0.0.1:8080", "V2Ray API server address")
- flagLogLevel = flag.String("log-level", "info", "Log level (debug, info, warn, error, fatal, panic)")
+ flagServerName = flag.String("name", "", "Name of the server")
+ flagInterval = flag.Int("interval", 300, "Interval in seconds to record stats")
+ flagInflux = flag.String("influx", "", "URL to InfluxDB database")
+ flagInfluxToken = flag.String("token", "", "InfluxDB token")
+ flagOrg = flag.String("org", "", "InfluxDB organization")
+ flagBucket = flag.String("bucket", "", "InfluxDB bucket")
+ flagServer = flag.String("server", "127.0.0.1:8080", "V2Ray API server address")
+ flagLogLevel = flag.String("log-level", "info", "Log level (debug, info, warn, error, fatal, panic)")
)
-var DefaultDBPaths = []string{
- "v2stat.db",
- "traffic.db",
- "/var/lib/v2stat/traffic.db",
- "/usr/local/share/v2stat/traffic.db",
- "/opt/apps/v2stat/traffic.db",
-}
-
var logger *logrus.Logger
func main() {
flag.Parse()
- args := flag.Args()
-
- if len(args) < 1 {
- fmt.Println("Usage: v2stat <command> [args]")
- fmt.Println("Available commands: daemon, query")
- os.Exit(1)
- }
logger = setupLogger(*flagLogLevel)
- db := setupDatabase(logger, *flagDatabase)
-
- v2s := v2stat.NewV2Stat(logger, db, nil)
- defer v2s.Close()
- switch args[0] {
- case "daemon":
- conn, err := grpc.NewClient(*flagServer, grpc.WithInsecure())
+ // Use hostname as default server name if not provided
+ servername := *flagServerName
+ if servername == "" {
+ hostname, err := os.Hostname()
if err != nil {
- logger.Fatalf("Failed to dial gRPC server: %v", err)
+ logger.Fatalf("Failed to get hostname: %v", err)
}
- v2s.SetConn(conn)
- runDaemon(v2s)
-
- case "query":
- handleQuery(v2s, args[1:])
-
- default:
- fmt.Println("Unknown command:", args[0])
- fmt.Println("Available commands: daemon, query")
- os.Exit(1)
+ servername = hostname
}
-}
+ logger.Infof("Using server name: %s", servername)
-func setupLogger(levelStr string) *logrus.Logger {
- level, err := logrus.ParseLevel(levelStr)
+ // Set up gRPC connection to V2Ray API server
+ conn, err := grpc.NewClient(*flagServer, grpc.WithInsecure())
if err != nil {
- logrus.Fatalf("Invalid log level: %v", err)
+ logger.Fatalf("Failed to connect to V2Ray API server: %v", err)
}
- logger := logrus.New()
- logger.SetLevel(level)
- return logger
-}
-
-func setupDatabase(logger *logrus.Logger, dbpath string) *sql.DB {
- if dbpath == "" {
- for _, path := range DefaultDBPaths {
- if _, err := os.Stat(path); err == nil {
- dbpath = path
- break
- }
- }
- if dbpath == "" {
- logger.Fatal("No database path provided and no default database found.")
- }
+ defer conn.Close()
+ client := command.NewStatsServiceClient(conn)
+ if client == nil {
+ logger.Fatalf("Failed to create V2Ray API client")
}
- logger.Infof("Using database: %s", dbpath)
- db, err := sql.Open("sqlite3", dbpath)
- if err != nil {
- logger.Fatalf("Failed to open database: %v", err)
- }
- return db
-}
+ // Set up InfluxDB client
+ influxClient := influxdb2.NewClient(*flagInflux, *flagInfluxToken)
+ defer influxClient.Close()
-func handleQuery(v2s *v2stat.V2Stat, args []string) {
- if len(args) == 0 {
- fmt.Println("Usage: v2stat query <connection_name>")
- fmt.Println("Available connections:")
- conns, err := v2s.QueryConn()
- if err != nil {
- logger.Fatalf("Failed to query connection: %v", err)
- }
- for _, c := range conns {
- fmt.Printf("\t%s\n", c.String())
- }
- return
- }
+ // Create a new point batch
+ bp := influxClient.WriteAPIBlocking(*flagOrg, *flagBucket)
- connStr := args[0]
- conn, ok := v2stat.ParseConnInfo(connStr)
- if !ok {
- logger.Fatalf("Invalid connection format: %s", connStr)
- }
+ ticker := time.NewTicker(time.Duration(*flagInterval) * time.Second)
+ defer ticker.Stop()
+ killsig := make(chan os.Signal, 1)
+ signal.Notify(killsig, syscall.SIGINT, syscall.SIGTERM)
- stats, err := v2s.QueryStatsHourly(&conn)
- if err != nil {
- logger.Fatalf("Failed to query stats: %v", err)
- }
-
- printStatsTable(stats)
-}
-
-func printStatsTable(stats []v2stat.TrafficStat) {
- tb := table.NewWriter()
- tb.SetOutputMirror(os.Stdout)
- tb.AppendHeader(table.Row{"Time", "Downlink", "Uplink"})
+ for {
+ now := time.Now()
+ stats, err := client.QueryStats(context.Background(), &command.QueryStatsRequest{
+ Reset_: true,
+ })
+ if err != nil {
+ logger.Errorf("Failed to get stats: %v", err)
+ goto LOOP_FINAL
+ }
- var totalDown, totalUp int64
+ // Write stats to InfluxDB
+ for _, stat := range stats.Stat {
+ point := influxdb2.NewPoint(
+ "v2ray_stats",
+ map[string]string{"server": servername, "stat": stat.Name},
+ map[string]interface{}{"value": stat.Value},
+ now,
+ )
+ if err := bp.WritePoint(context.Background(), point); err != nil {
+ logger.Errorf("Failed to write point to InfluxDB: %v", err)
+ }
+ }
- for _, stat := range stats {
- totalDown += stat.Downlink
- totalUp += stat.Uplink
- tb.AppendRow(table.Row{stat.Time, sizeToHuman(stat.Downlink), sizeToHuman(stat.Uplink)})
+LOOP_FINAL:
+ select {
+ case <-ticker.C:
+ continue
+ case sig := <-killsig:
+ logger.Infof("Received signal: %s", sig)
+ return
+ }
}
- tb.AppendFooter(table.Row{"Total", sizeToHuman(totalDown), sizeToHuman(totalUp)})
- style := table.StyleLight
- style.Format.Footer = text.FormatDefault
- tb.SetStyle(style)
- tb.Render()
}
-func sizeToHuman(size int64) string {
- units := []string{"B", "KiB", "MiB", "GiB", "TiB"}
- value := float64(size)
- for _, unit := range units {
- if value < 1024 {
- return fmt.Sprintf("%7.2f %s", value, unit)
- }
- value /= 1024
+func setupLogger(levelStr string) *logrus.Logger {
+ level, err := logrus.ParseLevel(levelStr)
+ if err != nil {
+ logrus.Fatalf("Invalid log level: %v", err)
}
- return fmt.Sprintf("%7.2f %s", value, "PiB")
+ logger := logrus.New()
+ logger.SetLevel(level)
+ return logger
}