WebSockets
> Real-time bidirectional communication — typed message handlers, groups, broadcasts, and per-client metadata.
The websocket package provides a typed WebSocket server with
message routing, group membership for fan-out, per-client metadata,
authentication hooks, and built-in rate limiting.
Import path: github.com/velocitykode/velocity/websocket
Quick start
import (
"log"

"github.com/velocitykode/velocity/websocket"
)
cfg := websocket.DefaultConfig()
cfg.Host = "0.0.0.0"
cfg.Port = 6001
cfg.Path = "/ws"
srv := websocket.New(cfg)
srv.OnConnect(func(c *websocket.Client) {
c.SendJSON("welcome", map[string]any{
"client_id": c.ID,
"message": "connected",
})
})
srv.On("chat", func(c *websocket.Client, msg websocket.Message) error {
return srv.Broadcast(websocket.Message{
Type: "chat",
From: c.ID,
Data: msg.Data,
})
})
if err := srv.Start(); err != nil {
log.Fatal(err)
}
Start launches the internal dispatcher goroutine. To accept
connections, mount srv.HandleConnection on an HTTP route; see
Mounting on an HTTP route below.
Configuration
DefaultConfig() provides reasonable defaults. Override what you
need:
cfg := websocket.Config{
Host: "0.0.0.0",
Port: 6001,
Path: "/ws",
AllowedOrigins: []string{"https://example.com"},
MaxConnections: 10000,
ReadBufferSize: 1024,
WriteBufferSize: 1024,
MaxMessageSize: 512 * 1024, // 512KB
PingInterval: 30 * time.Second,
PongTimeout: 60 * time.Second,
WriteTimeout: 10 * time.Second,
MessageRateLimit: 100, // msgs/sec per client (0 = unlimited)
MessageBurstSize: 200,
AuthFunc: func(r *http.Request) error {
if r.URL.Query().Get("token") == "" {
return errors.New("missing token")
}
return nil
},
}
AuthFunc runs before the WebSocket upgrade. Return a non-nil
error to reject the handshake.
Mounting on an HTTP route
HandleConnection upgrades the connection and registers the client.
Wire it into your Velocity routes:
v.Routes(func(r *velocity.Routing) {
r.Web(func(web router.Router) {
web.Get("/ws", func(c *router.Context) error {
srv.HandleConnection(c.Response, c.Request)
return nil
})
})
})
Or attach to a stdlib mux:
http.HandleFunc("/ws", srv.HandleConnection)
Messages
A message is a JSON envelope:
type Message struct {
Type string `json:"type"` // routing key
Data any `json:"data"` // payload
Target string `json:"target,omitempty"` // client ID for direct sends
From string `json:"from,omitempty"` // sender client ID
}
Routing incoming messages
Register handlers per message type:
srv.On("chat", func(c *websocket.Client, msg websocket.Message) error {
// Comma-ok assertions keep a malformed payload from panicking the handler.
data, _ := msg.Data.(map[string]any)
text, _ := data["text"].(string)
return srv.Broadcast(websocket.Message{
Type: "chat",
From: c.ID,
Data: map[string]any{"text": text},
})
})
srv.On("join", func(c *websocket.Client, msg websocket.Message) error {
// Comma-ok assertions keep a malformed payload from panicking the handler.
data, _ := msg.Data.(map[string]any)
room, _ := data["room"].(string)
return srv.JoinGroup(c.ID, room)
})
Unknown message types are silently dropped, so register a handler for every type you expect to receive.
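Because Data is typed any, a handler cannot assume the payload has the shape it expects, and a single-value type assertion panics on a mismatch. The helper below is a minimal sketch of defensive field access. stringField is a hypothetical name, not part of the websocket package, and the sketch assumes payloads are decoded with encoding/json (objects become map[string]any).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stringField is a hypothetical helper: it returns the string stored under
// key when data is a JSON object, and false for any other shape.
func stringField(data any, key string) (string, bool) {
	obj, ok := data.(map[string]any) // comma-ok form never panics
	if !ok {
		return "", false
	}
	s, ok := obj[key].(string)
	return s, ok
}

func main() {
	var good, bad any
	// encoding/json decodes objects into map[string]any and strings into string.
	if err := json.Unmarshal([]byte(`{"room":"lobby"}`), &good); err != nil {
		panic(err)
	}
	if err := json.Unmarshal([]byte(`"lobby"`), &bad); err != nil {
		panic(err)
	}

	room, ok := stringField(good, "room")
	fmt.Println(room, ok) // lobby true

	_, ok = stringField(bad, "room")
	fmt.Println(ok) // false
}
```

A handler can then return an error (or send an error message back) when the second return value is false instead of acting on an empty string.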
Sending to one client
client.SendJSON("notification", map[string]any{
"title": "New message",
"body": "Alice sent you a DM",
})
// Or build the Message yourself:
client.SendMessage(websocket.Message{
Type: "ping",
Data: time.Now().Unix(),
})
SendJSON is non-blocking: it queues the message on the client’s
send channel.
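The non-blocking behaviour described above is commonly built from a buffered channel and a select with a default case. The following is a sketch of that general Go pattern, not the package's actual code; trySend and errQueueFull are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueFull is a hypothetical sentinel for a full send queue.
var errQueueFull = errors.New("send queue full")

// trySend enqueues msg without blocking. When the buffer is full it returns
// errQueueFull immediately instead of waiting for the consumer.
func trySend(queue chan<- []byte, msg []byte) error {
	select {
	case queue <- msg:
		return nil
	default:
		return errQueueFull
	}
}

func main() {
	queue := make(chan []byte, 2) // bounded queue with room for two messages

	fmt.Println(trySend(queue, []byte("a"))) // <nil>
	fmt.Println(trySend(queue, []byte("b"))) // <nil>
	fmt.Println(trySend(queue, []byte("c"))) // send queue full
}
```

The caller decides what a full queue means; the design notes in this document say the server treats it as a slow consumer and drops the client.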
Broadcast to everyone
srv.Broadcast(websocket.Message{
Type: "announce",
Data: "Server restart in 5 minutes",
})
Send to a specific client by ID
if err := srv.SendToClient("client-42", msg); err != nil {
// client not found
}
Groups (rooms)
Groups are server-side bags of clients. Join, leave, and broadcast into them.
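One way to picture a group is as a named set of client IDs guarded by a lock. The registry below is an illustrative sketch under that assumption; the type and method names are hypothetical and it is not how the package is necessarily implemented.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// groups is a hypothetical registry mapping group name -> set of client IDs.
type groups struct {
	mu      sync.RWMutex
	members map[string]map[string]struct{}
}

func newGroups() *groups {
	return &groups{members: make(map[string]map[string]struct{})}
}

// join adds clientID to the named group, creating the group on first use.
func (g *groups) join(clientID, name string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.members[name] == nil {
		g.members[name] = make(map[string]struct{})
	}
	g.members[name][clientID] = struct{}{}
}

// leave removes clientID and deletes the group once it is empty.
func (g *groups) leave(clientID, name string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	delete(g.members[name], clientID) // deleting from a nil map is a no-op
	if len(g.members[name]) == 0 {
		delete(g.members, name)
	}
}

// memberIDs returns a sorted copy of the IDs in the named group.
func (g *groups) memberIDs(name string) []string {
	g.mu.RLock()
	defer g.mu.RUnlock()
	ids := make([]string, 0, len(g.members[name]))
	for id := range g.members[name] {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}

func main() {
	g := newGroups()
	g.join("client-2", "room-1")
	g.join("client-1", "room-1")
	g.leave("client-2", "room-1")
	fmt.Println(g.memberIDs("room-1")) // [client-1]
}
```

Returning a copy from memberIDs lets callers iterate without holding the lock, at the cost of one allocation per call.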
Membership
srv.JoinGroup(client.ID, "room-1")
srv.LeaveGroup(client.ID, "room-1")
srv.LeaveAllGroups(client.ID) // typically called on disconnect
if client.IsInGroup("room-1") {
// ...
}
Broadcasting
// Send to every client in the group
srv.BroadcastToGroup("room-1", websocket.Message{
Type: "chat",
Data: map[string]any{"text": "Hello room"},
})
// Same effect as BroadcastToGroup, under a more explicit name
srv.SendToGroup("room-1", msg)
// Skip the sender
srv.SendToOthersInGroup("room-1", senderID, msg)
Inspecting groups
clients := srv.GetGroupMembers("room-1") // []*Client
ids := srv.GetGroupMemberIDs("room-1")
groups := srv.GetGroups() // all group names
n := srv.GetGroupCount() // number of groups
empty := srv.IsGroupEmpty("room-1")
Lifecycle callbacks
srv.OnConnect(func(c *websocket.Client) {
log.Info("connected", "id", c.ID)
})
srv.OnDisconnect(func(c *websocket.Client) {
srv.LeaveAllGroups(c.ID)
log.Info("disconnected", "id", c.ID)
})
srv.OnError(func(c *websocket.Client, err error) {
log.Error("client error", "id", c.ID, "err", err)
})
Per-client metadata
Stash request-scoped data on the client itself:
srv.OnConnect(func(c *websocket.Client) {
user, err := authenticateFromQuery(c.Conn)
if err != nil {
c.Close()
return
}
c.SetMetadata("user_id", user.ID)
c.SetMetadata("plan", user.Plan)
})
srv.On("admin:purge", func(c *websocket.Client, msg websocket.Message) error {
plan, ok := c.GetMetadata("plan")
if !ok || plan != "admin" {
return c.SendJSON("error", map[string]any{"reason": "forbidden"})
}
return runPurge()
})
Middleware
Wrap message handlers with cross-cutting concerns:
logging := func(next websocket.MessageHandler) websocket.MessageHandler {
return func(c *websocket.Client, msg websocket.Message) error {
log.Info("ws.recv", "client", c.ID, "type", msg.Type)
err := next(c, msg)
if err != nil {
log.Warn("ws.handler.err", "client", c.ID, "err", err)
}
return err
}
}
srv.Use(logging)
Middleware runs in registration order around every dispatched message.
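To make "registration order" concrete, here is a small sketch of how a chain of wrapping functions can be composed so that the first one registered runs outermost. That ordering is an assumption about what the package does; handler, middleware, chain, and tag are local stand-ins with simplified signatures, not the package's types.

```go
package main

import "fmt"

// handler and middleware are simplified stand-ins: the real MessageHandler
// takes a client and a message rather than a string.
type handler func(msg string) error
type middleware func(next handler) handler

// chain wraps final so that the first middleware in mws runs outermost.
func chain(final handler, mws ...middleware) handler {
	for i := len(mws) - 1; i >= 0; i-- {
		final = mws[i](final)
	}
	return final
}

// tag returns a middleware that records when it is entered and left.
func tag(name string, trace *[]string) middleware {
	return func(next handler) handler {
		return func(msg string) error {
			*trace = append(*trace, name+":before")
			err := next(msg)
			*trace = append(*trace, name+":after")
			return err
		}
	}
}

func main() {
	var trace []string
	h := chain(func(msg string) error {
		trace = append(trace, "handler")
		return nil
	}, tag("first", &trace), tag("second", &trace))

	_ = h("hello")
	fmt.Println(trace)
	// [first:before second:before handler second:after first:after]
}
```

Under this ordering, a logging middleware registered first sees every message before any later middleware can reject it.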
Authentication
Two layers:
- Pre-upgrade: Config.AuthFunc(*http.Request) error. Reject the WebSocket handshake before the connection is established. Read tokens from headers, query strings, or cookies.
- Post-upgrade: inside OnConnect or a message handler. Stash the user via SetMetadata; gate handlers by reading the metadata.
cfg.AuthFunc = func(r *http.Request) error {
token := r.Header.Get("Authorization")
if !validateToken(token) {
return errors.New("invalid token")
}
return nil
}
Pre-upgrade auth is preferred: rejected requests never consume a client slot.
Rate limiting
Built into the config — no external code needed:
cfg.MessageRateLimit = 50 // msgs/sec
cfg.MessageBurstSize = 100
When a client exceeds the burst, the server closes their connection.
Set MessageRateLimit = 0 to disable.
Stats
stats := srv.GetStats()
log.Info("ws stats",
"clients", stats.ConnectedClients,
"sent", stats.MessagesSent,
"recv", stats.MessagesReceived,
"bytes_in", stats.BytesReceived,
"bytes_out", stats.BytesSent,
)
Read-only snapshot. Useful for /health endpoints and Prometheus
exporters.
Inspecting clients
client, ok := srv.GetClient("client-42")
if ok {
client.SendJSON("ping", nil)
}
all := srv.GetClients() // map[string]*Client — copy of the live set
log.Info("connected", "n", len(all))
Frontend (browser) example
const ws = new WebSocket('wss://example.com/ws?token=' + token);
ws.onopen = () => {
ws.send(JSON.stringify({ type: 'join', data: { room: 'lobby' } }));
};
ws.onmessage = (ev) => {
const msg = JSON.parse(ev.data);
switch (msg.type) {
case 'welcome':
console.log('connected as', msg.data.client_id);
break;
case 'chat':
renderChat(msg.from, msg.data.text);
break;
}
};
function send(type, data) {
ws.send(JSON.stringify({ type, data }));
}
The wire shape matches the Go Message struct directly: type,
data, and optional target and from.
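To see the wire shape concretely, the snippet below round-trips a local copy of the Message struct through encoding/json. The struct tags are copied from the Messages section; encode is a hypothetical helper added for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is a local copy of the struct shown in the Messages section.
type Message struct {
	Type   string `json:"type"`
	Data   any    `json:"data"`
	Target string `json:"target,omitempty"`
	From   string `json:"from,omitempty"`
}

// encode is a hypothetical helper that renders a Message as its JSON envelope.
func encode(m Message) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

func main() {
	out, err := encode(Message{Type: "chat", From: "client-42", Data: map[string]any{"text": "hi"}})
	if err != nil {
		panic(err)
	}
	// Target is empty, so omitempty leaves it out of the envelope.
	fmt.Println(out) // {"type":"chat","data":{"text":"hi"},"from":"client-42"}

	var in Message
	if err := json.Unmarshal([]byte(`{"type":"join","data":{"room":"lobby"}}`), &in); err != nil {
		panic(err)
	}
	fmt.Println(in.Type, in.Data) // join map[room:lobby]
}
```

Note that data has no omitempty tag, so a message without a payload is sent with "data":null rather than with the key missing.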
Testing
Spin up a server on port 0 (OS-assigned) and connect with the gorilla-websocket dialer:
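import (
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

gws "github.com/gorilla/websocket"
"github.com/velocitykode/velocity/websocket"
)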
func TestEcho(t *testing.T) {
cfg := websocket.DefaultConfig()
cfg.Port = 0
cfg.Path = "/ws"
srv := websocket.New(cfg)
received := make(chan websocket.Message, 1)
srv.On("echo", func(c *websocket.Client, m websocket.Message) error {
received <- m
return c.SendMessage(m)
})
if err := srv.Start(); err != nil {
t.Fatal(err)
}
defer srv.Stop()
ts := httptest.NewServer(http.HandlerFunc(srv.HandleConnection))
defer ts.Close()
url := "ws" + strings.TrimPrefix(ts.URL, "http") + "/ws"
conn, _, err := gws.DefaultDialer.Dial(url, nil)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
if err := conn.WriteJSON(websocket.Message{Type: "echo", Data: "hi"}); err != nil {
t.Fatal(err)
}
select {
case got := <-received:
if got.Type != "echo" || got.Data != "hi" {
t.Errorf("got %+v", got)
}
case <-time.After(time.Second):
t.Fatal("timeout")
}
}
Design notes
- Groups, not rooms. Every set-of-clients abstraction is a group. Use any naming convention you like (room:lobby, tenant:42, private-user.42); they’re just strings.
- One dispatcher goroutine. The server runs a single goroutine fanning out broadcast/register/unregister channel events. Per-client read and write pumps run independently.
- Channel-based send. SendJSON and SendMessage enqueue on the client’s bounded send channel. They’re safe to call from any goroutine and don’t block on slow consumers (full channels drop the client).
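The dispatcher design in these notes resembles the well-known hub pattern for Go WebSocket servers. The following is a stripped-down sketch of that pattern, not the package's source: hub, client, and their fields are hypothetical, and plain strings stand in for messages.

```go
package main

import "fmt"

// client is a hypothetical connection with a bounded outgoing queue.
type client struct {
	id   string
	send chan string
}

// hub is a stripped-down dispatcher: one goroutine owns the client set,
// so no mutex is needed.
type hub struct {
	register   chan *client
	unregister chan *client
	broadcast  chan string
	clients    map[*client]struct{}
}

func newHub() *hub {
	return &hub{
		register:   make(chan *client),
		unregister: make(chan *client),
		broadcast:  make(chan string),
		clients:    make(map[*client]struct{}),
	}
}

// run is the single dispatcher loop. It exits when done is closed.
func (h *hub) run(done <-chan struct{}) {
	for {
		select {
		case c := <-h.register:
			h.clients[c] = struct{}{}
		case c := <-h.unregister:
			h.drop(c)
		case msg := <-h.broadcast:
			for c := range h.clients {
				select {
				case c.send <- msg:
				default: // queue full: treat the client as too slow and drop it
					h.drop(c)
				}
			}
		case <-done:
			return
		}
	}
}

// drop removes c and closes its queue; it is a no-op for unknown clients.
func (h *hub) drop(c *client) {
	if _, ok := h.clients[c]; ok {
		delete(h.clients, c)
		close(c.send)
	}
}

func main() {
	h := newHub()
	done := make(chan struct{})
	go h.run(done)
	defer close(done)

	fast := &client{id: "fast", send: make(chan string, 2)}
	slow := &client{id: "slow", send: make(chan string, 1)}
	h.register <- fast
	h.register <- slow

	h.broadcast <- "one"
	h.broadcast <- "two" // slow's queue is already full, so it is dropped
	h.unregister <- fast // accepted only after "two" has been fully dispatched

	fmt.Println(<-fast.send, <-fast.send) // one two
	msg, open := <-slow.send
	fmt.Println(msg, open) // one true
	_, open = <-slow.send
	fmt.Println(open) // false
}
```

Because the channels are unbuffered, each send to the hub returns only once the loop has finished the previous event, which is what makes the ordering in main predictable.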