Back
Featured image of post 优雅终止 | 基于 go 的 http 库实现

优雅终止 | 基于 go 的 http 库实现

「优雅终止」指的是当服务需要下线或者重启时,通过一些措施和手段,一方面能够让其他服务尽快的感知到当前服务的下线,另一方面也尽量减小对当前正在处理请求的影响。优雅终止可提升服务的高可用,减少下线造成的服务抖动,提升服务稳定性和用户体验。

源码地址: https://github.com/Ai-feier/gracefulshutdown

思考优雅终止目标

  1. 应用开始关闭时, 对于服务器的连接
    1. 对已有连接: 等待其处理
    2. 拒绝新请求
  2. 是否需要将 cache 中数据保存到 db
  3. 释放服务器资源

对象关系图

应用启动:

image-20240102163407543

优雅终止:

image-20240102163454049

优雅的优雅终止实现

需监听的信号量

windows

var Signals = []os.Signal{
	os.Interrupt, os.Kill, syscall.SIGKILL,
	syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP,
	syscall.SIGABRT, syscall.SIGTERM,
}

linux

var Signals = []os.Signal{
	os.Interrupt, os.Kill, syscall.SIGKILL, syscall.SIGSTOP,
	syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP,
	syscall.SIGABRT, syscall.SIGSYS, syscall.SIGTERM,
}

Server

一个 server 对应一个 service

定义

// Server 对应一个服务
type Server struct {
	svc  *http.Server
	name string
	mux  *serverMux
}

// serverMux 请求锁
// 装饰器模式
type serverMux struct {
	reject bool
	*http.ServeMux
}

其中使用 http.ServeMux 是为确保一个 server, 只能持有一个正在处理的 request

new

传入 server 名字, 监听地址

func NewServer(name, addr string) *Server {
	mux := &serverMux{ServeMux: http.NewServeMux()}
	return &Server{
		name: name,
		mux:  mux,
		svc: &http.Server{
			Handler: mux,
			Addr:    addr,
		},
	}
}

server 的启动与终止

func (s *Server) Start() error {
	return s.svc.ListenAndServe()
}

func (s *Server) Handle(pattern string, handler http.HandlerFunc) {
	s.mux.Handle(pattern, handler)
}

func (s *Server) reject() {
	s.mux.reject = true
}

func (s *Server) stop(ctx context.Context) error {
	log.Println("server: ", s.name, "关闭中")
	return s.svc.Shutdown(ctx)
}

App

定义

// 扩展 App (option 设计模式)
type AppOption func(app *App)

type App struct {
	servers []*Server

	// app 关闭的最长时间
	shutdownTimeout time.Duration

	// 留给 server 处理已有请求的时间
	waitTime time.Duration

	// 回调函数超时控制
	cbTime time.Duration

	cbs []ShutdownCallBack
}

new

// NewApp 新建 App 
// 需用户传入 server
func NewApp(servers []*Server, opts ...AppOption) *App {
	app := &App{
		servers:         servers,
		shutdownTimeout: time.Second * 30,
		waitTime:        time.Second * 10,
		cbTime:          3 * time.Second,
	}
	for _, opt := range opts {
		opt(app)
	}
	return app
}

// WithShutDownCallBack 向 App 传入 callback 函数
func WithShutDownCallBack(cbs ...ShutdownCallBack) AppOption {
	return func(app *App) {
		app.cbs = cbs
	}
}

app 启动与优雅终止

func (app *App) StartAndServer() {
	// 启动服务
	for _, s := range app.servers {
		svc := s
		go func() {
			if err := svc.Start(); err != nil {
				if errors.Is(err, http.ErrServerClosed) {
					log.Printf("服务器%s已关闭", svc.name)
				} else {
					log.Printf("服务器%s异常退出", svc.name)
				}
			}
		}()
	}
	
	log.Println("应用启动成功")

	// app 启动成功
	// 监听退出信号
	// 监听两次信号, 第一次优雅终止, 第二次强行终止
	ch := make(chan os.Signal, 2)
	signal.Notify(ch, sig.Signals...)
	<-ch
	fmt.Println("应用开始关闭...")
	go func() {
		select {
		case <-ch:
			log.Println("强制退出")
			os.Exit(1)
		case <-time.After(app.shutdownTimeout):
			log.Println("超时强行退出")
			os.Exit(1)
		}
	}()

	app.shutdown(context.Background())
}

func (app *App) shutdown(ctx context.Context) {
	log.Println("app start to shutdown")
	// 将 app 下所有 server 拒绝新请求
	for _, svc := range app.servers {
		svc.reject()
	}
	
	log.Println("等待已有请求处理")
	// 这里可以改造为实时统计正在处理的请求数量,为0 则下一步
	time.Sleep(time.Second * app.waitTime)
	
	log.Println("开始关闭应用")
	var wg sync.WaitGroup
	wg.Add(len(app.servers))
	for _, s := range app.servers {
		svc := s
		go func() {
			if err := svc.stop(ctx);err != nil {
				log.Println("服务器关闭失败", svc.name)
			}
			wg.Done()
		}()
	}
	wg.Wait()
	
	// 执行回调函数
	wg.Add(len(app.cbs))
	log.Println("开始执行回调函数")
	for _, cb := range app.cbs {
		c := cb
		go func() {
			ctx2, cancal := context.WithCancel(ctx)
			c(ctx2)
			cancal()
			wg.Done()
		}()
	}
	wg.Wait()
	
	// 释放资源
	log.Println("开始释放资源")
	app.close()
}

CallBackFunc

执行时间: 在优雅终止期间 app 确保所有 server 已关闭后调用

由用户自定义实现

示例

func StoreCacheToDBCallBack(ctx context.Context) {
	done := make(chan struct{}, 1)
	go func() {
		// 这里将 cache 中数据刷到 db
		log.Println("缓存刷新中...")
		time.Sleep(time.Second)
		done <- struct{}{}
	}()

	select {
	case <-done:
		log.Printf("缓存被刷新到了 DB")
	case <-ctx.Done():
		log.Printf("刷新缓存超时")
	}
}

完整代码

main.go

package main

import (
	"context"
	"github.com/Ai-feier/http/server"
	"log"
	"net/http"
	"time"
)

func main() {
	// 新建两个 server
	s1 := server.NewServer("service1", "localhost:8081")
	s1.Handle("/", func(writer http.ResponseWriter, request *http.Request) {
		_, _ = writer.Write([]byte("hello world"))
	})
	s2 := server.NewServer("service2", "localhost:8082")
	
	// 新建应用, 包含 service1, service2
	app := server.NewApp([]*server.Server{s1, s2}, server.WithShutDownCallBack(StoreCacheToDBCallBack))
	app.StartAndServer()
}

func StoreCacheToDBCallBack(ctx context.Context) {
	done := make(chan struct{}, 1)
	go func() {
		// 这里将 cache 中数据刷到 db
		log.Println("缓存刷新中...")
		time.Sleep(time.Second)
		done <- struct{}{}
	}()

	select {
	case <-done:
		log.Printf("缓存被刷新到了 DB")
	case <-ctx.Done():
		log.Printf("刷新缓存超时")
	}
}

server.go

package server

import (
	"context"
	"errors"
	"fmt"
	"github.com/Ai-feier/sig"
	"log"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"time"
)

type AppOption func(app *App)

// ShutdownCallBack App 关闭时的回调函数
// 默认超时 3s
// 用户可通过 context 自行控制
type ShutdownCallBack func(ctx context.Context)

// WithShutDownCallBack 向 App 传入 callback 函数
func WithShutDownCallBack(cbs ...ShutdownCallBack) AppOption {
	return func(app *App) {
		app.cbs = cbs
	}
}

type App struct {
	servers []*Server

	// app 关闭的最长时间
	shutdownTimeout time.Duration

	// 留给 server 处理已有请求的时间
	waitTime time.Duration

	// 回调函数超时控制
	cbTime time.Duration

	cbs []ShutdownCallBack
}

// NewApp 新建 App 
// 需用户传入 server
func NewApp(servers []*Server, opts ...AppOption) *App {
	app := &App{
		servers:         servers,
		shutdownTimeout: time.Second * 30,
		waitTime:        time.Second * 10,
		cbTime:          3 * time.Second,
	}
	for _, opt := range opts {
		opt(app)
	}
	return app
}

func (app *App) StartAndServer() {
	// 启动服务
	for _, s := range app.servers {
		svc := s
		go func() {
			if err := svc.Start(); err != nil {
				if errors.Is(err, http.ErrServerClosed) {
					log.Printf("服务器%s已关闭", svc.name)
				} else {
					log.Printf("服务器%s异常退出", svc.name)
				}
			}
		}()
	}
	
	log.Println("应用启动成功")

	// app 启动成功
	// 监听退出信号
	// 监听两次信号, 第一次优雅终止, 第二次强行终止
	ch := make(chan os.Signal, 2)
	signal.Notify(ch, sig.Signals...)
	<-ch
	fmt.Println("应用开始关闭...")
	go func() {
		select {
		case <-ch:
			log.Println("强制退出")
			os.Exit(1)
		case <-time.After(app.shutdownTimeout):
			log.Println("超时强行退出")
			os.Exit(1)
		}
	}()

	app.shutdown(context.Background())
}

func (app *App) shutdown(ctx context.Context) {
	log.Println("app start to shutdown")
	// 将 app 下所有 server 拒绝新请求
	for _, svc := range app.servers {
		svc.reject()
	}
	
	log.Println("等待已有请求处理")
	// 这里可以改造为实时统计正在处理的请求数量,为0 则下一步
	time.Sleep(time.Second * app.waitTime)
	
	log.Println("开始关闭应用")
	var wg sync.WaitGroup
	wg.Add(len(app.servers))
	for _, s := range app.servers {
		svc := s
		go func() {
			if err := svc.stop(ctx);err != nil {
				log.Println("服务器关闭失败", svc.name)
			}
			wg.Done()
		}()
	}
	wg.Wait()
	
	// 执行回调函数
	wg.Add(len(app.cbs))
	log.Println("开始执行回调函数")
	for _, cb := range app.cbs {
		c := cb
		go func() {
			ctx2, cancal := context.WithCancel(ctx)
			c(ctx2)
			cancal()
			wg.Done()
		}()
	}
	wg.Wait()
	
	// 释放资源
	log.Println("开始释放资源")
	app.close()
}

func (app *App) close() {
	// 可补充释放资源逻辑
	time.Sleep(time.Second)
	log.Println("应用关闭")
}

// Server 对应一个服务
type Server struct {
	svc  *http.Server
	name string
	mux  *serverMux
}

// serverMux 请求锁
//装饰器模式
type serverMux struct {
	reject bool
	*http.ServeMux
}

func (s *serverMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if s.reject {
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write([]byte("服务器已关闭"))
		return
	}
	s.ServeMux.ServeHTTP(w, r)
}

func NewServer(name, addr string) *Server {
	mux := &serverMux{ServeMux: http.NewServeMux()}
	return &Server{
		name: name,
		mux:  mux,
		svc: &http.Server{
			Handler: mux,
			Addr:    addr,
		},
	}
}

func (s *Server) Start() error {
	return s.svc.ListenAndServe()
}

func (s *Server) Handle(pattern string, handler http.HandlerFunc) {
	s.mux.Handle(pattern, handler)
}

func (s *Server) reject() {
	s.mux.reject = true
}

func (s *Server) stop(ctx context.Context) error {
	log.Println("server: ", s.name, "关闭中")
	return s.svc.Shutdown(ctx)
}

效果演示

C:\Users\26645\Desktop\goproject\gracefulshutdown\http>go build .

C:\Users\26645\Desktop\goproject\gracefulshutdown\http>http.exe
2024/01/02 15:32:08 应用启动成功
应用开始关闭...
2024/01/02 15:32:10 app start to shutdown
2024/01/02 15:32:10 等待已有请求处理
2024/01/02 15:32:10 开始关闭应用
2024/01/02 15:32:10 server:  service2 关闭中
2024/01/02 15:32:10 server:  service1 关闭中
2024/01/02 15:32:10 服务器service2已关闭
2024/01/02 15:32:10 服务器service1已关闭
2024/01/02 15:32:10 开始执行回调函数
2024/01/02 15:32:10 缓存刷新中...
2024/01/02 15:32:11 缓存被刷新到了 DB
2024/01/02 15:32:11 开始释放资源
2024/01/02 15:32:12 应用关闭

19680890

Licensed under CC BY-NC-SA 4.0