Skip to content

Commit

Permalink
TLS support for Hollywood (#105)
Browse files Browse the repository at this point in the history
* added max size for batching messages

* default message batch size

* wip: chaos.

* Add benchmark actor and protobuf message

The update introduces a benchmark actor that receives messages and increments a message counter. It also includes the creation of a new protobuf message. The main function is updated to incorporate this benchmark actor, while providing a detailed simulation of sending messages across multiple actors distributed across different engines.

* Refactor benchmark code and add profiling

The benchmark code is refactored for better error handling and message checks. 'benchmark' function is extracted from 'main' for further testing. A new Makefile target 'bench-profile' is added for profiling. A new test file 'main_test.go' is created for benchmark testing. Corresponding instructions are added in the newly created README.md file. Also, .gitignore is updated to exclude the created test, cpu, and memory profiles.

* document how to use the interactive web interface.

* add latency measurement as well.

* bah. can't make go test ignore the new latency tests. I just commented it out. Gonna revisit latency benchmarks later.

* Update benchmark command in Makefile

The benchmark command in the Makefile has been updated to run the whole package, not just main.go

* wip: add TLS option for the remote.

* wip: first test.

* fix the cert generation. test pass.

* adjust some timeouts as a test was flaking.

* finish up the test. ready to merge.

* docs.

* tweak flaky tests.

---------

Co-authored-by: anthdm <[email protected]>
  • Loading branch information
perbu and anthdm authored Dec 13, 2023
1 parent 4fe2643 commit 0c36559
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 27 deletions.
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,30 @@ e.SpawnFunc(func(c *actor.Context) {
Actors can communicate with each other over the network with the Remote package.
This works the same as local actors but "over the wire". Hollywood supports serialization with protobuf.

### Configuration

remote.New() takes a remote.Config struct. This struct contains the following fields:
- ListenAddr string
- TlsConfig *tls.Config

You'll instantiate a new remote with the following code:
```go
var engine *actor.Engine
remote := remote.New(
remote.Config{
ListenAddr: "0.0.0.0:2222",
TlsConfig: &tls.Config{
Certificates: []tls.Certificate{cert},
}
}
})
var err error
engine, err = actor.NewEngine(actor.EngineOptRemote(remote))

```



Look at the [Remote actor examples](examples/remote) and the [Chat client & Server](examples/chat) for more information.

## Eventstream
Expand Down
5 changes: 3 additions & 2 deletions actor/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,15 +320,16 @@ func TestRequestResponse(t *testing.T) {
}
}, "actor_a")
t.Run("should timeout", func(t *testing.T) {
resp := e.Request(a, responseEvent{d: time.Millisecond * 2}, 1*time.Millisecond)
// a task with a 1us timeout which takes 20ms to complete, should always time out.
resp := e.Request(a, responseEvent{d: time.Millisecond * 20}, 1*time.Microsecond)
_, err := resp.Result()
assert.Error(t, err)
assert.Nil(t, e.Registry.get(resp.pid))

})
t.Run("should not timeout", func(t *testing.T) {
for i := 0; i < 200; i++ {
resp := e.Request(a, responseEvent{d: time.Microsecond * 1}, time.Millisecond*100)
resp := e.Request(a, responseEvent{d: time.Microsecond * 1}, time.Millisecond*800)
res, err := resp.Result()
assert.NoError(t, err)
assert.Equal(t, "foo", res)
Expand Down
16 changes: 13 additions & 3 deletions remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package remote

import (
"context"
"crypto/tls"
"fmt"
"log/slog"
"net"
Expand All @@ -16,6 +17,7 @@ import (
// Config holds the remote configuration.
type Config struct {
ListenAddr string
TlsConfig *tls.Config
Wg *sync.WaitGroup
}

Expand Down Expand Up @@ -52,9 +54,17 @@ func (r *Remote) Start(e *actor.Engine) error {
}
r.state.Store(stateRunning)
r.engine = e
ln, err := net.Listen("tcp", r.config.ListenAddr)
var ln net.Listener
var err error
switch r.config.TlsConfig {
case nil:
ln, err = net.Listen("tcp", r.config.ListenAddr)
default:
slog.Debug("remote using TLS for listening")
ln, err = tls.Listen("tcp", r.config.ListenAddr, r.config.TlsConfig)
}
if err != nil {
panic("failed to listen: " + err.Error())
return fmt.Errorf("remote failed to listen: %w", err)
}
slog.Debug("listening", "addr", r.config.ListenAddr)
mux := drpcmux.New()
Expand All @@ -64,7 +74,7 @@ func (r *Remote) Start(e *actor.Engine) error {
}
s := drpcserver.New(mux)

r.streamRouterPID = r.engine.Spawn(newStreamRouter(r.engine), "router", actor.WithInboxSize(1024*1024))
r.streamRouterPID = r.engine.Spawn(newStreamRouter(r.engine, r.config.TlsConfig), "router", actor.WithInboxSize(1024*1024))
slog.Info("server started", "listenAddr", r.config.ListenAddr)
r.stopWg = &sync.WaitGroup{}
r.stopWg.Add(1)
Expand Down
11 changes: 1 addition & 10 deletions remote/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ import (
"github.com/stretchr/testify/require"
)

const (
debugLog = false // if you want a lot of noise when debugging the tests set this to true.
)

func init() {
// Needed for now when having the VTProtoserializer
RegisterType(&TestMessage{})
Expand Down Expand Up @@ -175,12 +171,7 @@ func makeRemoteEngine(listenAddr string) (*actor.Engine, *Remote, error) {
var e *actor.Engine
r := New(Config{ListenAddr: listenAddr})
var err error
switch debugLog {
case false:
e, err = actor.NewEngine(actor.EngineOptRemote(r))
case true:
e, err = actor.NewEngine(actor.EngineOptRemote(r))
}
e, err = actor.NewEngine(actor.EngineOptRemote(r))
if err != nil {
return nil, nil, fmt.Errorf("actor.NewEngine: %w", err)
}
Expand Down
180 changes: 180 additions & 0 deletions remote/remote_tls_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package remote

import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"github.com/anthdm/hollywood/actor"
"github.com/stretchr/testify/assert"
"math/big"
"net"
"sync"
"testing"
"time"
)

type sharedConfig struct {
peer1Config *tls.Config
peer2Config *tls.Config
}

// TestSend_TLS tests sending messages between two remote engines using TLS on the loopback interface.
func TestSend_TLS(t *testing.T) {
const msgs = 10
tlsTestConfig, err := generateTLSConfig()
assert.NoError(t, err)
aAddr := getRandomLocalhostAddr()
a, ra, err := makeRemoteEngineTls(aAddr, tlsTestConfig.peer1Config)
assert.NoError(t, err)
bAddr := getRandomLocalhostAddr()
b, rb, err := makeRemoteEngineTls(bAddr, tlsTestConfig.peer2Config)
assert.NoError(t, err)
wg := &sync.WaitGroup{}

wg.Add(msgs) // send msgs messages
pida := a.SpawnFunc(func(c *actor.Context) {
switch msg := c.Message().(type) {
case *TestMessage:
assert.Equal(t, msg.Data, []byte("foo"))
wg.Done()
}
}, "actor on a")

for i := 0; i < msgs; i++ {
b.Send(pida, &TestMessage{Data: []byte("foo")})
}
wg.Add(msgs) // send msgs more messages
pidb := b.SpawnFunc(func(c *actor.Context) {
switch msg := c.Message().(type) {
case *TestMessage:
assert.Equal(t, msg.Data, []byte("foo"))
wg.Done()
}
}, "actor on b")
for i := 0; i < msgs; i++ {
a.Send(pidb, &TestMessage{Data: []byte("foo")})
}
wg.Wait() // wait for messages to be received by the actor.
ra.Stop().Wait() // shutdown the remotes
rb.Stop().Wait()
a.Poison(pida).Wait()
b.Poison(pidb).Wait()
}

func generateTLSConfig() (*sharedConfig, error) {
// Create a new ECDSA private key for CA
caPrivKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
return nil, fmt.Errorf("ecdsa.GenerateKey: %w", err)
}

// Create a CA certificate
ca := &x509.Certificate{
SerialNumber: big.NewInt(2023),
Subject: pkix.Name{
Organization: []string{"Hollywood Testing CA"},
},
NotBefore: time.Now(),
NotAfter: time.Now().AddDate(0, 0, 1),
KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
BasicConstraintsValid: true,
IsCA: true,
}

caCertBytes, err := x509.CreateCertificate(rand.Reader, ca, ca, &caPrivKey.PublicKey, caPrivKey)
if err != nil {
return nil, fmt.Errorf("x509.CreateCertificate: %w", err)
}

// Parse the CA certificate for inclusion in tls.Config
caCert, err := x509.ParseCertificate(caCertBytes)
if err != nil {
return nil, fmt.Errorf("x509.ParseCertificate: %w", err)
}
// Create the CertPool and add the CA certificate
caCertPool := x509.NewCertPool()
caCertPool.AddCert(caCert)

peer1Pair, err := generateCert(caCert, caPrivKey)
if err != nil {
return nil, fmt.Errorf("generateCert(peer1): %w", err)
}
peer1TlsConfig := &tls.Config{
Certificates: []tls.Certificate{*peer1Pair},
ClientCAs: caCertPool,
RootCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert,
}
peer2Pair, err := generateCert(caCert, caPrivKey)
if err != nil {
return nil, fmt.Errorf("generateCert(peer2): %w", err)
}
peer2TlsConfig := &tls.Config{
Certificates: []tls.Certificate{*peer2Pair},
ClientCAs: caCertPool,
RootCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert,
}

return &sharedConfig{peer1Config: peer1TlsConfig, peer2Config: peer2TlsConfig}, nil
}

// generateCert takes a CA, makes a new private key and certificate, and returns a tls.Certificate
func generateCert(ca *x509.Certificate, caKey *ecdsa.PrivateKey) (*tls.Certificate, error) {
// Create a new ECDSA private key for peer1
key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
return nil, fmt.Errorf("ecdsa.GenerateKey: %w", err)
}
certificate := &x509.Certificate{
SerialNumber: big.NewInt(2),
Subject: pkix.Name{
CommonName: "localhost",
},
DNSNames: []string{"localhost"},
IPAddresses: []net.IP{net.IPv4(127, 0, 0, 1), net.IPv6loopback},
NotBefore: time.Now(),
NotAfter: time.Now().AddDate(0, 0, 1),
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
KeyUsage: x509.KeyUsageDigitalSignature,
}
certBytes, err := x509.CreateCertificate(rand.Reader, certificate, ca, &key.PublicKey, caKey)
if err != nil {
return nil, fmt.Errorf("x509.CreateCertificate: %w", err)
}
certPEM := pem.EncodeToMemory(&pem.Block{
Type: "CERTIFICATE",
Bytes: certBytes,
})

keyBytes, err := x509.MarshalECPrivateKey(key)
if err != nil {
return nil, fmt.Errorf("x509.MarshalECPrivateKey: %w", err)
}
keyPEM := pem.EncodeToMemory(&pem.Block{
Type: "EC PRIVATE KEY",
Bytes: keyBytes,
})
tlsCert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
return nil, fmt.Errorf("tls.X509KeyPair: %w", err)
}
return &tlsCert, nil
}

func makeRemoteEngineTls(listenAddr string, config *tls.Config) (*actor.Engine, *Remote, error) {
var e *actor.Engine
r := New(Config{ListenAddr: listenAddr, TlsConfig: config})
var err error
e, err = actor.NewEngine(actor.EngineOptRemote(r))
if err != nil {
return nil, nil, fmt.Errorf("actor.NewEngine: %w", err)
}
return e, r, nil
}
15 changes: 9 additions & 6 deletions remote/stream_router.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package remote

import (
"crypto/tls"
"log/slog"

"github.com/anthdm/hollywood/actor"
Expand All @@ -19,15 +20,17 @@ type terminateStream struct {
type streamRouter struct {
engine *actor.Engine
// streams is a map of remote address to stream writer pid.
streams map[string]*actor.PID
pid *actor.PID
streams map[string]*actor.PID
pid *actor.PID
tlsConfig *tls.Config
}

func newStreamRouter(e *actor.Engine) actor.Producer {
func newStreamRouter(e *actor.Engine, tlsConfig *tls.Config) actor.Producer {
return func() actor.Receiver {
return &streamRouter{
streams: make(map[string]*actor.PID),
engine: e,
streams: make(map[string]*actor.PID),
engine: e,
tlsConfig: tlsConfig,
}
}
}
Expand Down Expand Up @@ -61,7 +64,7 @@ func (s *streamRouter) deliverStream(msg *streamDeliver) {

swpid, ok = s.streams[address]
if !ok {
swpid = s.engine.SpawnProc(newStreamWriter(s.engine, s.pid, address))
swpid = s.engine.SpawnProc(newStreamWriter(s.engine, s.pid, address, s.tlsConfig))
s.streams[address] = swpid
}
s.engine.Send(swpid, msg)
Expand Down
28 changes: 22 additions & 6 deletions remote/stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package remote

import (
"context"
"crypto/tls"
"errors"
"io"
"log/slog"
Expand All @@ -28,16 +29,18 @@ type streamWriter struct {
pid *actor.PID
inbox actor.Inboxer
serializer Serializer
tlsConfig *tls.Config
}

func newStreamWriter(e *actor.Engine, rpid *actor.PID, address string) actor.Processer {
func newStreamWriter(e *actor.Engine, rpid *actor.PID, address string, tlsConfig *tls.Config) actor.Processer {
return &streamWriter{
writeToAddr: address,
engine: e,
routerPID: rpid,
inbox: actor.NewInbox(streamWriterBatchSize),
pid: actor.NewPID(e.Address(), "stream", address),
serializer: ProtoSerializer{},
tlsConfig: tlsConfig,
}
}

Expand Down Expand Up @@ -112,11 +115,24 @@ func (s *streamWriter) init() {
delay time.Duration = time.Millisecond * 500
)
for {
rawconn, err = net.Dial("tcp", s.writeToAddr)
if err != nil {
slog.Error("net.Dial", "err", err, "remote", s.writeToAddr)
time.Sleep(delay)
continue
// Here we try to connect to the remote address.
// Todo: can we make an Event here in case of failure?
switch s.tlsConfig {
case nil:
rawconn, err = net.Dial("tcp", s.writeToAddr)
if err != nil {
slog.Error("net.Dial", "err", err, "remote", s.writeToAddr)
time.Sleep(delay)
continue
}
default:
slog.Debug("remote using TLS for writing")
rawconn, err = tls.Dial("tcp", s.writeToAddr, s.tlsConfig)
if err != nil {
slog.Error("tls.Dial", "err", err, "remote", s.writeToAddr)
time.Sleep(delay)
continue
}
}
break
}
Expand Down

0 comments on commit 0c36559

Please sign in to comment.