Command Bus
> Type-safe command dispatch with middleware, self-handling commands, and async queue delivery.
The bus package implements a typed command bus. Commands are plain
structs; handlers are generic functions; middleware composes through
the pipeline package. Commands can be dispatched synchronously or
pushed to the queue.
Import path: github.com/velocitykode/velocity/bus
Commands and handlers
A command is any struct. A handler is a function with a single typed argument:
type CreateInvoice struct {
UserID int
Amount int
}
func handleCreateInvoice(cmd CreateInvoice) error {
// ...
return nil
}Register the handler against the command type:
b := bus.New()
bus.Register(b, handleCreateInvoice)Register is a package-level function (not a method) because Go
generics don’t allow type parameters on methods.
Dispatching
err := b.Dispatch(CreateInvoice{UserID: 1, Amount: 5000})Dispatch looks up the registered handler, runs any middleware, and
invokes the handler. If no handler is registered it returns an error.
Self-handling commands
A command can carry its own handler by implementing SelfHandling:
type SendReminder struct { UserID int }
func (c SendReminder) Handle() error {
// send the reminder
return nil
}
// No Register() call needed — the bus falls back to the Handle method.
b.Dispatch(SendReminder{UserID: 42})Middleware
Middleware is a pipeline.Stage[Command] — use bus.Middleware to
build one from a function:
logging := bus.Middleware(func(cmd bus.Command, next func(bus.Command) error) error {
start := time.Now()
err := next(cmd)
log.Info("command", "type", fmt.Sprintf("%T", cmd), "dur", time.Since(start), "err", err)
return err
})
b.Through(logging, anotherMiddleware)Stages run in order for each dispatch, wrapping the handler in a pipeline.
bus.LoggingMiddleware(logger) is a ready-to-use logger stage.
Async dispatch
Push commands to the queue to run later in a worker:
b.SetQueue(v.Queue) // anything implementing QueuePusher
b.SetQueueName("notifications") // optional — otherwise default queue
b.DispatchAsync(SendReminder{UserID: 42})The command is wrapped as a queue job; when the worker picks it up it
calls Dispatch internally so the same handler + middleware chain
runs.
Events
Every dispatch emits lifecycle events:
*bus.CommandDispatching— before the handler runs*bus.CommandCompleted— on success*bus.CommandFailed— on handler error*bus.CommandQueued— onDispatchAsync
Wire the dispatcher:
b.SetEventDispatcher(v.Events.Dispatch)Testing
FakeBus records dispatched commands without running handlers — handy
for isolating handler tests:
fake := bus.NewFakeBus()
svc := NewOrderService(fake)
svc.Checkout(ctx, order)
if err := fake.AssertDispatched(SendReceipt{}); err != nil {
t.Error(err)
}Assertions available on *FakeBus:
AssertDispatched(cmd)— at least one command of that type was dispatchedAssertDispatchedTimes(cmd, n)— exactly n timesAssertNotDispatched(cmd)— zero timesAssertNothingDispatched()— no commands at allAssertAsyncDispatched(cmd)— dispatched viaDispatchAsyncGetDispatched()/ClearDispatched()— inspect or reset the log