Files
bluesky-nats-proxy/main.go
2025-03-09 23:50:28 -04:00

139 lines
4.1 KiB
Go

package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"log/slog"
"os"
"os/signal"
"runtime"
"time"
_ "github.com/bluesky-social/indigo/api/bsky"
"github.com/bluesky-social/jetstream/pkg/client"
"github.com/bluesky-social/jetstream/pkg/client/schedulers/parallel"
"github.com/bluesky-social/jetstream/pkg/models"
"github.com/nats-io/nats.go"
)
// printHelp outputs the usage information.
func printHelp() {
fmt.Println("Usage: HTTP-to-NATS proxy server")
fmt.Println("\nThe following environment variables are supported:")
fmt.Println(" NATS_URL - NATS connection URL (default: nats://127.0.0.1:4222)")
fmt.Println(" NATS_USER - NATS username for authentication (optional)")
fmt.Println(" NATS_PASSWORD - NATS password for authentication (optional)")
fmt.Println(" NATS_TOKEN - NATS token for authentication (optional)")
fmt.Println(" NATS_NKEY - NATS NKEY for authentication (optional)")
fmt.Println(" NATS_NKEY_SEED - NATS NKEY seed for authentication (optional)")
fmt.Println(" NATS_CREDS_FILE - Path to NATS credentials file (optional)")
}
func main() {
helpFlag := flag.Bool("help", false, "Display help information about available environment variables")
flag.Parse()
if *helpFlag {
printHelp()
os.Exit(0)
}
// Read NATS connection info from environment variables
natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
natsURL = nats.DefaultURL // defaults to "nats://127.0.0.1:4222"
}
natsUser := os.Getenv("NATS_USER")
natsPassword := os.Getenv("NATS_PASSWORD")
natsToken := os.Getenv("NATS_TOKEN")
natsNkey := os.Getenv("NATS_NKEY")
natsNkeySeed := os.Getenv("NATS_NKEY_SEED")
natsCredsFile := os.Getenv("NATS_CREDS_FILE")
// Set up NATS connection options
opts := []nats.Option{nats.Name("Web proxy")}
if natsUser != "" && natsPassword != "" {
opts = append(opts, nats.UserInfo(natsUser, natsPassword))
} else if natsToken != "" {
opts = append(opts, nats.Token(natsToken))
} else if natsNkey != "" && natsNkeySeed != "" {
log.Fatalln("NKEY connection not supported")
} else if natsCredsFile != "" {
opts = append(opts, nats.UserCredentials(natsCredsFile))
}
opts = append(opts, nats.Name("Bluesky Proxy"))
nc, err := nats.Connect(natsURL, opts...)
if err != nil {
log.Fatal("Error connecting to NATS:", err)
}
defer nc.Close()
log.Println("Connected to NATS")
// Create Jetstream client with proper configuration
config := &client.ClientConfig{
WebsocketURL: "wss://jetstream1.us-east.bsky.network/subscribe",
}
logger := slog.Default()
scheduler := parallel.NewScheduler(runtime.NumCPU(), "nats-proxy", logger, func(ctx context.Context, e *models.Event) error {
if e != nil && e.Commit != nil && e.Commit.Collection != "" {
// Get the collection from the commit
collection := e.Commit.Collection
// Determine the subject based on the collection
subject := fmt.Sprintf("bluesky.%s", collection)
// Convert the event to JSON
jsonData, err := json.Marshal(e)
if err != nil {
log.Printf("Error marshaling event: %v", err)
return err
}
// Publish the event to NATS
err = nc.Publish(subject, jsonData)
if err != nil {
log.Printf("Error publishing to NATS: %v", err)
return err
}
}
return nil
})
log.Println("Created config, logger, scheduler")
go func() {
cursor := time.Now().UnixMicro()
totalServers := 2 // See: https://github.com/bluesky-social/jetstream?tab=readme-ov-file#public-instances
for i := 0; true; i++ {
jc, err := client.NewClient(config, logger, scheduler)
if err != nil {
log.Fatalf("Error creating Jetstream client: %v", err)
}
if err := jc.ConnectAndRead(context.Background(), &cursor); err != nil {
log.Printf("failed to connect: %v\n", err)
}
time.Sleep(1 * time.Second)
// alternate between available jetstream servers
config.WebsocketURL = fmt.Sprintf("wss://jetstream%d.us-east.bsky.network/subscribe", i%totalServers+1)
log.Printf("connecting to %s instead\n", config.WebsocketURL)
}
}()
// Wait for interrupt signal to gracefully shut down
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
<-interrupt
log.Println("Shutting down...")
}