Events
> Build event-driven applications with Velocity's observer pattern for decoupled, extensible architecture.
Velocity provides a powerful event system that allows you to decouple various parts of your application using the observer pattern. Events enable clean, maintainable code by separating concerns and making your application more extensible.
Every callback in the events package takes context.Context as its first
positional argument. Migrate your code as follows:
Handle(event) -> Handle(ctx, event)
Dispatch(event) -> Dispatch(ctx, event)
Created(model) -> Created(ctx, model)
SetEventDispatcher(fn func(any) error) ->
SetEventDispatcher(fn func(context.Context, any) error)The same shape applies to DispatchNow, DispatchAsync, DispatchAfter,
Until, every ModelObserver lifecycle method, MiddlewareFunc,
HandleWithResult, HandleWithPropagation, AsyncDispatcher.Push,
TransactionalDispatcher.Commit / DispatchAfterCommit,
ObserverRegistry.Fire / FireModelEvent, ObservableModel.FireEvent,
and QueueDispatcher.Push. Subsystems that implement
contract.EventDispatcherAware plumb their own ctx into the bridge instead
of dropping to context.Background().
Quick Start
import "github.com/velocitykode/velocity/events"
// Define an event
type UserRegistered struct {
UserID int
Email string
}
func (e UserRegistered) Name() string {
return "user.registered"
}
// Dispatch the event from a handler. ctx.Events() returns the
// app-wide events.Dispatcher wired up at boot. Pass the request ctx
// so listeners observe deadlines, trace IDs, and tx scopes.
func (h *AuthHandler) Register(ctx *router.Context) error {
user := createUser(ctx.FormValue("email"), ctx.FormValue("password"))
return ctx.Events().Dispatch(ctx.Request.Context(), UserRegistered{
UserID: user.ID,
Email: user.Email,
})
}// Define a listener. Handle now takes ctx as the first argument.
type SendWelcomeEmail struct{}
func (l *SendWelcomeEmail) Handle(ctx context.Context, event interface{}) error {
e := event.(UserRegistered)
// Send welcome email; honour ctx.Done() if the sink supports it.
return sendEmail(ctx, e.Email, "Welcome to our platform!")
}
func (l *SendWelcomeEmail) ShouldQueue() bool {
return true // Process asynchronously
}
// Register the listener through the App.Events hook.
func (a *App) Events(d events.Dispatcher) {
d.Listen("user.registered", &SendWelcomeEmail{})
}// Create a logger that handles all user events.
type UserActivityLogger struct{}
func (l *UserActivityLogger) Handle(ctx context.Context, event interface{}) error {
log.InfoContext(ctx, "User event occurred", "event", event)
return nil
}
func (l *UserActivityLogger) ShouldQueue() bool {
return false
}
// Listen to wildcards via the App.Events hook
func (a *App) Events(d events.Dispatcher) {
// All user events
d.Listen("user.*", &UserActivityLogger{})
// All "created" events
d.Listen("*.created", &AuditLogger{})
// All events
d.Listen("*", &GlobalLogger{})
}// Dispatch events asynchronously from a handler
func (h *OrderHandler) Place(ctx *router.Context, order *Order) error {
rctx := ctx.Request.Context()
// Save order synchronously
if err := h.db.Create(rctx, order); err != nil {
return err
}
// Dispatch asynchronously (queue or panic-safe goroutine fallback).
// Async paths derive ctx via context.WithoutCancel: trace IDs flow
// through to listeners but cancellation/deadline are stripped.
if err := ctx.Events().DispatchAsync(rctx, OrderPlaced{
OrderID: order.ID,
Total: order.Total,
}); err != nil {
return err
}
// Or dispatch after a delay
return ctx.Events().DispatchAfter(
rctx,
OrderFollowUp{OrderID: order.ID},
24*time.Hour,
)
}Async ctx semantics
Async paths – Dispatcher.DispatchAsync, Dispatcher.DispatchAfter,
AsyncDispatcher.Push, and the BatchingDispatcher /
DebouncingDispatcher / CoalescingDispatcher.Dispatch wrappers – derive
the listener ctx from the caller via context.WithoutCancel. That means:
- Request-scoped values (trace IDs, tenant IDs, anything stored via
context.WithValue) propagate through to listeners. - The caller’s cancellation and deadline are stripped before dispatch, because the listener may legitimately outlive the caller (a 30s delayed listener kicked off from an HTTP handler keeps running long after the response flushed).
If you need cancellation to reach the listener, do not use the async
methods. Use Dispatch synchronously and let the listener spawn its own
goroutine that observes ctx.Done(), or push to a queue with the
cancellation contract you want.
The synchronous Dispatch / DispatchNow / Until paths pass ctx through
unchanged.
Transactional dispatch buffer
Domain events that announce state changes (OrderPlaced, UserRegistered, InvoicePaid) should fire if-and-only-if the database transaction that produced the change actually commits. Dispatching them inside the transaction risks listeners reacting to writes that get rolled back; dispatching them after Transaction returns leaks transaction boundaries into every caller. The events package and orm.Manager.Transaction cooperate to give you commit-only semantics with no callback signature changes.
How it fits together
Three pieces wire up the buffer:
events.PrepareBuffer(ctx)attaches a mutable holder toctx. The returnedctxis the one you must thread into the transaction.orm.Manager.Transaction(ctx, fn)installs a per-transaction*events.BufferedDispatcherinto that holder for the lifetime of the call. Anyevents.Buffer(ctx)lookup insidefn(or any descendant call that received the samectx) finds the buffer.- On successful
tx.Commit()the buffer is flushed; onfnreturning an error, the deferred panic recovery, or a commit failure, the buffer is dropped and no events fire.
Manager.SetTxEventBus(bus) wires the kind-aware sink the buffer drains into. The framework calls it during boot so DispatchAsync, DispatchAfter, and Until recorded inside the transaction route back through the matching ctx-aware method on the underlying dispatcher when the buffer flushes, so ShouldQueue, the recorded delay, and the until-first-non-nil contract all survive the buffer boundary. The flush uses the parent transaction ctx so trace IDs, deadlines, and request-scoped values propagate all the way through to listeners. Without a tx event bus, the buffer falls back to the legacy untyped sink set by SetEventDispatcher (now itself ctx-aware) and every kind collapses onto Dispatch.
Recipe: emit OrderPlaced only on tx commit
func (h *OrderHandler) Place(ctx *router.Context) error {
// Prepare the buffer holder on the request ctx ONCE, before
// entering the transaction. The returned ctx must be the one
// passed to Transaction.
reqCtx := events.PrepareBuffer(ctx.Request.Context())
return h.db.Transaction(reqCtx, func(tx *sql.Tx) error {
order, err := insertOrder(tx, ctx.FormValue("sku"))
if err != nil {
return err // buffer dropped, OrderPlaced never fires
}
// Recorded into the per-tx buffer; fires on commit. The ctx
// arg is captured at flush time, not at record time.
return events.Buffer(reqCtx).Dispatch(reqCtx, OrderPlaced{
OrderID: order.ID,
Total: order.Total,
})
})
}If insertOrder returns an error, Transaction rolls back and calls Drop on the buffer; OrderPlaced never reaches a listener. If the commit itself fails, the buffer is dropped for the same reason. Listeners on order.placed only ever observe orders that actually exist in the database.
Nested transactions and savepoints
A nested Transaction call on the same prepared ctx reuses the outermost buffer with savepoint semantics. The inner scope captures a baseline at the parent’s current event count, so an inner rollback truncates only events emitted past that checkpoint while the outer scope retains everything recorded before the nesting. Inner Flush is a no-op, so forwarding stays owned by the outermost commit and the entire transaction tree fires (or doesn’t) atomically.
Partial failure and retry
FlushFunc receives a BufferedEvent per recorded entry: Event() returns the user payload, Kind() returns the DispatchKind (KindDispatch, KindDispatchNow, KindDispatchAsync, KindDispatchAfter, KindUntil), and Delay() returns the requested delay for KindDispatchAfter (zero otherwise). If the flush callback returns an error for entry N, the failing entry plus every entry after it are swapped back into the buffer and the buffer is left in a non-flushed state. Calling Flush again resumes from the failed event; entries 0..N-1 are not redelivered. Re-entrant Flush invocations from inside a FlushFunc are silent no-ops so the outer drain retains control.
events.Buffer(ctx) returns a fresh standalone buffer when ctx carries no holder, and events recorded on a standalone buffer are silently discarded on flush (no underlying dispatcher is wired). Always thread the ctx returned by PrepareBuffer through to the call site that records events. A derived ctx without the holder will buffer into the void.DispatchAfterCommit
TransactionalDispatcher.DispatchAfterCommit(ctx, event) is the explicit
form for “fire this only if the current tx commits.”
- Inside a tx (between
BeginTransactionandCommit/Rollback) the event is queued and fires onCommit(ctx). - Outside a tx the event is dispatched immediately and any error from the underlying dispatcher is returned to the caller.
Both branches now return error. Previously the non-tx branch swallowed
dispatcher failures, which made the contract silently weaker outside a tx
than inside one. Wrap with _ = dispatcher.DispatchAfterCommit(ctx, evt)
if you genuinely want fire-and-forget semantics.
Event Definition
Using Structs
Define events as structs that implement the Event interface:
type UserRegistered struct {
UserID int
Email string
Timestamp time.Time
}
func (e UserRegistered) Name() string {
return "user.registered"
}
type OrderPlaced struct {
OrderID string
UserID int
Total float64
Items []OrderItem
}
func (e OrderPlaced) Name() string {
return "order.placed"
}Using Strings
For simple events, you can use strings directly. Resolve the dispatcher from the
router.Context (or the closure passed into App.Events) and call Dispatch on it.
// Dispatch string events from a handler
ctx.Events().Dispatch(ctx.Request.Context(), "cache.cleared")
ctx.Events().Dispatch(ctx.Request.Context(), "maintenance.started")
// Listen to string events at boot
func (a *App) Events(d events.Dispatcher) {
d.Listen("cache.cleared", &CacheListener{})
}Auto-Generated Names
If you don’t implement the Name() method, Velocity will generate a name from the struct type:
type UserRegistered struct {
UserID int
Email string
}
// Automatically generates name: "user.registered"
// (CamelCase converted to dot.notation)Listeners
Basic Listener
type MyListener struct{}
func (l *MyListener) Handle(ctx context.Context, event interface{}) error {
// Type assert to get event data
e, ok := event.(UserRegistered)
if !ok {
return fmt.Errorf("unexpected event type")
}
// Process the event; honour ctx for cancellation when applicable.
log.InfoContext(ctx, "Processing event", "user_id", e.UserID)
return nil
}
func (l *MyListener) ShouldQueue() bool {
return false // Synchronous processing
}Queued Listener
For long-running tasks, use queued listeners:
type SendWelcomeEmail struct {
events.QueuedBaseListener
}
func (l *SendWelcomeEmail) Handle(ctx context.Context, event interface{}) error {
e := event.(UserRegistered)
// Send email (time-consuming operation)
return emailService.Send(ctx, e.Email, "Welcome!")
}
func (l *SendWelcomeEmail) ShouldQueue() bool {
return true
}
func (l *SendWelcomeEmail) OnQueue() string {
return "emails" // Queue name
}
func (l *SendWelcomeEmail) Tries() int {
return 3 // Retry attempts
}
func (l *SendWelcomeEmail) WithDelay() time.Duration {
return 5 * time.Second // Processing delay
}Conditional Listener
Execute listener logic only when conditions are met:
type NotifyPremiumUsers struct{}
func (l *NotifyPremiumUsers) Handle(ctx context.Context, event interface{}) error {
e := event.(FeatureReleased)
return notificationService.NotifyPremium(ctx, e.FeatureName)
}
func (l *NotifyPremiumUsers) ShouldQueue() bool {
return true
}
func (l *NotifyPremiumUsers) ShouldHandle(event interface{}) bool {
e, ok := event.(FeatureReleased)
if !ok {
return false
}
// Only handle premium features
return e.IsPremium
}Event Subscribers
Subscribers let you group multiple event listeners in a single class:
type UserEventSubscriber struct{}
func (s *UserEventSubscriber) Subscribe(dispatcher events.Dispatcher) {
dispatcher.Listen("user.registered", &SendWelcomeEmail{})
dispatcher.Listen("user.registered", &UpdateStatistics{})
dispatcher.Listen("user.updated", &SyncUserData{})
dispatcher.Listen("user.deleted", &CleanupUserData{})
}
// Register the subscriber via App.Events
func (a *App) Events(d events.Dispatcher) {
d.Subscribe(&UserEventSubscriber{})
}Auto Subscriber
Automatically register methods as listeners based on naming convention.
Both the legacy (event) shape and the preferred (ctx, event) shape are
accepted; new code should write the ctx-aware form.
type UserSubscriber struct{}
// HandleUserRegistered -> listens to "user.registered"
func (s *UserSubscriber) HandleUserRegistered(ctx context.Context, event interface{}) error {
e := event.(UserRegistered)
log.InfoContext(ctx, "User registered", "user_id", e.UserID)
return nil
}
// HandleUserUpdated -> listens to "user.updated"
func (s *UserSubscriber) HandleUserUpdated(ctx context.Context, event interface{}) error {
e := event.(UserUpdated)
log.InfoContext(ctx, "User updated", "user_id", e.UserID)
return nil
}
// Register auto subscriber
func (a *App) Events(d events.Dispatcher) {
d.Subscribe(events.NewAutoSubscriber(&UserSubscriber{}, "Handle"))
}Mapped Subscriber
Explicitly map methods to events:
type OrderSubscriber struct{}
func (s *OrderSubscriber) ProcessOrder(ctx context.Context, event interface{}) error {
// Handle order.placed event
return nil
}
func (s *OrderSubscriber) CancelOrder(ctx context.Context, event interface{}) error {
// Handle order.cancelled event
return nil
}
// Register with explicit mapping
func (a *App) Events(d events.Dispatcher) {
d.Subscribe(events.NewMappedSubscriber(&OrderSubscriber{}, events.EventMap{
"ProcessOrder": "order.placed",
"CancelOrder": "order.cancelled",
}))
}Listening to Model Lifecycle Events
Dispatcher is not just for app-level domain events. The framework ships a parallel ModelObserver contract for per-record hooks (Creating, Created, Updating, Updated, Saving, Saved, Deleting, Deleted, Restoring, Restored) plus an ObservableDispatcher that owns an ObserverRegistry keyed by model type name. Every callback receives the caller-supplied context.Context so observers see request-scoped values (transactions, trace IDs, deadlines) without the model carrying them.
import (
"context"
"github.com/velocitykode/velocity/events"
)
type User struct {
ID int
Email string
}
// Implement only the hooks you need. BaseObserver no-ops the rest.
type UserObserver struct {
events.BaseObserver
}
func (o *UserObserver) Created(ctx context.Context, model interface{}) error {
u := model.(*User)
log.InfoContext(ctx, "user created", "user_id", u.ID)
return nil
}
func (o *UserObserver) Deleted(ctx context.Context, model interface{}) error {
u := model.(*User)
return cache.Forget(ctx, "user:"+strconv.Itoa(u.ID))
}Wire it up against an ObservableDispatcher. The registry keys by the unqualified type name ("User"), so register either by name or by passing an instance:
func (a *App) Events(d events.Dispatcher) {
obs, ok := d.(*events.ObservableDispatcher)
if !ok {
return // dispatcher does not support model observers
}
// By type name
obs.Observe("User", &UserObserver{})
// Or by instance (type name extracted via reflection)
obs.ObserveModel(&User{}, &UserObserver{})
}Lifecycle hooks fire only when the calling code invokes
obs.FireModelEvent(ctx, "created", &user) (typically from a service-layer
wrapper around your repository writes). FireModelEvent runs the observers
and also dispatches a regular *ModelEvent under the name
<modeltype>.<action> (e.g. user.created) using the same ctx, so a
wildcard listener on *.created still sees it. Use
events.NewConditionalObserver(observer, predicate) – where the
predicate is func(ctx context.Context, event string, model interface{}) bool
– to gate hooks on runtime state, or events.NewAutoObserver(instance) to
map struct methods named Creating / Created / … onto the contract via
reflection. AutoObserver accepts both the legacy (model) shape and the
preferred (ctx, model) shape.
ObservableModel implementations expose
FireEvent(ctx context.Context, event string) error so the model itself
can be the trigger when that fits the design better than a service-layer
call. ObserverRegistry.Fire(ctx, event, model) is the lower-level entry
point.
Dispatching Events
All dispatch calls are methods on the events.Dispatcher resolved from
ctx.Events() (in handlers) or the d parameter of App.Events. Every
method takes context.Context as its first argument.
Synchronous Dispatch
// Dispatch and wait for all listeners to complete. Listeners that
// return ShouldQueue() == true are still pushed to the queue dispatcher.
err := ctx.Events().Dispatch(ctx.Request.Context(), UserRegistered{
UserID: 123,
Email: "user@example.com",
})
if err != nil {
log.ErrorContext(ctx.Request.Context(), "Event dispatch failed", "error", err)
}Force Synchronous
// Always run synchronously, ignoring ShouldQueue.
err := ctx.Events().DispatchNow(ctx.Request.Context(), OrderPlaced{
OrderID: "ORD-123",
})Asynchronous Dispatch
// Dispatch without waiting. Uses the configured QueueDispatcher when
// available, otherwise falls back to a panic-safe goroutine (async.Go).
// Listeners receive a ctx derived via context.WithoutCancel: values
// propagate, cancellation does not.
err := ctx.Events().DispatchAsync(ctx.Request.Context(), EmailSent{
To: "user@example.com",
Subject: "Welcome",
})Delayed Dispatch
// Dispatch after a delay. Uses the queue when available; otherwise
// schedules via time.AfterFunc with a context.WithoutCancel-derived ctx.
err := ctx.Events().DispatchAfter(
ctx.Request.Context(),
OrderFollowUp{OrderID: "ORD-123"},
24*time.Hour,
)Dispatch Until Result
// Dispatch until first non-nil result. Listeners that want to short-circuit
// must implement
// HandleWithResult(ctx context.Context, event interface{}) (interface{}, error).
result, err := ctx.Events().Until(ctx.Request.Context(), ValidatePayment{
Amount: 99.99,
Method: "credit_card",
})
if result != nil {
paymentResult := result.(*PaymentResult)
// Use result
}Wildcard Patterns
Velocity supports flexible wildcard patterns for event matching:
Prefix Matching
func (a *App) Events(d events.Dispatcher) {
// Listen to all user events
d.Listen("user.*", &UserActivityLogger{})
}
// Matches:
// - user.registered
// - user.updated
// - user.deleted
// - user.anythingSuffix Matching
func (a *App) Events(d events.Dispatcher) {
// Listen to all "created" events
d.Listen("*.created", &CreatedLogger{})
}
// Matches:
// - user.created
// - order.created
// - product.createdMatch Everything
func (a *App) Events(d events.Dispatcher) {
// Listen to every event the dispatcher routes
d.Listen("*", &GlobalLogger{})
}Multiple Patterns
func (a *App) Events(d events.Dispatcher) {
// Listen to multiple event names with one registration
d.Listen([]string{
"user.registered",
"user.updated",
"order.placed",
}, &MultiEventListener{})
}Dispatcher API Cheat Sheet
Every operation is a method on the events.Dispatcher returned by
ctx.Events() (or passed into App.Events). Listen returns an int ID
that can be passed to Off to unregister later.
func (a *App) Events(d events.Dispatcher) {
// Register a listener (returns an int ID for later removal)
id := d.Listen("user.registered", &MyListener{})
// Register a subscriber (or auto/mapped/group)
d.Subscribe(&MySubscriber{})
// Remove a single listener by ID
d.Off(id)
// Remove all listeners for an event (Flush == Forget)
d.Flush("user.registered")
d.Forget("user.registered")
// Inspect listeners
if d.HasListeners("user.registered") {
listeners := d.GetListeners("user.registered")
_ = listeners
}
}
// Dispatch from anywhere that has access to the dispatcher and a ctx.
func (h *Handler) Do(ctx *router.Context) error {
d := ctx.Events()
rctx := ctx.Request.Context()
_ = d.Dispatch(rctx, UserRegistered{UserID: 1})
_ = d.DispatchNow(rctx, OrderPlaced{OrderID: "123"})
_ = d.DispatchAsync(rctx, EmailSent{})
_ = d.DispatchAfter(rctx, Reminder{}, 1*time.Hour)
result, _ := d.Until(rctx, ValidateData{})
_ = result
return nil
}Common Use Cases
User Registration Flow
// Event
type UserRegistered struct {
UserID int
Email string
Name string
IPAddress string
}
func (e UserRegistered) Name() string {
return "user.registered"
}
// Listeners
type SendWelcomeEmail struct{}
func (l *SendWelcomeEmail) Handle(ctx context.Context, event interface{}) error {
e := event.(UserRegistered)
return emailService.SendWelcome(ctx, e.Email, e.Name)
}
func (l *SendWelcomeEmail) ShouldQueue() bool { return true }
type CreateUserProfile struct{}
func (l *CreateUserProfile) Handle(ctx context.Context, event interface{}) error {
e := event.(UserRegistered)
return profileService.Create(ctx, e.UserID)
}
func (l *CreateUserProfile) ShouldQueue() bool { return false }
type TrackRegistration struct{}
func (l *TrackRegistration) Handle(ctx context.Context, event interface{}) error {
e := event.(UserRegistered)
return analytics.Track(ctx, "user_registered", map[string]interface{}{
"user_id": e.UserID,
"ip": e.IPAddress,
})
}
func (l *TrackRegistration) ShouldQueue() bool { return true }
// Setup
func (a *App) Events(d events.Dispatcher) {
d.Listen("user.registered", &SendWelcomeEmail{})
d.Listen("user.registered", &CreateUserProfile{})
d.Listen("user.registered", &TrackRegistration{})
}
// Usage in handler
func (c *AuthHandler) Register(ctx *router.Context) error {
user := createUser(email, password)
if err := ctx.Events().Dispatch(ctx.Request.Context(), UserRegistered{
UserID: user.ID,
Email: user.Email,
Name: user.Name,
IPAddress: ctx.Request.RemoteAddr,
}); err != nil {
return err
}
return ctx.JSON(user)
}Order Processing Pipeline
type OrderPlaced struct {
OrderID string
UserID int
Total float64
Items []OrderItem
CreatedAt time.Time
}
func (e OrderPlaced) Name() string {
return "order.placed"
}
// Listeners for different responsibilities
func (a *App) Events(d events.Dispatcher) {
d.Listen("order.placed", &ProcessPayment{})
d.Listen("order.placed", &SendOrderConfirmation{})
d.Listen("order.placed", &UpdateInventory{})
d.Listen("order.placed", &NotifyWarehouse{})
d.Listen("order.placed", &UpdateAnalytics{})
}Audit Logging
type AuditLogger struct{}
func (l *AuditLogger) Handle(ctx context.Context, event interface{}) error {
// Log all model changes
return auditLog.Record(ctx, event)
}
func (l *AuditLogger) ShouldQueue() bool {
return true
}
// Register for all creation events
func (a *App) Events(d events.Dispatcher) {
d.Listen("*.created", &AuditLogger{})
d.Listen("*.updated", &AuditLogger{})
d.Listen("*.deleted", &AuditLogger{})
}Cache Invalidation
type CacheInvalidator struct{}
func (l *CacheInvalidator) Handle(ctx context.Context, event interface{}) error {
switch e := event.(type) {
case UserUpdated:
cache.Forget(ctx, "user:"+strconv.Itoa(e.UserID))
case ProductUpdated:
cache.Forget(ctx, "product:"+e.ProductID)
}
return nil
}
func (l *CacheInvalidator) ShouldQueue() bool {
return false // Invalidate immediately
}
func (a *App) Events(d events.Dispatcher) {
d.Listen("*.updated", &CacheInvalidator{})
}Middleware
MiddlewareDispatcher runs events through a pipeline of EventMiddleware
before they reach listeners. The ctx threads through every stage so
deadlines, cancellation, and trace context survive the chain.
type EventMiddleware interface {
Handle(ctx context.Context, event interface{},
next func(context.Context, interface{}) error) error
}
// MiddlewareFunc adapts a plain func to the interface.
type MiddlewareFunc func(ctx context.Context, event interface{},
next func(context.Context, interface{}) error) errorBuilt-in middleware (LoggingMiddleware, TimingMiddleware,
RetryMiddleware, FilterMiddleware, ValidationMiddleware,
TransformMiddleware) all use the ctx-aware shape. RetryMiddleware
honours ctx.Done() between attempts so a cancelled caller stops the
retry loop instead of sleeping out the configured delay.
Testing
Using Fake Dispatcher
events.NewFakeDispatcher() records dispatched events without invoking
listeners. Wire it into a test app via velocity.WithFakeEvents(fake) so
every ctx.Events() call inside the handler resolves to the fake.
import (
"context"
"github.com/velocitykode/velocity"
"github.com/velocitykode/velocity/events"
"github.com/velocitykode/velocity/velocitytest"
)
func TestUserRegistration(t *testing.T) {
// Create fake dispatcher and inject it into the test app
fake := events.NewFakeDispatcher()
app, _ := velocitytest.NewApp(velocity.WithFakeEvents(fake))
// Perform action that dispatches events (using the test app)
registerUser(app, "test@example.com", "password")
// Assert event was dispatched
if err := fake.AssertDispatched(UserRegistered{}, func(e interface{}) bool {
event := e.(UserRegistered)
return event.Email == "test@example.com"
}); err != nil {
t.Fatal(err)
}
// Assert specific number of times
if err := fake.AssertDispatchedTimes(UserRegistered{}, 1); err != nil {
t.Fatal(err)
}
// Assert not dispatched
if err := fake.AssertNotDispatched(UserDeleted{}); err != nil {
t.Fatal(err)
}
}Testing Listeners
func TestSendWelcomeEmail(t *testing.T) {
listener := &SendWelcomeEmail{}
event := UserRegistered{
UserID: 1,
Email: "test@example.com",
Name: "Test User",
}
err := listener.Handle(context.Background(), event)
assert.NoError(t, err)
// Verify email was sent
assert.True(t, emailService.WasSent("test@example.com"))
}Integration Tests
func TestEventFlow(t *testing.T) {
// Spin up a real dispatcher and register tracking listeners on it directly.
dispatcher := events.NewDispatcher()
var (
called []string
mu sync.Mutex
)
trackingListener := func(name string) events.Listener {
return &testListener{
handle: func(ctx context.Context, e interface{}) error {
mu.Lock()
called = append(called, name)
mu.Unlock()
return nil
},
}
}
dispatcher.Listen("user.registered", trackingListener("email"))
dispatcher.Listen("user.registered", trackingListener("profile"))
dispatcher.Listen("user.registered", trackingListener("analytics"))
// Dispatch event
if err := dispatcher.Dispatch(context.Background(), UserRegistered{UserID: 1}); err != nil {
t.Fatal(err)
}
// Verify all listeners were called
assert.Equal(t, 3, len(called))
assert.Contains(t, called, "email")
assert.Contains(t, called, "profile")
assert.Contains(t, called, "analytics")
}Best Practices
- Use Meaningful Event Names: Follow dot notation (e.g.,
user.registered,order.placed) - Include All Necessary Data: Events should contain all data listeners need
- Keep Listeners Focused: Each listener should have a single responsibility
- Use Queues for Heavy Tasks: Queue emails, notifications, and API calls
- Make Listeners Idempotent: Listeners should handle duplicate events gracefully
- Handle Errors Gracefully: Don’t let one listener failure affect others
- Document Your Events: Clearly document what events exist and when they’re fired
- Test Event Flows: Use the fake dispatcher to test event dispatching
- Pass the right ctx: Use the request ctx (or tx ctx) on dispatch so listeners observe deadlines and trace IDs; remember async paths strip cancellation.
Performance Considerations
Async vs Sync
d := ctx.Events()
rctx := ctx.Request.Context()
// Synchronous: blocks until every listener completes
d.DispatchNow(rctx, event)
// Asynchronous: returns immediately (queue or async.Go fallback)
d.DispatchAsync(rctx, event)Use synchronous when:
- Event handling must complete before continuing
- Order of execution matters
- Error handling is critical
- You need cancellation/deadline to reach listeners
Use asynchronous when:
- Event handling can happen in background
- Performance is critical
- Failures can be retried
- It is OK that ctx cancellation does NOT propagate to listeners
Queue Integration
For high-volume applications, back queued listeners with a real queue. Build a
*events.DefaultDispatcher, wire a QueueDispatcher into it, then expose it
to the app (typically by registering a service provider that overrides
Services.Events, or by constructing the dispatcher in your main and
swapping it in via your own option).
import (
"context"
"github.com/velocitykode/velocity/events"
"github.com/velocitykode/velocity/queue"
)
// QueueDispatcher.Push now takes ctx as its first argument.
type QueueEventDispatcher struct {
queue queue.Driver
}
func (q *QueueEventDispatcher) Push(ctx context.Context, event interface{},
listener events.Listener, delay time.Duration) error {
// ... push to your queue, propagating ctx through to PushCtx/PushDelayedCtx
return nil
}
dispatcher := events.NewDispatcher()
dispatcher.SetQueueDispatcher(&QueueEventDispatcher{
queue: queue.Connection("redis"),
})
// `dispatcher` now pushes ShouldQueue() listeners through Redis instead of
// running them inline. Inject it as the app's events service at boot.Batching Events
// Collect events and dispatch in batches
type EventBatcher struct {
dispatcher events.Dispatcher
pending []interface{}
mu sync.Mutex
}
func (b *EventBatcher) Add(event interface{}) {
b.mu.Lock()
b.pending = append(b.pending, event)
b.mu.Unlock()
}
func (b *EventBatcher) Flush(ctx context.Context) error {
b.mu.Lock()
defer b.mu.Unlock()
for _, event := range b.pending {
if err := b.dispatcher.Dispatch(ctx, event); err != nil {
return err
}
}
b.pending = nil
return nil
}The framework also ships purpose-built wrappers for this space:
events.NewBatchingDispatcher, events.NewDebouncingDispatcher,
events.NewThrottlingDispatcher, and events.NewCoalescingDispatcher
implement the same patterns without hand-rolled bookkeeping. Each Dispatch
captures the caller’s ctx via context.WithoutCancel because the actual
fan-out happens on a background goroutine after the batching/debounce/coalesce
window elapses.
Troubleshooting
Events Not Firing
Check:
- Listeners are registered before events are dispatched
- Event names match exactly (case-sensitive)
- Global dispatcher is initialized
- No errors in listener registration
Listeners Not Called
Check:
- Wildcard patterns are correct
ShouldHandle()method isn’t preventing execution- Event is being dispatched on the correct dispatcher instance
- Listener implements the
Listenerinterface correctly (note:Handle(ctx, event)– legacyHandle(event)no longer satisfies the interface)
Performance Issues
Solutions:
- Use
DispatchAsync()for non-critical events - Implement queue-based event handling
- Batch events when possible
- Profile listener execution times
- Remove unnecessary listeners
Listener Cancelled Mid-Dispatch (sync only)
Dispatch / DispatchNow / Until propagate the caller’s cancellation.
A listener observing ctx.Done() will return early when the request ctx
is cancelled. If you don’t want that, dispatch on a derived ctx:
detached := context.WithoutCancel(ctx.Request.Context())
ctx.Events().Dispatch(detached, evt)Recipe: Audit every domain event with one listener
When: You want a single auditor to capture every event the app dispatches (*.created, *.updated, payment events, login events, anything) without enumerating them or coupling auditing to each producer.
Code:
type AuditAll struct{ writer audit.Writer }
func (l *AuditAll) Handle(ctx context.Context, event interface{}) error {
name, _ := event.(events.Event)
return l.writer.Record(ctx, audit.Entry{
Name: name.Name(),
Payload: event,
At: time.Now(),
})
}
func (l *AuditAll) ShouldQueue() bool { return true }
func (a *App) Events(d events.Dispatcher) {
d.Listen("*", &AuditAll{writer: a.Audit})
}Why this shape: The * pattern in Dispatcher.Listen matches every event the dispatcher routes, including model lifecycle events emitted via FireModelEvent and framework events like query.executed or request.failed. One listener instance is cheaper than per-event registrations and impossible to forget when a new event type is introduced. ShouldQueue() == true keeps audit writes off the request path so a slow audit sink cannot stall handlers; the dispatcher transparently pushes the work through whatever QueueDispatcher is wired (memory, Redis, etc.). Make the writer idempotent on (name, payload-hash) so retries do not double-record.
See also:
- Wildcard Patterns for prefix/suffix variants when you want to scope the auditor
- Listening to Model Lifecycle Events for the per-record hook contract that
*.createdrides on - Notifications for fan-out patterns that pair well with auditing
Related
- Notifications for turning an event into a multi-channel user notification
- Cache for invalidating cache entries from
*.updated/*.deletedlisteners - Queue for backing queued listeners with a durable driver and picking the right primitive for long-running work