Skip to content

Commit

Permalink
External events and sessions storage.
Browse files Browse the repository at this point in the history
Updates #1755

Design
------

This commit adds support for pluggable storage of events and
session recordings, and adds several storage plugins.

If external session recording storage is used,
nodes or proxies (depending on configuration)
store the session recordings locally and
then upload the recordings in the background.

Non-print session events are always sent to the
remote auth server as usual.

If remote events storage is used, auth
servers download recordings from it during playback.

DynamoDB event backend
----------------------

A transient DynamoDB backend is added for event
storage. Events are stored with a default TTL of 1 year.

External lambda functions should be used
to forward events from DynamoDB.

The audit_table_name parameter in the storage section
turns on the DynamoDB backend.

The table is created automatically if it does not exist.

S3 sessions backend
-------------------

If audit_sessions_uri is set to s3://bucket-name,
the node or proxy (depending on recording mode)
will start uploading the recorded sessions
to the bucket.

If the bucket does not exist, teleport will
attempt to create a bucket with versioning and encryption
turned on by default.

Teleport will turn on bucket-side encryption for the tarballs
using an aws:kms key.

File sessions backend
---------------------

If audit_sessions_uri is set to file:///folder,
teleport will start writing tarballs to this folder instead
of sending records to the file server.

This is helpful for plugin writers, who can use FUSE- or NFS-mounted
storage to handle the data.

Working dynamic configuration.
  • Loading branch information
klizhentas committed Mar 15, 2018
1 parent de18a22 commit bad1b04
Show file tree
Hide file tree
Showing 91 changed files with 42,072 additions and 136 deletions.
4 changes: 2 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ const (
// ComponentSession is an active session.
ComponentSession = "session"

// ComponentDynamoDB represents dynamodb clients
ComponentDynamoDB = "dynamodb"

// DebugEnvVar tells tests to use verbose debug output
DebugEnvVar = "DEBUG"

Expand Down Expand Up @@ -209,6 +212,16 @@ const (

// Off means mode is off
Off = "off"

// SchemeS3 is the S3 file scheme; it means upload to or download from
// S3-like object storage
SchemeS3 = "s3"

// SchemeFile is a local disk file storage
SchemeFile = "file"

// LogsDir is a log subdirectory for events and logs
LogsDir = "log"
)

// Component generates "component:subcomponent1:subcomponent2" strings used
Expand Down
2 changes: 1 addition & 1 deletion e
Submodule e updated from cfd6b0 to 3b7ad4
13 changes: 11 additions & 2 deletions integration/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/gravitational/teleport/lib/backend/dir"
"github.com/gravitational/teleport/lib/client"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/reversetunnel"
"github.com/gravitational/teleport/lib/service"
"github.com/gravitational/teleport/lib/services"
Expand Down Expand Up @@ -70,6 +71,9 @@ type TeleInstance struct {
// Nodes is a list of additional nodes
// started with this instance
Nodes []*service.TeleportProcess

// UploadEventsC is a channel for upload events
UploadEventsC chan *events.UploadEvent
}

type User struct {
Expand Down Expand Up @@ -177,8 +181,9 @@ func NewInstance(cfg InstanceConfig) *TeleInstance {
fatalIf(err)

i := &TeleInstance{
Ports: cfg.Ports,
Hostname: cfg.NodeName,
Ports: cfg.Ports,
Hostname: cfg.NodeName,
UploadEventsC: make(chan *events.UploadEvent, 100),
}
secrets := InstanceSecrets{
SiteName: cfg.ClusterName,
Expand Down Expand Up @@ -326,6 +331,7 @@ func (i *TeleInstance) CreateEx(trustedSecrets []*InstanceSecrets, tconf *servic
tconf = service.MakeDefaultConfig()
}
tconf.DataDir = dataDir
tconf.UploadEventsC = i.UploadEventsC
tconf.Auth.ClusterName, err = services.NewClusterName(services.ClusterNameSpecV2{
ClusterName: i.Secrets.SiteName,
})
Expand Down Expand Up @@ -454,6 +460,7 @@ func (i *TeleInstance) StartNode(name string, sshPort int) (*service.TeleportPro
tconf.HostUUID = name
tconf.Hostname = name
tconf.DataDir = dataDir
tconf.UploadEventsC = i.UploadEventsC
var ttl time.Duration
tconf.CachePolicy = service.CachePolicy{
Enabled: true,
Expand Down Expand Up @@ -507,6 +514,7 @@ func (i *TeleInstance) StartNodeAndProxy(name string, sshPort, proxyWebPort, pro
tconf.Token = "token"
tconf.HostUUID = name
tconf.Hostname = name
tconf.UploadEventsC = i.UploadEventsC
tconf.DataDir = dataDir
var ttl time.Duration
tconf.CachePolicy = service.CachePolicy{
Expand Down Expand Up @@ -576,6 +584,7 @@ func (i *TeleInstance) StartProxy(cfg ProxyConfig) error {
tconf.AuthServers = append(tconf.AuthServers, *authServer)
tconf.CachePolicy = service.CachePolicy{Enabled: true}
tconf.DataDir = dataDir
tconf.UploadEventsC = i.UploadEventsC
tconf.HostUUID = cfg.Name
tconf.Hostname = cfg.Name
tconf.Token = "token"
Expand Down
51 changes: 43 additions & 8 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2016 Gravitational, Inc.
Copyright 2016-2018 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -162,23 +162,37 @@ func (s *IntSuite) TestAuditOn(c *check.C) {
var tests = []struct {
inRecordLocation string
inForwardAgent bool
auditSessionsURI string
}{

// normal teleport
{
services.RecordAtNode,
false,
inRecordLocation: services.RecordAtNode,
inForwardAgent: false,
},
// recording proxy
{
services.RecordAtProxy,
true,
inRecordLocation: services.RecordAtProxy,
inForwardAgent: true,
},
// normal teleport with upload to file server
{
inRecordLocation: services.RecordAtNode,
inForwardAgent: false,
auditSessionsURI: c.MkDir(),
},
{
inRecordLocation: services.RecordAtProxy,
inForwardAgent: false,
auditSessionsURI: c.MkDir(),
},
}

for _, tt := range tests {
makeConfig := func() (*check.C, []string, []*InstanceSecrets, *service.Config) {
clusterConfig, err := services.NewClusterConfig(services.ClusterConfigSpecV3{
SessionRecording: tt.inRecordLocation,
Audit: services.AuditConfig{AuditSessionsURI: tt.auditSessionsURI},
})
c.Assert(err, check.IsNil)

Expand All @@ -189,7 +203,6 @@ func (s *IntSuite) TestAuditOn(c *check.C) {
tconf.Proxy.DisableWebService = true
tconf.Proxy.DisableWebInterface = true
tconf.SSH.Enabled = true

return c, nil, nil, tconf
}
t := s.newTeleportWithConfig(makeConfig())
Expand Down Expand Up @@ -285,7 +298,29 @@ func (s *IntSuite) TestAuditOn(c *check.C) {
myTerm.Type("\aecho hi\n\r\aexit\n\r\a")

// wait for session to end:
<-endC
select {
case <-endC:
case <-time.After(10 * time.Second):
c.Fatalf("Timeout waiting for session to finish")
}

// wait for the upload of the right session to complete
if tt.auditSessionsURI != "" {
timeoutC := time.After(10 * time.Second)
loop:
for {
select {
case event := <-t.UploadEventsC:
if event.SessionID != string(session.ID) {
log.Debugf("Skipping mismatching session %v, expecting upload of %v.", event.SessionID, session.ID)
continue
}
break loop
case <-timeoutC:
c.Fatalf("Timeout waiting for upload of session %v to complete to %v", session.ID, tt.auditSessionsURI)
}
}
}

// read back the entire session (we have to try several times until we get back
// everything because the session is closing)
Expand Down Expand Up @@ -323,7 +358,7 @@ func (s *IntSuite) TestAuditOn(c *check.C) {
select {
case <-tickCh:
// Get all session events from the backend.
sessionEvents, err := site.GetSessionEvents(defaults.Namespace, session.ID, 0)
sessionEvents, err := site.GetSessionEvents(defaults.Namespace, session.ID, 0, false)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
7 changes: 6 additions & 1 deletion lib/auth/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1883,7 +1883,12 @@ func (s *APIServer) getSessionEvents(auth ClientI, w http.ResponseWriter, r *htt
if err != nil {
afterN = 0
}
return auth.GetSessionEvents(namespace, *sid, afterN)
includePrintEvents, err := strconv.ParseBool(r.URL.Query().Get("print"))
if err != nil {
includePrintEvents = false
}

return auth.GetSessionEvents(namespace, *sid, afterN, includePrintEvents)
}

type upsertNamespaceReq struct {
Expand Down
4 changes: 2 additions & 2 deletions lib/auth/auth_with_roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,12 +808,12 @@ func (a *AuthWithRoles) GetSessionChunk(namespace string, sid session.ID, offset
return a.alog.GetSessionChunk(namespace, sid, offsetBytes, maxBytes)
}

func (a *AuthWithRoles) GetSessionEvents(namespace string, sid session.ID, afterN int) ([]events.EventFields, error) {
func (a *AuthWithRoles) GetSessionEvents(namespace string, sid session.ID, afterN int, includePrintEvents bool) ([]events.EventFields, error) {
if err := a.action(namespace, services.KindSession, services.VerbRead); err != nil {
return nil, trace.Wrap(err)
}

return a.alog.GetSessionEvents(namespace, sid, afterN)
return a.alog.GetSessionEvents(namespace, sid, afterN, includePrintEvents)
}

func (a *AuthWithRoles) SearchEvents(from, to time.Time, query string, limit int) ([]events.EventFields, error) {
Expand Down
5 changes: 4 additions & 1 deletion lib/auth/clt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1686,14 +1686,17 @@ func (c *Client) GetSessionChunk(namespace string, sid session.ID, offsetBytes,
//
// This function is usually used in conjunction with GetSessionReader to
// replay recorded session streams.
func (c *Client) GetSessionEvents(namespace string, sid session.ID, afterN int) (retval []events.EventFields, err error) {
func (c *Client) GetSessionEvents(namespace string, sid session.ID, afterN int, includePrintEvents bool) (retval []events.EventFields, err error) {
if namespace == "" {
return nil, trace.BadParameter(MissingNamespaceError)
}
query := make(url.Values)
if afterN > 0 {
query.Set("after", strconv.Itoa(afterN))
}
if includePrintEvents {
query.Set("print", fmt.Sprintf("%v", includePrintEvents))
}
response, err := c.Get(c.Endpoint("namespaces", namespace, "sessions", string(sid), "events"), query)
if err != nil {
return nil, trace.Wrap(err)
Expand Down
2 changes: 1 addition & 1 deletion lib/auth/tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (s *TLSSuite) TestSharedSessions(c *check.C) {
})
c.Assert(err, check.IsNil)
// ask for strictly session events:
e, err := clt.GetSessionEvents(defaults.Namespace, sess.ID, 0)
e, err := clt.GetSessionEvents(defaults.Namespace, sess.ID, 0, true)
c.Assert(err, check.IsNil)
c.Assert(len(e), check.Equals, 2)
c.Assert(e[0].GetString("val"), check.Equals, "one")
Expand Down
48 changes: 13 additions & 35 deletions lib/backend/dir/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ import (
"os"
"path"
"path/filepath"
"syscall"
"time"

"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/trace"

log "github.com/sirupsen/logrus"
"github.com/gravitational/teleport/lib/utils"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"
)

const (
Expand Down Expand Up @@ -161,10 +160,10 @@ func (bk *Backend) CreateVal(bucket []string, key string, val []byte, ttl time.D
return trace.ConvertSystemError(err)
}
defer f.Close()
if err := writeLock(f); err != nil {
if err := utils.FSWriteLock(f); err != nil {
return trace.Wrap(err)
}
defer unlock(f)
defer utils.FSUnlock(f)
if err := f.Truncate(0); err != nil {
return trace.ConvertSystemError(err)
}
Expand All @@ -175,27 +174,6 @@ func (bk *Backend) CreateVal(bucket []string, key string, val []byte, ttl time.D
return trace.Wrap(bk.applyTTL(dirPath, key, ttl))
}

// writeLock takes an exclusive (LOCK_EX) flock on the open file f.
// LOCK_NB is not set, so the call waits while another holder has the lock.
// It returns nil on success; a flock failure is converted with
// trace.ConvertSystemError before being returned.
func writeLock(f *os.File) error {
if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX); err != nil {
return trace.ConvertSystemError(err)
}
return nil
}

// readLock takes a shared (LOCK_SH) flock on the open file f.
// LOCK_NB is not set, so the call waits while an exclusive lock is held.
// It returns nil on success; a flock failure is converted with
// trace.ConvertSystemError before being returned.
func readLock(f *os.File) error {
if err := syscall.Flock(int(f.Fd()), syscall.LOCK_SH); err != nil {
return trace.ConvertSystemError(err)
}
return nil
}

// unlock releases (LOCK_UN) the flock held through the open file f.
// It returns nil on success; a flock failure is converted with
// trace.ConvertSystemError before being returned.
func unlock(f *os.File) error {
if err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN); err != nil {
return trace.ConvertSystemError(err)
}
return nil
}

// UpsertVal updates or inserts value with a given TTL into a bucket
// ForeverTTL for no TTL
func (bk *Backend) UpsertVal(bucket []string, key string, val []byte, ttl time.Duration) error {
Expand All @@ -214,10 +192,10 @@ func (bk *Backend) UpsertVal(bucket []string, key string, val []byte, ttl time.D
return trace.ConvertSystemError(err)
}
defer f.Close()
if err := writeLock(f); err != nil {
if err := utils.FSWriteLock(f); err != nil {
return trace.Wrap(err)
}
defer unlock(f)
defer utils.FSUnlock(f)
if err := f.Truncate(0); err != nil {
return trace.ConvertSystemError(err)
}
Expand Down Expand Up @@ -249,10 +227,10 @@ func (bk *Backend) GetVal(bucket []string, key string) ([]byte, error) {
return nil, trace.ConvertSystemError(err)
}
defer f.Close()
if err := readLock(f); err != nil {
if err := utils.FSReadLock(f); err != nil {
return nil, trace.Wrap(err)
}
defer unlock(f)
defer utils.FSUnlock(f)
bytes, err := ioutil.ReadAll(f)
if err != nil {
return nil, trace.ConvertSystemError(err)
Expand All @@ -278,10 +256,10 @@ func (bk *Backend) DeleteKey(bucket []string, key string) error {
return trace.ConvertSystemError(err)
}
defer f.Close()
if err := writeLock(f); err != nil {
if err := utils.FSWriteLock(f); err != nil {
return trace.Wrap(err)
}
defer unlock(f)
defer utils.FSUnlock(f)
if err := os.Remove(bk.ttlFile(dirPath, key)); err != nil {
if !os.IsNotExist(err) {
log.Warn(err)
Expand Down Expand Up @@ -345,10 +323,10 @@ func removeFile(path string) error {
return nil
}
defer f.Close()
if err := writeLock(f); err != nil {
if err := utils.FSWriteLock(f); err != nil {
return trace.Wrap(err)
}
defer unlock(f)
defer utils.FSUnlock(f)
err = os.Remove(path)
if err != nil {
err = trace.ConvertSystemError(err)
Expand Down
Loading

0 comments on commit bad1b04

Please sign in to comment.