fix racy behavior with closing channels

replaces custom channel logic with simpler RWMutex and WaitGroup
This commit is contained in:
Naitik Shah 2013-05-08 23:23:45 -07:00
parent 8f7f9df910
commit 6bda16a267

View File

@ -29,10 +29,6 @@ const (
// The error returned by the standard library when the socket is closed. // The error returned by the standard library when the socket is closed.
errClosed = "use of closed network connection" errClosed = "use of closed network connection"
// Used for the counter chan.
inc = true
dec = false
) )
// A FileListener is a file backed net.Listener. // A FileListener is a file backed net.Listener.
@ -56,66 +52,35 @@ type Listener interface {
CloseRequest() CloseRequest()
} }
// A goroutine based counter that provides graceful Close for listeners.
type listener struct { type listener struct {
FileListener FileListener
closeRequest chan bool // Send a bool here to indicate we want to Close. closed bool
allClosed chan bool // Receive from here will indicate a clean Close. closedMutex sync.RWMutex
counter chan bool // Use the inc/dec counters. wg sync.WaitGroup
} }
// Allows for us to notice when the connection is closed. // Allows for us to notice when the connection is closed.
type conn struct { type conn struct {
net.Conn net.Conn
counter chan bool wg *sync.WaitGroup
} }
func (c conn) Close() error { func (c conn) Close() error {
c.counter <- dec err := c.Conn.Close()
return c.Conn.Close() c.wg.Done()
return err
} }
// Wraps an existing File listener to provide a graceful Close() process. // Wraps an existing File listener to provide a graceful Close() process.
func NewListener(l FileListener) Listener { func NewListener(l FileListener) Listener {
i := &listener{ return &listener{FileListener: l}
FileListener: l,
closeRequest: make(chan bool),
allClosed: make(chan bool),
counter: make(chan bool),
}
go i.enabler()
return i
}
func (l *listener) enabler() {
var counter uint64
var change bool
for {
select {
case <-l.closeRequest:
l.closeRequest = nil
case change = <-l.counter:
if change == inc {
counter++
} else {
counter--
}
}
if l.closeRequest == nil && counter == 0 {
close(l.allClosed)
close(l.counter)
break
}
}
} }
func (l *listener) CloseRequest() { func (l *listener) CloseRequest() {
select { l.closedMutex.Lock()
case l.closeRequest <- true: l.closed = true
<-l.allClosed l.closedMutex.Unlock()
case <-l.allClosed: l.wg.Wait()
return
}
} }
func (l *listener) Close() error { func (l *listener) Close() error {
@ -124,29 +89,21 @@ func (l *listener) Close() error {
} }
func (l *listener) Accept() (net.Conn, error) { func (l *listener) Accept() (net.Conn, error) {
select { c, err := l.FileListener.Accept()
case <-l.allClosed: if err != nil {
return nil, ErrAlreadyClosed if strings.HasSuffix(err.Error(), errClosed) {
default:
c, err := l.FileListener.Accept()
if err != nil {
if strings.HasSuffix(err.Error(), errClosed) {
return nil, ErrAlreadyClosed
}
return nil, err
}
select {
case <-l.allClosed:
c.Close()
return nil, ErrAlreadyClosed return nil, ErrAlreadyClosed
case l.counter <- inc:
return conn{
Conn: c,
counter: l.counter,
}, nil
} }
return nil, err
} }
panic("not reached") l.closedMutex.RLock()
defer l.closedMutex.RUnlock()
if l.closed {
c.Close()
return nil, ErrAlreadyClosed
}
l.wg.Add(1)
return conn{Conn: c, wg: &l.wg}, nil
} }
// Wait for signals to gracefully terminate or restart the process. // Wait for signals to gracefully terminate or restart the process.