clean repo

This commit is contained in:
Naitik Shah 2012-06-04 22:21:10 +00:00
commit a98138a3e6
5 changed files with 648 additions and 0 deletions

217
grace.go Normal file
View File

@ -0,0 +1,217 @@
// Package grace allows for gracefully waiting for a listener to
// finish serving it's active requests.
package grace
import (
"errors"
"fmt"
"net"
"os"
"os/exec"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
)
var (
// This error is returned by Inherits() when we're not inheriting any fds.
ErrNotInheriting = errors.New("no inherited listeners")
// This error is returned by Listener.Accept() when Close is in progress.
ErrAlreadyClosed = errors.New("already closed")
)
const (
// Used to indicate a graceful restart in the new process.
envCountKey = "GRACE"
// The error returned by the standard library when the socket is closed.
errClosed = "use of closed network connection"
// Used for the counter chan.
inc = true
dec = false
)
// A Listener providing a graceful Close process and can be sent
// across processes using the underlying File descriptor.
type Listener interface {
net.Listener
// Will return the underlying file representing this Listener.
File() (f *os.File, err error)
}
// A goroutine based counter that provides graceful Close for listeners.
type listener struct {
Listener
closed bool // Indicates we're already closed.
closeRequest chan bool // Send a bool here to indicate we want to Close.
allClosed chan bool // Receive from here will indicate a clean Close.
counter chan bool // Use the inc/dec counters.
}
// Allows for us to notice when the connection is closed.
type conn struct {
net.Conn
counter chan bool
}
func (c conn) Close() error {
c.counter <- dec
return c.Conn.Close()
}
// Create a new listener.
func NewListener(l Listener) Listener {
i := &listener{
Listener: l,
closeRequest: make(chan bool),
allClosed: make(chan bool),
counter: make(chan bool),
}
go i.enabler()
return i
}
func (l *listener) enabler() {
var counter uint64
var change bool
for {
select {
case <-l.closeRequest:
l.closed = true
case change = <-l.counter:
if change == inc {
counter++
} else {
counter--
}
}
if l.closed && counter == 0 {
l.allClosed <- true
break
}
}
}
func (l *listener) Close() error {
if l.closed == true {
return nil
}
l.closeRequest <- true
<-l.allClosed
return l.Listener.Close()
}
func (l *listener) Accept() (net.Conn, error) {
if l.closed == true {
return nil, ErrAlreadyClosed
}
c, err := l.Listener.Accept()
if err != nil {
if strings.HasSuffix(err.Error(), errClosed) {
return nil, ErrAlreadyClosed
}
return nil, err
}
l.counter <- inc
return conn{
Conn: c,
counter: l.counter,
}, nil
}
// Wait for signals.
func Wait(listeners []Listener) (err error) {
ch := make(chan os.Signal, 2)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGUSR2)
for {
sig := <-ch
switch sig {
case syscall.SIGTERM:
var wg sync.WaitGroup
wg.Add(len(listeners))
for _, l := range listeners {
go func(l Listener) {
cErr := l.Close()
if cErr != nil {
err = cErr
}
wg.Done()
}(l)
}
wg.Wait()
return
case syscall.SIGUSR2:
rErr := Restart(listeners)
if rErr != nil {
return rErr
}
}
}
panic("not reached")
}
// Try to inherit listeners from environment variables.
func Inherit() (listeners []Listener, err error) {
countStr := os.Getenv(envCountKey)
if countStr == "" {
return nil, ErrNotInheriting
}
count, err := strconv.Atoi(countStr)
if err != nil {
return nil, err
}
// If we are inheriting, the listeners will begin at fd 3
for i := 3; i < 3+count; i++ {
file := os.NewFile(uintptr(i), "listener")
tmp, err := net.FileListener(file)
file.Close()
if err != nil {
return nil, err
}
l := tmp.(*net.TCPListener)
listeners = append(listeners, NewListener(l))
}
return
}
// Start the Close process in the parent.
func CloseParent() error {
return syscall.Kill(os.Getppid(), syscall.SIGTERM)
}
// Restart the process passing it the given listeners.
func Restart(listeners []Listener) (err error) {
if len(listeners) == 0 {
return errors.New("restart must be given listeners.")
}
files := make([]*os.File, len(listeners))
for i, l := range listeners {
files[i], err = l.File()
if err != nil {
return err
}
defer files[i].Close()
syscall.CloseOnExec(int(files[i].Fd()))
}
argv0, err := exec.LookPath(os.Args[0])
if err != nil {
return err
}
wd, err := os.Getwd()
if err != nil {
return err
}
allFiles := append([]*os.File{os.Stdin, os.Stdout, os.Stderr}, files...)
allFiles = append(allFiles, nil)
_, err = os.StartProcess(argv0, os.Args, &os.ProcAttr{
Dir: wd,
Env: append(os.Environ(), fmt.Sprintf("%s=%d", envCountKey, len(files))),
Files: allFiles,
})
return err
}

44
gracedemo/demo.go Normal file
View File

@ -0,0 +1,44 @@
// Command gracedemo implements a demo server showing how to gracefully
// terminate an HTTP server using finmonitor and goagain.
package main
import (
"flag"
"fmt"
"github.com/nshah/go.grace/gracehttp"
"net/http"
"time"
)
var (
address0 = flag.String("a0", ":48567", "Zero address to bind to.")
address1 = flag.String("a1", ":48568", "First address to bind to.")
address2 = flag.String("a2", ":48569", "Second address to bind to.")
now = time.Now()
)
func main() {
flag.Parse()
gracehttp.Serve(
gracehttp.Handler{*address0, newHandler("Zero ")},
gracehttp.Handler{*address1, newHandler("First ")},
gracehttp.Handler{*address2, newHandler("Second")},
)
}
func newHandler(name string) http.Handler {
mux := http.NewServeMux()
mux.HandleFunc("/sleep/", func(w http.ResponseWriter, r *http.Request) {
duration, err := time.ParseDuration(r.FormValue("duration"))
if err != nil {
http.Error(w, err.Error(), 400)
}
time.Sleep(duration)
w.Write([]byte(fmt.Sprintf(
"%s started at %s slept for %d nanoseconds.\n",
name,
now,
duration.Nanoseconds())))
})
return mux
}

113
gracehttp/http.go Normal file
View File

@ -0,0 +1,113 @@
// Package gracehttp provides easy to use graceful restart
// functionality for HTTP server.
package gracehttp
import (
"bytes"
"errors"
"flag"
"fmt"
"github.com/nshah/go.grace"
"log"
"net"
"net/http"
"os"
)
type Handler struct {
Addr string
Handler http.Handler
}
type Handlers []Handler
var (
verbose = flag.Bool("gracehttp.log", true, "Enable logging.")
ErrUnexpectedListenersCount = errors.New("unexpected listeners count")
)
// Creates new listeners for all the given addresses.
func (handlers Handlers) newListeners() ([]grace.Listener, error) {
listeners := make([]grace.Listener, len(handlers))
for index, pair := range handlers {
addr, err := net.ResolveTCPAddr("tcp", pair.Addr)
if err != nil {
return nil, fmt.Errorf(
"Failed net.ResolveTCPAddr for %s: %s", pair.Addr, err)
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, fmt.Errorf("Failed net.ListenTCP for %s: %s", pair.Addr, err)
}
listeners[index] = grace.NewListener(l)
}
return listeners, nil
}
// Serve on the given listeners.
func (handlers Handlers) serve(listeners []grace.Listener) error {
if len(handlers) != len(listeners) {
return ErrUnexpectedListenersCount
}
for i, l := range listeners {
go func(i int, l net.Listener) {
err := http.Serve(l, handlers[i].Handler)
// The underlying Accept() will return grace.ErrAlreadyClosed
// when a signal to do the same is returned, which we are okay with.
if err != nil && err != grace.ErrAlreadyClosed {
log.Fatalf("Failed http.Serve: %s", err)
}
}(i, l)
}
// TODO errors should be returned not fataled
return nil
}
// Serve will listen on the given address. It will also wait for a
// SIGUSR2 signal and will restart the server passing the active listener
// to the new process and avoid dropping active connections.
func Serve(givenHandlers ...Handler) {
handlers := Handlers(givenHandlers)
listeners, err := grace.Inherit()
if err == nil {
if *verbose {
log.Printf(
"Graceful handoff of %s with new pid %d and old pid %d.",
pprintAddr(listeners), os.Getpid(), os.Getppid())
}
err = grace.CloseParent()
if err != nil {
log.Fatalf("Failed to close parent: %s", err)
}
} else if err == grace.ErrNotInheriting {
listeners, err = handlers.newListeners()
if err != nil {
log.Fatal(err)
}
if *verbose {
log.Printf("Serving %s with pid %d.", pprintAddr(listeners), os.Getpid())
}
} else {
log.Fatalf("Failed graceful handoff: %s", err)
}
go handlers.serve(listeners)
err = grace.Wait(listeners)
if err != nil {
log.Fatalf("Failed grace.Wait: %s", err)
}
if *verbose {
log.Printf("Exiting pid %d.", os.Getpid())
}
}
// Used for pretty printing addresses.
func pprintAddr(listeners []grace.Listener) []byte {
out := bytes.NewBuffer(nil)
for i, l := range listeners {
if i != 0 {
fmt.Fprint(out, ", ")
}
fmt.Fprint(out, l.Addr())
}
return out.Bytes()
}

219
gracehttp/http_test.go Normal file
View File

@ -0,0 +1,219 @@
package gracehttp_test
import (
"bufio"
"encoding/json"
"fmt"
"github.com/nshah/go.freeport"
"github.com/nshah/go.tool"
"io/ioutil"
"net/http"
"os"
"os/exec"
"path/filepath"
"sync"
"syscall"
"testing"
"time"
)
const (
// The amount of time we give a process to become ready.
processWait = time.Second * 2
// The amount of time for the long HTTP request. This should be
// bigger than the value above.
slowHttpWait = time.Second * 4
)
// The response from the test server.
type response struct {
Sleep time.Duration
Pid int
}
// State for the test run.
type harness struct {
T *testing.T // The test instance.
ImportPath string // The import path for the server command.
ExeName string // The temp binary from the build.
Addr []string // The addresses for the http servers.
Process []*os.Process // The server commands, oldest to newest.
RequestWaitGroup sync.WaitGroup // The wait group for the HTTP requests.
newProcess chan bool // A bool is sent on restart.
}
// Find 3 free ports and setup addresses.
func (h *harness) SetupAddr() {
for i := 3; i > 0; i-- {
port, err := freeport.Get()
if err != nil {
h.T.Fatalf("Failed to find a free port: %s", err)
}
h.Addr = append(h.Addr, fmt.Sprintf("127.0.0.1:%d", port))
}
}
// Builds the command line arguments.
func (h *harness) Args() []string {
if h.Addr == nil {
h.SetupAddr()
}
return []string{
"-gracehttp.log=false",
"-a0", h.Addr[0],
"-a1", h.Addr[1],
"-a2", h.Addr[2],
}
}
// Builds the command.
func (h *harness) Build() {
basename := filepath.Base(h.ImportPath)
tempFile, err := ioutil.TempFile("", basename+"-")
if err != nil {
h.T.Fatalf("Error creating temp file: %s", err)
}
h.ExeName = tempFile.Name()
_ = os.Remove(h.ExeName) // the build tool will create this
options := tool.Options{
ImportPaths: []string{h.ImportPath},
Output: h.ExeName,
}
_, err = options.Command("build")
if err != nil {
h.T.Fatal(err)
}
}
// Start a fresh server and wait for pid updates on restart.
func (h *harness) Start() {
if h.newProcess == nil {
h.newProcess = make(chan bool)
}
cmd := exec.Command(h.ExeName, h.Args()...)
stderr, err := cmd.StderrPipe()
go func() {
reader := bufio.NewReader(stderr)
for {
line, isPrefix, err := reader.ReadLine()
if err != nil {
h.T.Fatalf("Failed to read line from server process: %s", err)
}
if isPrefix {
h.T.Fatalf("Deal with isPrefix for line: %s", line)
}
res := &response{}
err = json.Unmarshal([]byte(line), res)
if err != nil {
h.T.Fatalf("Could not parse json from stderr %s: %s", line, err)
}
process, err := os.FindProcess(res.Pid)
if err != nil {
h.T.Fatalf("Could not find process with pid: %d", res.Pid)
}
h.Process = append(h.Process, process)
h.newProcess <- true
}
}()
err = cmd.Start()
if err != nil {
h.T.Fatalf("Failed to start command: %s", err)
}
h.Process = append(h.Process, cmd.Process)
<-h.newProcess
time.Sleep(processWait)
}
// Restart the most recent server.
func (h *harness) Restart() {
err := h.MostRecentProcess().Signal(syscall.SIGUSR2)
if err != nil {
h.T.Fatalf("Failed to send SIGUSR2 and restart process: %s", err)
}
<-h.newProcess
time.Sleep(processWait)
}
// Graceful termination of the most recent server.
func (h *harness) Stop() {
err := h.MostRecentProcess().Signal(syscall.SIGTERM)
if err != nil {
h.T.Fatalf("Failed to send SIGTERM and stop process: %s", err)
}
}
// Returns the most recent server process.
func (h *harness) MostRecentProcess() *os.Process {
l := len(h.Process)
if l == 0 {
h.T.Fatalf("Most recent command requested before command was created.")
}
return h.Process[l-1]
}
// Remove the built executable.
func (h *harness) RemoveExe() {
err := os.Remove(h.ExeName)
if err != nil {
h.T.Fatalf("Failed to RemoveExe: %s", err)
}
}
// Helper for sending a single request.
func (h *harness) SendOne(duration time.Duration, addr string, pid int) {
client := &http.Client{
Transport: &http.Transport{DisableKeepAlives: true},
}
url := fmt.Sprintf("http://%s/sleep/?duration=%s", addr, duration.String())
r, err := client.Get(url)
if err != nil {
h.T.Fatalf("Failed request to %s: %s", url, err)
}
defer r.Body.Close()
res := &response{}
err = json.NewDecoder(r.Body).Decode(res)
if err != nil {
h.T.Fatalf("Failed to ready decode json response body: %s", err)
}
if pid != res.Pid {
h.T.Fatalf("Didn't get expected pid %d instead got %d", pid, res.Pid)
}
h.RequestWaitGroup.Done()
}
// Send test HTTP request.
func (h *harness) SendRequest() {
pid := h.MostRecentProcess().Pid
for _, addr := range h.Addr {
h.RequestWaitGroup.Add(2)
go h.SendOne(time.Second*0, addr, pid)
go h.SendOne(slowHttpWait, addr, pid)
}
}
// Wait for everything.
func (h *harness) Wait() {
h.RequestWaitGroup.Wait()
}
// The main test case.
func TestComplex(t *testing.T) {
h := &harness{
ImportPath: "github.com/nshah/go.grace/gracehttp/testserver",
T: t,
}
h.Build()
h.Start()
h.SendRequest()
time.Sleep(processWait)
h.Restart()
h.SendRequest()
time.Sleep(processWait)
h.Restart()
h.SendRequest()
time.Sleep(processWait)
h.Stop()
h.Wait()
h.RemoveExe()
}

View File

@ -0,0 +1,55 @@
// Command testserver implements a test case.
package main
import (
"encoding/json"
"flag"
"github.com/nshah/go.grace/gracehttp"
"log"
"net/http"
"os"
"time"
)
type response struct {
Sleep time.Duration
Pid int
}
var (
address0 = flag.String("a0", ":48567", "Zero address to bind to.")
address1 = flag.String("a1", ":48568", "First address to bind to.")
address2 = flag.String("a2", ":48569", "Second address to bind to.")
)
func main() {
flag.Parse()
err := json.NewEncoder(os.Stderr).Encode(&response{Pid: os.Getpid()})
if err != nil {
log.Fatalf("Error writing startup json: %s", err)
}
gracehttp.Serve(
gracehttp.Handler{*address0, newHandler()},
gracehttp.Handler{*address1, newHandler()},
gracehttp.Handler{*address2, newHandler()},
)
}
func newHandler() http.Handler {
mux := http.NewServeMux()
mux.HandleFunc("/sleep/", func(w http.ResponseWriter, r *http.Request) {
duration, err := time.ParseDuration(r.FormValue("duration"))
if err != nil {
http.Error(w, err.Error(), 400)
}
time.Sleep(duration)
err = json.NewEncoder(w).Encode(&response{
Sleep: duration,
Pid: os.Getpid(),
})
if err != nil {
log.Fatalf("Error encoding json: %s", err)
}
})
return mux
}