# HG changeset patch # User Peter Sanchez # Date 1690414319 21600 # Wed Jul 26 17:31:59 2023 -0600 # Node ID 083abb4e1c7b6db987a8ab0093786aec360d05c5 # Parent fc53d174a4eb413ce97cb4725f54d63974115f43 Adding timeout option for queue termination so we don't end up waiting forever. diff --git a/server/server.go b/server/server.go --- a/server/server.go +++ b/server/server.go @@ -77,12 +77,13 @@ // Server is the global echo application server type Server struct { - Config *config.Config - DB *sql.DB - Session *scs.SessionManager - Email *email.ServiceQueue - Storage storage.Service - GQL *GraphQL + Config *config.Config + DB *sql.DB + Session *scs.SessionManager + Email *email.ServiceQueue + Storage storage.Service + GQL *GraphQL + QueueTimeout time.Duration e *echo.Echo ai *appInfo @@ -90,6 +91,7 @@ autocert bool csrfSkip []string queues []*work.Queue + queuecan map[string]context.CancelFunc } // Context is the context passed to handlers and middlewares @@ -582,9 +584,13 @@ // WithQueues add dowork task queues for this server to manage func (s *Server) WithQueues(queues ...*work.Queue) *Server { + if s.queuecan == nil { + s.queuecan = make(map[string]context.CancelFunc) + } s.queues = append(s.queues, queues...) for _, queue := range queues { - ctx := context.Background() + ctx, can := context.WithCancel(context.Background()) + s.queuecan[queue.Name()] = can queue.Start(ctx) } return s @@ -687,14 +693,27 @@ s.e.Shutdown(ctx) cancel() - s.e.Logger.Printf("Terminating work queues...\n") - work.Join(s.queues...) + s.e.Logger.Printf("Terminating work queues (timeout in %v)...", s.QueueTimeout) + go work.Join(s.queues...) + select { + case <-time.After(s.QueueTimeout): + // Call the context cancel function after QueueTimeout expires + s.e.Logger.Printf("QueueTimeout reached. Terminating queue processing immediately...") + for _, can := range s.queuecan { + can() + } + } s.e.Logger.Printf("Shutdown completed.\n") } // New creates a new server instance func New(e *echo.Echo, db *sql.DB, c *config.Config) *Server { - server := &Server{Config: c, DB: db, e: e} + server := &Server{ + Config: c, + DB: db, + QueueTimeout: 30 * time.Second, + e: e, + } return server }