Technical Patterns¶
This document details the advanced Go patterns and techniques used throughout Olla's codebase. Understanding these patterns is crucial for maintaining consistency and performance.
Concurrency Patterns¶
Lock-Free Data Structures with xsync¶
Olla heavily leverages github.com/puzpuzpuz/xsync/v3 for lock-free concurrent data structures:
// Thread-safe map without locks
// EndpointRegistry is a lock-free registry of upstream endpoints keyed by URL.
// xsync.MapOf is a generic concurrent map, so no explicit mutex is required.
type EndpointRegistry struct {
	endpoints *xsync.MapOf[string, *Endpoint]
}

// Concurrent access without explicit synchronisation
// UpdateEndpoint inserts or replaces the endpoint stored under url.
// Store is atomic: concurrent readers observe either the old or new value.
func (r *EndpointRegistry) UpdateEndpoint(url string, endpoint *Endpoint) {
	r.endpoints.Store(url, endpoint)
}

// GetEndpoint returns the endpoint registered under url and whether one exists.
func (r *EndpointRegistry) GetEndpoint(url string) (*Endpoint, bool) {
	return r.endpoints.Load(url)
}
Why xsync over sync.Map:
- Type-safe with generics
- Better performance for read-heavy workloads
- More predictable memory usage
- Range operations don't block writers
Atomic Operations for Statistics¶
All statistics collection uses atomic operations to avoid lock contention:
type ModelStats struct {
requestCount atomic.Int64
totalDuration atomic.Int64
errorCount atomic.Int64
bytesProcessed atomic.Int64
}
func (s *ModelStats) RecordRequest(duration time.Duration, bytes int64) {
s.requestCount.Add(1)
s.totalDuration.Add(int64(duration))
s.bytesProcessed.Add(bytes)
}
// Lock-free read
func (s *ModelStats) GetAverageLatency() time.Duration {
count := s.requestCount.Load()
if count == 0 {
return 0
}
total := s.totalDuration.Load()
return time.Duration(total / count)
}
Circuit Breaker State Machine¶
The circuit breaker uses atomic operations for lock-free state transitions:
const (
circuitClosed int64 = 0
circuitOpen int64 = 1
circuitHalfOpen int64 = 2
)
type CircuitBreaker struct {
state atomic.Int64
failures atomic.Int64
lastFailureTime atomic.Int64
threshold int64
timeout time.Duration
}
func (cb *CircuitBreaker) RecordFailure() {
failures := cb.failures.Add(1)
cb.lastFailureTime.Store(time.Now().UnixNano())
if failures >= cb.threshold {
// Atomic state transition
cb.state.CompareAndSwap(circuitClosed, circuitOpen)
}
}
func (cb *CircuitBreaker) CanPass() bool {
state := cb.state.Load()
switch state {
case circuitClosed:
return true
case circuitOpen:
// Check if timeout expired
lastFailure := time.Unix(0, cb.lastFailureTime.Load())
if time.Since(lastFailure) > cb.timeout {
// Try to transition to half-open
if cb.state.CompareAndSwap(circuitOpen, circuitHalfOpen) {
cb.failures.Store(0)
}
return true
}
return false
case circuitHalfOpen:
return true
}
return false
}
Worker Pool Pattern¶
Generic worker pool for controlled concurrency:
// WorkerPool processes tasks of type T on a fixed set of goroutines,
// bounding concurrency and applying backpressure through a buffered queue.
type WorkerPool[T any] struct {
	workers   int
	taskQueue chan T
	processor func(T)
	wg        sync.WaitGroup
	stop      chan struct{}
	stopOnce  sync.Once // makes Stop idempotent
}

// NewWorkerPool starts `workers` goroutines that apply processor to each
// submitted task. The queue buffers up to workers*10 pending tasks.
func NewWorkerPool[T any](workers int, processor func(T)) *WorkerPool[T] {
	wp := &WorkerPool[T]{
		workers:   workers,
		taskQueue: make(chan T, workers*10), // Buffered queue
		processor: processor,
		stop:      make(chan struct{}),
	}
	wp.start()
	return wp
}

// start launches the worker goroutines; each decrements wg on exit.
func (wp *WorkerPool[T]) start() {
	for i := 0; i < wp.workers; i++ {
		wp.wg.Add(1)
		go wp.worker()
	}
}

// worker drains the task queue until the stop channel is closed.
// Tasks still queued at stop time are discarded.
func (wp *WorkerPool[T]) worker() {
	defer wp.wg.Done()
	for {
		select {
		case task := <-wp.taskQueue:
			wp.processor(task)
		case <-wp.stop:
			return
		}
	}
}

// Submit enqueues task without blocking and reports whether it was
// accepted. Fix: the original silently dropped tasks when the queue was
// full, hiding overload from callers; false now signals backpressure so
// the caller can retry, shed load, or block on its own policy. Existing
// call sites that ignore the result keep compiling unchanged.
func (wp *WorkerPool[T]) Submit(task T) bool {
	select {
	case wp.taskQueue <- task:
		return true
	default:
		return false
	}
}

// Stop signals all workers to exit and waits for them to finish.
// Safe to call more than once; queued-but-unprocessed tasks are dropped.
func (wp *WorkerPool[T]) Stop() {
	wp.stopOnce.Do(func() { close(wp.stop) })
	wp.wg.Wait()
}
Memory Optimisation Patterns¶
Generic Object Pool¶
Type-safe object pooling with generics:
// Pool is a generic, type-safe wrapper over sync.Pool. A reset hook runs
// on Put so recycled values come back clean.
type Pool[T any] struct {
	pool  sync.Pool
	new   func() T
	reset func(*T)
}

// NewPool builds a Pool whose misses are served by newFn and whose
// returned values are scrubbed by resetFn (may be nil).
func NewPool[T any](newFn func() T, resetFn func(*T)) *Pool[T] {
	p := &Pool[T]{new: newFn, reset: resetFn}
	p.pool.New = func() interface{} {
		return newFn()
	}
	return p
}

// Get returns a pooled value, or a freshly constructed one on a miss.
func (p *Pool[T]) Get() T {
	if v := p.pool.Get(); v != nil {
		return v.(T)
	}
	// sync.Pool only yields nil when its New hook is unset (a Pool built
	// without NewPool); fall back to the stored constructor.
	return p.new()
}

// Put resets v (when a reset hook was provided) and returns it to the pool.
func (p *Pool[T]) Put(v T) {
	if r := p.reset; r != nil {
		r(&v)
	}
	p.pool.Put(v)
}

// Usage example
var bufferPool = NewPool(
	func() []byte { return make([]byte, 0, 32*1024) },
	func(b *[]byte) { *b = (*b)[:0] }, // Reset slice
)
Connection Pool Management¶
Per-endpoint connection pools with automatic cleanup:
// ConnectionPool bundles an HTTP transport with atomically-tracked usage
// counters so staleness can be judged without locks.
type ConnectionPool struct {
	transport   *http.Transport
	lastUsed    atomic.Int64 // UnixNano of last GetPool touch
	activeConns atomic.Int64 // in-flight connections; 0 means idle
}

// PoolManager maintains one ConnectionPool per endpoint in a lock-free map.
type PoolManager struct {
	pools        *xsync.MapOf[string, *ConnectionPool]
	cleanupTimer *time.Timer
}

// GetPool returns the pool for endpoint, lazily creating it on first use.
// LoadOrCompute guarantees exactly one pool is constructed per key even
// under concurrent callers.
func (pm *PoolManager) GetPool(endpoint string) *ConnectionPool {
	pool, _ := pm.pools.LoadOrCompute(endpoint, func() *ConnectionPool {
		return &ConnectionPool{
			transport: &http.Transport{
				MaxIdleConns:        10,
				MaxIdleConnsPerHost: 10,
				IdleConnTimeout:     90 * time.Second,
				DisableCompression:  true, // Better for local networks
			},
		}
	})
	// Touch the pool so cleanupStale sees it as recently used.
	pool.lastUsed.Store(time.Now().UnixNano())
	return pool
}

// cleanupStale evicts pools idle for over five minutes with no active
// connections, closing their idle keep-alive connections first.
// NOTE(review): the check-then-delete is not atomic — a concurrent GetPool
// can bump lastUsed between the check and Delete, in which case that caller
// holds a pool no longer in the map. Presumably acceptable (the transport
// still works, it just won't be reused); confirm this is intended.
func (pm *PoolManager) cleanupStale() {
	threshold := time.Now().Add(-5 * time.Minute).UnixNano()
	pm.pools.Range(func(endpoint string, pool *ConnectionPool) bool {
		if pool.lastUsed.Load() < threshold && pool.activeConns.Load() == 0 {
			pool.transport.CloseIdleConnections()
			pm.pools.Delete(endpoint)
		}
		return true
	})
}
Buffer Reuse Pattern¶
Efficient buffer management for streaming:
// StreamProcessor copies data from a reader to a writer using pooled
// scratch chunks and a pooled accumulation buffer, flushing in batches.
type StreamProcessor struct {
	bufferPool *Pool[*bytes.Buffer]
	chunkPool  *Pool[[]byte]
}

// ProcessStream streams r into w, accumulating reads in a buffer and
// flushing whenever it reaches 8 KiB. Returns the first read/write error;
// io.EOF ends the loop normally after any final bytes are handled.
func (sp *StreamProcessor) ProcessStream(r io.Reader, w io.Writer) error {
	chunk := sp.chunkPool.Get()
	// Fix: pooled slices typically come back reset to length 0 (see the
	// bufferPool reset hook elsewhere in this file), and r.Read into a
	// zero-length slice returns n == 0 forever — an infinite loop. Reslice
	// to full capacity so Read has room, and guard a cap-0 pool value.
	if cap(chunk) == 0 {
		chunk = make([]byte, 32*1024)
	}
	chunk = chunk[:cap(chunk)]
	defer sp.chunkPool.Put(chunk)
	buffer := sp.bufferPool.Get()
	defer sp.bufferPool.Put(buffer)
	for {
		n, err := r.Read(chunk)
		if n > 0 {
			buffer.Write(chunk[:n])
			// Flush once the buffer reaches the batching threshold.
			if buffer.Len() >= 8192 {
				if _, err := w.Write(buffer.Bytes()); err != nil {
					return err
				}
				buffer.Reset()
			}
		}
		// Handle n > 0 before the error, per the io.Reader contract.
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
	}
	// Flush remaining bytes below the threshold.
	if buffer.Len() > 0 {
		_, err := w.Write(buffer.Bytes())
		return err
	}
	return nil
}
Service Lifecycle Patterns¶
Dependency Injection with Service Manager¶
Topological sorting for dependency resolution:
// ServiceManager owns named services and derives a safe startup order from
// their declared dependencies.
type ServiceManager struct {
	services   map[string]ManagedService
	depGraph   map[string][]string // service name -> names it depends on; TODO confirm direction against callers
	startOrder []string            // dependencies come before their dependents
}

// ResolveDependencies computes startOrder with Kahn's topological sort and
// reports an error when the dependency graph contains a cycle.
func (sm *ServiceManager) ResolveDependencies() error {
	// Kahn's algorithm for topological sort.
	// inDegree[n] = number of services that depend on n.
	inDegree := make(map[string]int)
	for name := range sm.services {
		inDegree[name] = 0
	}
	for _, deps := range sm.depGraph {
		for _, dep := range deps {
			inDegree[dep]++
		}
	}
	queue := []string{}
	for name, degree := range inDegree {
		if degree == 0 {
			queue = append(queue, name)
		}
	}
	var sorted []string
	for len(queue) > 0 {
		current := queue[0]
		queue = queue[1:]
		sorted = append(sorted, current)
		for _, neighbor := range sm.depGraph[current] {
			inDegree[neighbor]--
			if inDegree[neighbor] == 0 {
				queue = append(queue, neighbor)
			}
		}
	}
	// A node left out of sorted still had incoming edges: a cycle.
	if len(sorted) != len(sm.services) {
		return errors.New("circular dependency detected")
	}
	// Fix: with edges pointing from dependent to dependency, Kahn emits
	// dependents first. Reverse so every service starts only after the
	// services it depends on.
	for i, j := 0, len(sorted)-1; i < j; i, j = i+1, j-1 {
		sorted[i], sorted[j] = sorted[j], sorted[i]
	}
	sm.startOrder = sorted
	return nil
}
Two-Phase Service Initialisation¶
Prevents circular dependencies:
// Phase 1: construct every service with nil collaborators. Wiring happens
// in a separate phase, so construction order can never form a dependency
// cycle.
func createServices(cfg *Config) map[string]interface{} {
	return map[string]interface{}{
		"stats":    NewStatsService(nil),
		"security": NewSecurityService(nil),
		"proxy":    NewProxyService(nil),
	}
}
// Phase 2: Wire dependencies
// wireServices connects the services created in phase one.
// NOTE(review): the unchecked type assertions panic if a map entry holds an
// unexpected concrete type — presumably intentional fail-fast during boot;
// confirm before calling this anywhere but startup.
func wireServices(services map[string]interface{}) {
	stats := services["stats"].(*StatsService)
	security := services["security"].(*SecurityService)
	proxy := services["proxy"].(*ProxyService)
	// Now wire them together
	security.SetStatsService(stats)
	proxy.SetSecurityService(security)
	proxy.SetStatsService(stats)
}
Graceful Shutdown Pattern¶
Coordinated shutdown with cleanup:
// Service runs background workers and supports coordinated shutdown.
// shutdownCh must be initialised (e.g. by a constructor not shown here)
// before Start/Stop are called — closing a nil channel panics.
type Service struct {
	shutdownCh   chan struct{}
	shutdownWg   sync.WaitGroup
	shutdownOnce sync.Once // guards close(shutdownCh); see Stop
}

// Start launches the background workers; each one decrements shutdownWg
// when it exits.
func (s *Service) Start(ctx context.Context) error {
	// Start background workers
	s.shutdownWg.Add(3)
	go s.healthChecker(ctx)
	go s.metricsCollector(ctx)
	go s.connectionCleaner(ctx)
	return nil
}

// Stop signals shutdown and waits for all workers, bounded by ctx.
// Fix: the original closed shutdownCh unconditionally, so a second Stop
// call panicked on double close; sync.Once makes Stop idempotent.
func (s *Service) Stop(ctx context.Context) error {
	// Signal shutdown exactly once.
	s.shutdownOnce.Do(func() {
		close(s.shutdownCh)
	})
	// Wait with timeout
	done := make(chan struct{})
	go func() {
		s.shutdownWg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// healthChecker runs a periodic health check until either the service is
// stopped or the surrounding context is cancelled.
func (s *Service) healthChecker(ctx context.Context) {
	defer s.shutdownWg.Done()
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			s.performHealthCheck()
		case <-s.shutdownCh:
			return
		case <-ctx.Done():
			return
		}
	}
}
Event System Patterns¶
Generic Event Bus¶
Type-safe event publishing and subscription:
// Event is a typed event envelope carrying its type tag and creation time.
type Event[T any] struct {
	Type      string
	Timestamp time.Time
	Data      T
}

// EventBus fans events out to per-type subscriber channels.
type EventBus[T any] struct {
	subscribers *xsync.MapOf[string, []chan Event[T]]
	workerPool  *WorkerPool[Event[T]]
}

// Subscribe registers a new buffered channel for eventType and returns it.
// Fix: the original Load/append/Store sequence lost subscribers when two
// goroutines subscribed concurrently, and appending into the shared slice
// raced with Publish. Compute performs the update atomically per key, and
// the slice is copied (copy-on-write) so slices already loaded by Publish
// are never mutated.
func (eb *EventBus[T]) Subscribe(eventType string) <-chan Event[T] {
	ch := make(chan Event[T], 100)
	eb.subscribers.Compute(eventType, func(old []chan Event[T], _ bool) ([]chan Event[T], bool) {
		next := make([]chan Event[T], len(old), len(old)+1)
		copy(next, old)
		next = append(next, ch)
		return next, false
	})
	return ch
}

// Publish delivers an event to every subscriber of eventType. Sends are
// non-blocking: a subscriber whose buffer is full misses the event rather
// than stalling the publisher.
func (eb *EventBus[T]) Publish(eventType string, data T) {
	event := Event[T]{
		Type:      eventType,
		Timestamp: time.Now(),
		Data:      data,
	}
	if subs, ok := eb.subscribers.Load(eventType); ok {
		for _, ch := range subs {
			select {
			case ch <- event:
				// Sent
			default:
				// Channel full, drop event
			}
		}
	}
}
Request Context Patterns¶
Request Metadata Propagation¶
Context-based request tracking:
// contextKey is an unexported key type for values this package stores in a
// context.Context, preventing collisions with keys from other packages.
type contextKey string

// Keys for request-scoped metadata attached in WithRequestMetadata.
const (
	requestIDKey contextKey = "request-id"
	endpointKey  contextKey = "endpoint"
	modelKey     contextKey = "model"
	startTimeKey contextKey = "start-time"
)
func WithRequestMetadata(ctx context.Context, r *http.Request) context.Context {
// Generate request ID
requestID := generateRequestID()
ctx = context.WithValue(ctx, requestIDKey, requestID)
// Add start time
ctx = context.WithValue(ctx, startTimeKey, time.Now())
// Extract model from request
if model := extractModel(r); model != "" {
ctx = context.WithValue(ctx, modelKey, model)
}
return ctx
}
// GetRequestID returns the request ID stored in ctx, or "" when absent.
func GetRequestID(ctx context.Context) string {
	id, _ := ctx.Value(requestIDKey).(string)
	return id
}
// GetElapsedTime returns how long the request in ctx has been running,
// or 0 when no start time was recorded.
func GetElapsedTime(ctx context.Context) time.Duration {
	start, ok := ctx.Value(startTimeKey).(time.Time)
	if !ok {
		return 0
	}
	return time.Since(start)
}
Structured Logging with Context¶
Context-aware logging throughout request lifecycle:
// Logger wraps *slog.Logger with request-context enrichment.
type Logger struct {
	base *slog.Logger
}

// WithContext returns a Logger whose entries carry the request-scoped
// attributes (request_id, model, endpoint) present in ctx.
//
// Fix: slog.Logger.With is variadic over `any`, so a []slog.Attr cannot be
// expanded into it — the original `l.base.With(attrs...)` with attrs of
// type []slog.Attr did not compile. Collect the attributes in a []any;
// slog accepts slog.Attr values passed as plain args.
func (l *Logger) WithContext(ctx context.Context) *Logger {
	attrs := make([]any, 0, 3)
	if requestID := GetRequestID(ctx); requestID != "" {
		attrs = append(attrs, slog.String("request_id", requestID))
	}
	if model := GetModel(ctx); model != "" {
		attrs = append(attrs, slog.String("model", model))
	}
	if endpoint := GetEndpoint(ctx); endpoint != "" {
		attrs = append(attrs, slog.String("endpoint", endpoint))
	}
	return &Logger{
		base: l.base.With(attrs...),
	}
}
Performance Patterns¶
Zero-Allocation String Building¶
Efficient string concatenation:
// String builder pool.
// NOTE: strings.Builder.Reset drops its backing array, so the pool only
// recycles the (tiny) Builder struct — the byte buffer is reallocated per
// call. Grow below keeps that to a single allocation per BuildPath.
var stringBuilderPool = sync.Pool{
	New: func() interface{} {
		return &strings.Builder{}
	},
}

// BuildPath joins segments with '/' separators (no leading or trailing
// slash is added). BuildPath() returns "".
func BuildPath(segments ...string) string {
	sb := stringBuilderPool.Get().(*strings.Builder)
	defer func() {
		sb.Reset()
		stringBuilderPool.Put(sb)
	}()
	// Pre-size to the exact output length so the builder grows at most once
	// instead of repeatedly reallocating as segments are appended.
	total := 0
	for _, segment := range segments {
		total += len(segment) + 1 // +1 for the separator
	}
	if total > 0 {
		sb.Grow(total - 1) // no separator before the first segment
	}
	for i, segment := range segments {
		if i > 0 {
			sb.WriteByte('/')
		}
		sb.WriteString(segment)
	}
	// Safe after the deferred Reset: String() returns the current backing
	// array, which Reset abandons rather than zeroes.
	return sb.String()
}
Lazy Initialisation¶
Compute-once pattern for expensive operations:
// LazyValue computes a value exactly once, on first Get, and caches both
// the result and any error for all later calls.
type LazyValue[T any] struct {
	once  sync.Once
	value T
	err   error
	init  func() (T, error)
}

// NewLazy wraps init so it runs at most once, on first Get.
func NewLazy[T any](init func() (T, error)) *LazyValue[T] {
	return &LazyValue[T]{init: init}
}

// Get returns the cached (value, error) pair, invoking init on first call.
// Safe for concurrent use: sync.Once serialises the initialisation.
func (l *LazyValue[T]) Get() (T, error) {
	l.once.Do(func() { l.value, l.err = l.init() })
	return l.value, l.err
}
// Usage
// profileConfig defers the disk read until first Get(); loadProfileFromDisk
// runs at most once even under concurrent access, and its error is cached.
var profileConfig = NewLazy(func() (*ProfileConfig, error) {
	return loadProfileFromDisk("ollama.yaml")
})
Batch Processing¶
Aggregate operations for efficiency:
// BatchProcessor buffers items and hands them to process in batches:
// asynchronously when the capacity threshold is hit, synchronously on flush.
type BatchProcessor[T any] struct {
	items    []T
	capacity int
	mu       sync.Mutex
	process  func([]T) error
	ticker   *time.Ticker
}

// Add appends item to the pending batch. When the batch reaches capacity it
// is detached and processed on a new goroutine (the error is discarded, as
// there is no caller to report it to).
func (bp *BatchProcessor[T]) Add(item T) {
	bp.mu.Lock()
	bp.items = append(bp.items, item)
	if len(bp.items) < bp.capacity {
		bp.mu.Unlock()
		return
	}
	batch := bp.detachLocked()
	bp.mu.Unlock()
	go bp.process(batch)
}

// flush synchronously processes whatever is pending, if anything.
func (bp *BatchProcessor[T]) flush() {
	bp.mu.Lock()
	if len(bp.items) == 0 {
		bp.mu.Unlock()
		return
	}
	batch := bp.detachLocked()
	bp.mu.Unlock()
	bp.process(batch)
}

// detachLocked swaps out the pending batch and installs a fresh slice.
// Caller must hold bp.mu.
func (bp *BatchProcessor[T]) detachLocked() []T {
	batch := bp.items
	bp.items = make([]T, 0, bp.capacity)
	return batch
}
Error Handling Patterns¶
Typed Errors with Context¶
Domain-specific error types:
type ErrorCode string
const (
ErrEndpointNotFound ErrorCode = "ENDPOINT_NOT_FOUND"
ErrModelUnavailable ErrorCode = "MODEL_UNAVAILABLE"
ErrRateLimited ErrorCode = "RATE_LIMITED"
)
type AppError struct {
Code ErrorCode
Message string
Details map[string]interface{}
Cause error
Timestamp time.Time
}
func (e *AppError) Error() string {
if e.Cause != nil {
return fmt.Sprintf("%s: %s (caused by: %v)", e.Code, e.Message, e.Cause)
}
return fmt.Sprintf("%s: %s", e.Code, e.Message)
}
func (e *AppError) Unwrap() error {
return e.Cause
}
func NewAppError(code ErrorCode, message string) *AppError {
return &AppError{
Code: code,
Message: message,
Details: make(map[string]interface{}),
Timestamp: time.Now(),
}
}
func (e *AppError) WithDetail(key string, value interface{}) *AppError {
e.Details[key] = value
return e
}
Error Recovery Pattern¶
Graceful degradation with fallbacks:
type Resilient struct {
primary func() (interface{}, error)
fallback func() (interface{}, error)
retries int
backoff time.Duration
}
func (r *Resilient) Execute() (interface{}, error) {
var lastErr error
// Try primary with retries
for i := 0; i < r.retries; i++ {
result, err := r.primary()
if err == nil {
return result, nil
}
lastErr = err
if i < r.retries-1 {
time.Sleep(r.backoff * time.Duration(i+1))
}
}
// Try fallback
if r.fallback != nil {
result, err := r.fallback()
if err == nil {
return result, nil
}
// Wrap both errors
return nil, fmt.Errorf("primary failed: %w, fallback failed: %v", lastErr, err)
}
return nil, lastErr
}
Best Practices Summary¶
Do's¶
- Use atomic operations for counters and flags
- Leverage xsync for concurrent maps and counters
- Pool objects that are frequently allocated
- Propagate context through all function calls
- Use structured logging with request context
- Implement circuit breakers for external calls
- Handle panics in goroutines
- Clean up resources with defer
Don'ts¶
- Don't use mutexes when atomics suffice
- Don't create goroutines without lifecycle management
- Don't ignore context cancellation
- Don't allocate in hot paths
- Don't use global variables for state
- Don't panic in library code
- Don't ignore errors even in deferred functions
Next Steps¶
- Architecture Details - System architecture
- Proxy Engines - Proxy implementations
- Testing Guide - Testing patterns
- Contributing - Contribution guidelines