思考优雅终止目标
- 应用开始关闭时, 对于服务器的连接
- 对已有连接: 等待其处理
- 拒绝新请求
- 是否需要将 cache 中数据保存到 db
- 释放服务器资源
对象关系图
应用启动:
优雅终止:
优雅的优雅终止实现
需监听的信号量
windows
var Signals = []os.Signal{
os.Interrupt, os.Kill, syscall.SIGKILL,
syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP,
syscall.SIGABRT, syscall.SIGTERM,
}
linux
var Signals = []os.Signal{
os.Interrupt, os.Kill, syscall.SIGKILL, syscall.SIGSTOP,
syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP,
syscall.SIGABRT, syscall.SIGSYS, syscall.SIGTERM,
}
Server
一个 server 对应一个 service
定义
// Server 对应一个服务
type Server struct {
svc *http.Server
name string
mux *serverMux
}
// serverMux 请求锁
// 装饰器模式
type serverMux struct {
reject bool
*http.ServeMux
}
其中使用 http.ServeMux 是为确保一个 server, 只能持有一个正在处理的 request
new
传入 server 名字, 监听地址
func NewServer(name, addr string) *Server {
mux := &serverMux{ServeMux: http.NewServeMux()}
return &Server{
name: name,
mux: mux,
svc: &http.Server{
Handler: mux,
Addr: addr,
},
}
}
server 的启动与终止
func (s *Server) Start() error {
return s.svc.ListenAndServe()
}
func (s *Server) Handle(pattern string, handler http.HandlerFunc) {
s.mux.Handle(pattern, handler)
}
func (s *Server) reject() {
s.mux.reject = true
}
func (s *Server) stop(ctx context.Context) error {
log.Println("server: ", s.name, "关闭中")
return s.svc.Shutdown(ctx)
}
App
定义
// 扩展 App (option 设计模式)
type AppOption func(app *App)
type App struct {
servers []*Server
// app 关闭的最长时间
shutdownTimeout time.Duration
// 留给 server 处理已有请求的时间
waitTime time.Duration
// 回调函数超时控制
cbTime time.Duration
cbs []ShutdownCallBack
}
new
// NewApp 新建 App
// 需用户传入 server
func NewApp(servers []*Server, opts ...AppOption) *App {
app := &App{
servers: servers,
shutdownTimeout: time.Second * 30,
waitTime: time.Second * 10,
cbTime: 3 * time.Second,
}
for _, opt := range opts {
opt(app)
}
return app
}
// WithShutDownCallBack 向 App 传入 callback 函数
func WithShutDownCallBack(cbs ...ShutdownCallBack) AppOption {
return func(app *App) {
app.cbs = cbs
}
}
app 启动与优雅终止
func (app *App) StartAndServer() {
// 启动服务
for _, s := range app.servers {
svc := s
go func() {
if err := svc.Start(); err != nil {
if errors.Is(err, http.ErrServerClosed) {
log.Printf("服务器%s已关闭", svc.name)
} else {
log.Printf("服务器%s异常退出", svc.name)
}
}
}()
}
log.Println("应用启动成功")
// app 启动成功
// 监听退出信号
// 监听两次信号, 第一次优雅终止, 第二次强行终止
ch := make(chan os.Signal, 2)
signal.Notify(ch, sig.Signals...)
<-ch
fmt.Println("应用开始关闭...")
go func() {
select {
case <-ch:
log.Println("强制退出")
os.Exit(1)
case <-time.After(app.shutdownTimeout):
log.Println("超时强行退出")
os.Exit(1)
}
}()
app.shutdown(context.Background())
}
func (app *App) shutdown(ctx context.Context) {
log.Println("app start to shutdown")
// 将 app 下所有 server 拒绝新请求
for _, svc := range app.servers {
svc.reject()
}
log.Println("等待已有请求处理")
// 这里可以改造为实时统计正在处理的请求数量,为0 则下一步
time.Sleep(time.Second * app.waitTime)
log.Println("开始关闭应用")
var wg sync.WaitGroup
wg.Add(len(app.servers))
for _, s := range app.servers {
svc := s
go func() {
if err := svc.stop(ctx);err != nil {
log.Println("服务器关闭失败", svc.name)
}
wg.Done()
}()
}
wg.Wait()
// 执行回调函数
wg.Add(len(app.cbs))
log.Println("开始执行回调函数")
for _, cb := range app.cbs {
c := cb
go func() {
ctx2, cancal := context.WithCancel(ctx)
c(ctx2)
cancal()
wg.Done()
}()
}
wg.Wait()
// 释放资源
log.Println("开始释放资源")
app.close()
}
CallBackFunc
执行时间: 在优雅终止期间 app 确保所有 server 已关闭后调用
由用户自定义实现
示例
func StoreCacheToDBCallBack(ctx context.Context) {
done := make(chan struct{}, 1)
go func() {
// 这里将 cache 中数据刷到 db
log.Println("缓存刷新中...")
time.Sleep(time.Second)
done <- struct{}{}
}()
select {
case <-done:
log.Printf("缓存被刷新到了 DB")
case <-ctx.Done():
log.Printf("刷新缓存超时")
}
}
完整代码
main.go
package main
import (
"context"
"github.com/Ai-feier/http/server"
"log"
"net/http"
"time"
)
func main() {
// 新建两个 server
s1 := server.NewServer("service1", "localhost:8081")
s1.Handle("/", func(writer http.ResponseWriter, request *http.Request) {
_, _ = writer.Write([]byte("hello world"))
})
s2 := server.NewServer("service2", "localhost:8082")
// 新建应用, 包含 service1, service2
app := server.NewApp([]*server.Server{s1, s2}, server.WithShutDownCallBack(StoreCacheToDBCallBack))
app.StartAndServer()
}
func StoreCacheToDBCallBack(ctx context.Context) {
done := make(chan struct{}, 1)
go func() {
// 这里将 cache 中数据刷到 db
log.Println("缓存刷新中...")
time.Sleep(time.Second)
done <- struct{}{}
}()
select {
case <-done:
log.Printf("缓存被刷新到了 DB")
case <-ctx.Done():
log.Printf("刷新缓存超时")
}
}
server.go
package server
import (
"context"
"errors"
"fmt"
"github.com/Ai-feier/sig"
"log"
"net/http"
"os"
"os/signal"
"sync"
"time"
)
type AppOption func(app *App)
// ShutdownCallBack App 关闭时的回调函数
// 默认超时 3s
// 用户可通过 context 自行控制
type ShutdownCallBack func(ctx context.Context)
// WithShutDownCallBack 向 App 传入 callback 函数
func WithShutDownCallBack(cbs ...ShutdownCallBack) AppOption {
return func(app *App) {
app.cbs = cbs
}
}
type App struct {
servers []*Server
// app 关闭的最长时间
shutdownTimeout time.Duration
// 留给 server 处理已有请求的时间
waitTime time.Duration
// 回调函数超时控制
cbTime time.Duration
cbs []ShutdownCallBack
}
// NewApp 新建 App
// 需用户传入 server
func NewApp(servers []*Server, opts ...AppOption) *App {
app := &App{
servers: servers,
shutdownTimeout: time.Second * 30,
waitTime: time.Second * 10,
cbTime: 3 * time.Second,
}
for _, opt := range opts {
opt(app)
}
return app
}
func (app *App) StartAndServer() {
// 启动服务
for _, s := range app.servers {
svc := s
go func() {
if err := svc.Start(); err != nil {
if errors.Is(err, http.ErrServerClosed) {
log.Printf("服务器%s已关闭", svc.name)
} else {
log.Printf("服务器%s异常退出", svc.name)
}
}
}()
}
log.Println("应用启动成功")
// app 启动成功
// 监听退出信号
// 监听两次信号, 第一次优雅终止, 第二次强行终止
ch := make(chan os.Signal, 2)
signal.Notify(ch, sig.Signals...)
<-ch
fmt.Println("应用开始关闭...")
go func() {
select {
case <-ch:
log.Println("强制退出")
os.Exit(1)
case <-time.After(app.shutdownTimeout):
log.Println("超时强行退出")
os.Exit(1)
}
}()
app.shutdown(context.Background())
}
func (app *App) shutdown(ctx context.Context) {
log.Println("app start to shutdown")
// 将 app 下所有 server 拒绝新请求
for _, svc := range app.servers {
svc.reject()
}
log.Println("等待已有请求处理")
// 这里可以改造为实时统计正在处理的请求数量,为0 则下一步
time.Sleep(time.Second * app.waitTime)
log.Println("开始关闭应用")
var wg sync.WaitGroup
wg.Add(len(app.servers))
for _, s := range app.servers {
svc := s
go func() {
if err := svc.stop(ctx);err != nil {
log.Println("服务器关闭失败", svc.name)
}
wg.Done()
}()
}
wg.Wait()
// 执行回调函数
wg.Add(len(app.cbs))
log.Println("开始执行回调函数")
for _, cb := range app.cbs {
c := cb
go func() {
ctx2, cancal := context.WithCancel(ctx)
c(ctx2)
cancal()
wg.Done()
}()
}
wg.Wait()
// 释放资源
log.Println("开始释放资源")
app.close()
}
func (app *App) close() {
// 可补充释放资源逻辑
time.Sleep(time.Second)
log.Println("应用关闭")
}
// Server 对应一个服务
type Server struct {
svc *http.Server
name string
mux *serverMux
}
// serverMux 请求锁
//装饰器模式
type serverMux struct {
reject bool
*http.ServeMux
}
func (s *serverMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if s.reject {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("服务器已关闭"))
return
}
s.ServeMux.ServeHTTP(w, r)
}
func NewServer(name, addr string) *Server {
mux := &serverMux{ServeMux: http.NewServeMux()}
return &Server{
name: name,
mux: mux,
svc: &http.Server{
Handler: mux,
Addr: addr,
},
}
}
func (s *Server) Start() error {
return s.svc.ListenAndServe()
}
func (s *Server) Handle(pattern string, handler http.HandlerFunc) {
s.mux.Handle(pattern, handler)
}
func (s *Server) reject() {
s.mux.reject = true
}
func (s *Server) stop(ctx context.Context) error {
log.Println("server: ", s.name, "关闭中")
return s.svc.Shutdown(ctx)
}
效果演示
C:\Users\26645\Desktop\goproject\gracefulshutdown\http>go build .
C:\Users\26645\Desktop\goproject\gracefulshutdown\http>http.exe
2024/01/02 15:32:08 应用启动成功
应用开始关闭...
2024/01/02 15:32:10 app start to shutdown
2024/01/02 15:32:10 等待已有请求处理
2024/01/02 15:32:10 开始关闭应用
2024/01/02 15:32:10 server: service2 关闭中
2024/01/02 15:32:10 server: service1 关闭中
2024/01/02 15:32:10 服务器service2已关闭
2024/01/02 15:32:10 服务器service1已关闭
2024/01/02 15:32:10 开始执行回调函数
2024/01/02 15:32:10 缓存刷新中...
2024/01/02 15:32:11 缓存被刷新到了 DB
2024/01/02 15:32:11 开始释放资源
2024/01/02 15:32:12 应用关闭