grace/grace.go

322 lines
8.0 KiB
Go
Raw Normal View History

2012-06-05 06:21:10 +08:00
// Package grace allows for gracefully waiting for a listener to
// finish serving its active requests.
package grace
import (
"errors"
"fmt"
"net"
"os"
"os/exec"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
2012-06-05 06:21:10 +08:00
)
var (
	// ErrNotInheriting is returned by Inherit when the process was not handed
	// any listener file descriptors by its parent.
	ErrNotInheriting = errors.New("grace: no inherited listeners")

	// ErrAlreadyClosed is returned by Listener.Accept once Close has begun.
	ErrAlreadyClosed = errors.New("grace: already closed")

	// errRestartListeners is returned by Restart when it is given no
	// listeners to hand over.
	errRestartListeners = errors.New("grace: restart must be given listeners")

	// errTermTimeout is returned when closing listeners after a TERM takes
	// longer than Process.TermTimeout.
	errTermTimeout = errors.New("grace: TERM timeout in closing listeners")

	// timeInPast is captured at package initialization, so by the time it is
	// used as a deadline it lies in the past and triggers an immediate timeout.
	timeInPast = time.Now()

	// initStarted reports whether our parent was PID 1 at startup. It has to
	// be sampled now: once an old parent is killed we get re-parented to init
	// and would otherwise wrongly look init-started.
	initStarted = os.Getppid() == 1
)
const (
	// envCountKey names the environment variable that tells a restarted
	// process how many listener file descriptors it was handed.
	envCountKey = "LISTEN_FDS"

	// envCountKeyPrefix is envCountKey in the "KEY=" form used by
	// os.Environ entries.
	envCountKeyPrefix = envCountKey + "="

	// errClosed is the trailing text of the error the standard library
	// reports when operating on a closed socket.
	errClosed = "use of closed network connection"
)
// Listener is a net.Listener whose underlying socket can be exposed as a file
// so it can be passed to another process; wrapped via NewListener it also
// gains a graceful Close.
type Listener interface {
	net.Listener

	// File returns an *os.File for the socket underlying this Listener.
	File() (f *os.File, err error)
}
2012-06-05 06:21:10 +08:00
type listener struct {
Listener
closed bool
closedMutex sync.RWMutex
wg sync.WaitGroup
2012-06-05 06:21:10 +08:00
}
// deadliner matches listeners that support SetDeadline; Close relies on it to
// unblock Accept on init-provided sockets.
type deadliner interface {
	SetDeadline(t time.Time) error
}
2012-06-05 06:21:10 +08:00
// conn wraps an accepted net.Conn so that its owning listener is told when
// the connection is closed.
type conn struct {
	net.Conn

	wg   *sync.WaitGroup // owning listener's WaitGroup, released on Close
	once sync.Once       // ensures wg.Done runs at most once for this conn
}
2013-11-04 02:41:56 +08:00
func (c *conn) Close() error {
2013-11-04 19:57:07 +08:00
defer c.once.Do(c.wg.Done)
2013-10-07 01:54:16 +08:00
return c.Conn.Close()
2012-06-05 06:21:10 +08:00
}
2012-06-05 12:56:51 +08:00
// Wraps an existing File listener to provide a graceful Close() process.
func NewListener(l Listener) Listener {
return &listener{Listener: l}
2012-06-05 06:21:10 +08:00
}
func (l *listener) Close() error {
l.closedMutex.Lock()
l.closed = true
l.closedMutex.Unlock()
var err error
// Init provided sockets dont actually close so we trigger Accept to return
// by setting the deadline.
if initStarted {
if ld, ok := l.Listener.(deadliner); ok {
2013-10-07 02:07:31 +08:00
err = ld.SetDeadline(timeInPast)
} else {
fmt.Fprintln(os.Stderr, "init activated server did not have SetDeadline")
}
} else {
err = l.Listener.Close()
}
l.wg.Wait()
return err
2012-06-05 06:21:10 +08:00
}
func (l *listener) Accept() (net.Conn, error) {
// Presume we'll accept and decrement in defer if we don't. If we did this
// after a successful accept we would have a race condition where we may end
// up incorrectly shutting down between the time we do a successful accept
// and the increment.
var c net.Conn
l.wg.Add(1)
defer func() {
// If we didn't accept, we decrement our presumptuous count above.
if c == nil {
l.wg.Done()
}
}()
l.closedMutex.RLock()
if l.closed {
l.closedMutex.RUnlock()
return nil, ErrAlreadyClosed
}
l.closedMutex.RUnlock()
c, err := l.Listener.Accept()
if err != nil {
if strings.HasSuffix(err.Error(), errClosed) {
return nil, ErrAlreadyClosed
}
// We use SetDeadline above to trigger Accept to return when we're trying
// to handoff to a child as part of our restart process. In this scenario
2013-08-21 07:48:43 +08:00
// we want to treat the timeout the same as a Close.
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
l.closedMutex.RLock()
if l.closed {
l.closedMutex.RUnlock()
return nil, ErrAlreadyClosed
}
l.closedMutex.RUnlock()
}
return nil, err
}
2013-11-04 02:41:56 +08:00
return &conn{Conn: c, wg: &l.wg}, nil
2012-06-05 06:21:10 +08:00
}
2014-04-03 02:52:43 +08:00
// Process configures graceful restart and termination. The zero value is
// ready to use.
type Process struct {
	// TermTimeout bounds how long to wait for listeners to close after a TERM
	// signal. When left at zero, the wait is unbounded.
	TermTimeout time.Duration
}
// term closes all listeners concurrently and waits for them to finish.
//
// If p.TermTimeout is positive and elapses first, errTermTimeout is returned
// and the outstanding Close calls keep running in the background. A zero or
// negative TermTimeout waits without a limit. When every Close has returned,
// the first Close error (if any) is returned.
func (p *Process) term(listeners []Listener) error {
	// Buffered to len(listeners) so Close goroutines never block on send,
	// even after we stop waiting because of a timeout.
	errs := make(chan error, len(listeners))
	var wg sync.WaitGroup
	wg.Add(len(listeners))
	for _, l := range listeners {
		go func(l Listener) {
			defer wg.Done()
			if err := l.Close(); err != nil {
				errs <- err
			}
		}(l)
	}

	if p.TermTimeout <= 0 {
		// No usable timeout: wait indefinitely. A negative value previously
		// made time.After fire immediately, reporting a timeout without
		// waiting at all.
		wg.Wait()
	} else {
		// Wait in the background so we can race it against the timer.
		done := make(chan struct{})
		go func() {
			defer close(done)
			wg.Wait()
		}()

		timer := time.NewTimer(p.TermTimeout)
		defer timer.Stop()
		select {
		case <-done:
		case <-timer.C:
			return errTermTimeout
		}
	}

	// Every Close has returned, so errs already holds all failures.
	select {
	case err := <-errs:
		return err
	default:
		return nil
	}
}
2012-06-05 12:56:51 +08:00
// Wait for signals to gracefully terminate or restart the process.
2014-04-03 02:52:43 +08:00
func (p *Process) Wait(listeners []Listener) error {
2012-06-05 06:21:10 +08:00
ch := make(chan os.Signal, 2)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGUSR2)
for {
sig := <-ch
switch sig {
case syscall.SIGTERM:
2014-04-03 02:52:43 +08:00
// this ensures a subsequent TERM will trigger standard go behaviour of
// terminating.
signal.Stop(ch)
2014-04-03 02:52:43 +08:00
return p.term(listeners)
2012-06-05 06:21:10 +08:00
case syscall.SIGUSR2:
2014-04-03 02:52:43 +08:00
// we only return here if there's an error, otherwise the new process
// will send us a TERM when it's ready to trigger the actual shutdown.
if err := p.Restart(listeners); err != nil {
return err
2012-06-05 06:21:10 +08:00
}
}
}
}
2012-06-05 12:56:51 +08:00
// Try to inherit listeners from the parent process.
func (p *Process) Inherit() (listeners []Listener, err error) {
2012-06-05 06:21:10 +08:00
countStr := os.Getenv(envCountKey)
if countStr == "" {
return nil, ErrNotInheriting
}
count, err := strconv.Atoi(countStr)
if err != nil {
return nil, err
}
// If we are inheriting, the listeners will begin at fd 3
for i := 3; i < 3+count; i++ {
file := os.NewFile(uintptr(i), "listener")
tmp, err := net.FileListener(file)
file.Close()
if err != nil {
return nil, err
}
2013-10-07 02:25:24 +08:00
l := tmp.(Listener)
2012-06-05 06:21:10 +08:00
listeners = append(listeners, NewListener(l))
}
return
}
2012-06-05 12:56:51 +08:00
// Start the Close process in the parent. This does not wait for the
// parent to close and simply sends it the TERM signal.
func (p *Process) CloseParent() error {
ppid := os.Getppid()
if ppid == 1 { // init provided sockets, for example systemd
return nil
}
return syscall.Kill(ppid, syscall.SIGTERM)
2012-06-05 06:21:10 +08:00
}
2012-06-05 12:56:51 +08:00
// Restart the process passing the given listeners to the new process.
func (p *Process) Restart(listeners []Listener) (err error) {
2012-06-05 06:21:10 +08:00
if len(listeners) == 0 {
2014-04-03 02:52:43 +08:00
return errRestartListeners
2012-06-05 06:21:10 +08:00
}
2013-10-07 04:01:58 +08:00
// Extract the fds from the listeners.
2012-06-05 06:21:10 +08:00
files := make([]*os.File, len(listeners))
for i, l := range listeners {
files[i], err = l.File()
if err != nil {
return err
}
defer files[i].Close()
syscall.CloseOnExec(int(files[i].Fd()))
}
2013-10-07 04:01:58 +08:00
// Use the original binary location. This works with symlinks such that if
// the file it points to has been changed we will use the updated symlink.
2012-06-05 06:21:10 +08:00
argv0, err := exec.LookPath(os.Args[0])
if err != nil {
return err
}
2013-10-07 04:01:58 +08:00
// In order to keep the working directory the same as when we started.
2012-06-05 06:21:10 +08:00
wd, err := os.Getwd()
if err != nil {
return err
}
2013-10-07 03:14:11 +08:00
2013-10-07 04:01:58 +08:00
// Pass on the environment and replace the old count key with the new one.
2013-10-07 03:14:11 +08:00
var env []string
for _, v := range os.Environ() {
if !strings.HasPrefix(v, envCountKeyPrefix) {
env = append(env, v)
}
}
env = append(env, fmt.Sprintf("%s%d", envCountKeyPrefix, len(listeners)))
2012-06-05 06:21:10 +08:00
allFiles := append([]*os.File{os.Stdin, os.Stdout, os.Stderr}, files...)
_, err = os.StartProcess(argv0, os.Args, &os.ProcAttr{
Dir: wd,
2013-10-07 03:14:11 +08:00
Env: env,
2012-06-05 06:21:10 +08:00
Files: allFiles,
})
return err
}
// defaultProcess backs the package-level convenience functions. It sets no
// TermTimeout, so a TERM waits for listeners without a limit.
var defaultProcess = new(Process)

// Wait blocks for SIGTERM/SIGUSR2 and gracefully terminates or restarts the
// process, using the default Process.
func Wait(listeners []Listener) (err error) {
	err = defaultProcess.Wait(listeners)
	return err
}

// Inherit returns the listeners handed down by the parent process, using the
// default Process.
func Inherit() (listeners []Listener, err error) {
	listeners, err = defaultProcess.Inherit()
	return listeners, err
}

// CloseParent sends SIGTERM to the parent process without waiting for it to
// exit, using the default Process.
func CloseParent() error {
	err := defaultProcess.CloseParent()
	return err
}

// Restart starts a new copy of the process and hands it the given listeners,
// using the default Process.
func Restart(listeners []Listener) (err error) {
	err = defaultProcess.Restart(listeners)
	return err
}