Change expiration to use time.Duration, add Redis cache tests.
This commit is contained in:
parent
705515630d
commit
e803483d92
|
@ -15,14 +15,14 @@ import (
|
|||
|
||||
// redisCache stores necessary values for Redis cache
|
||||
type redisStore struct {
|
||||
authExpiration int64
|
||||
aclExpiration int64
|
||||
authExpiration time.Duration
|
||||
aclExpiration time.Duration
|
||||
client bes.RedisClient
|
||||
}
|
||||
|
||||
type goStore struct {
|
||||
authExpiration int64
|
||||
aclExpiration int64
|
||||
authExpiration time.Duration
|
||||
aclExpiration time.Duration
|
||||
client *goCache.Cache
|
||||
}
|
||||
|
||||
|
@ -40,7 +40,7 @@ type Store interface {
|
|||
}
|
||||
|
||||
// NewGoStore initializes a cache using go-cache as the store.
|
||||
func NewGoStore(authExpiration, aclExpiration int64) *goStore {
|
||||
func NewGoStore(authExpiration, aclExpiration time.Duration) *goStore {
|
||||
// TODO: support hydrating the cache to retain previous values.
|
||||
|
||||
return &goStore{
|
||||
|
@ -51,7 +51,7 @@ func NewGoStore(authExpiration, aclExpiration int64) *goStore {
|
|||
}
|
||||
|
||||
// NewSingleRedisStore initializes a cache using a single Redis instance as the store.
|
||||
func NewSingleRedisStore(host, port, password string, db int, authExpiration, aclExpiration int64) *redisStore {
|
||||
func NewSingleRedisStore(host, port, password string, db int, authExpiration, aclExpiration time.Duration) *redisStore {
|
||||
addr := fmt.Sprintf("%s:%s", host, port)
|
||||
redisClient := goredis.NewClient(&goredis.Options{
|
||||
Addr: addr,
|
||||
|
@ -67,7 +67,7 @@ func NewSingleRedisStore(host, port, password string, db int, authExpiration, ac
|
|||
}
|
||||
|
||||
// NewSingleRedisStore initializes a cache using a Redis Cluster as the store.
|
||||
func NewRedisClusterStore(password string, addresses []string, authExpiration, aclExpiration int64) *redisStore {
|
||||
func NewRedisClusterStore(password string, addresses []string, authExpiration, aclExpiration time.Duration) *redisStore {
|
||||
clusterClient := goredis.NewClusterClient(
|
||||
&goredis.ClusterOptions{
|
||||
Addrs: addresses,
|
||||
|
@ -116,7 +116,7 @@ func (s *redisStore) Connect(ctx context.Context, reset bool) bool {
|
|||
log.Errorf("couldn't start redis. error: %s", err)
|
||||
return false
|
||||
} else {
|
||||
log.Infoln("started redis cachet")
|
||||
log.Infoln("started redis cache")
|
||||
//Check if cache must be reset
|
||||
if reset {
|
||||
s.client.FlushDB(ctx)
|
||||
|
@ -146,7 +146,7 @@ func (s *goStore) CheckACLRecord(ctx context.Context, username, topic, clientid
|
|||
return s.checkRecord(ctx, record, s.aclExpiration)
|
||||
}
|
||||
|
||||
func (s *goStore) checkRecord(ctx context.Context, record string, expirationTime int64) (bool, bool) {
|
||||
func (s *goStore) checkRecord(ctx context.Context, record string, expirationTime time.Duration) (bool, bool) {
|
||||
granted := false
|
||||
v, present := s.client.Get(record)
|
||||
|
||||
|
@ -156,7 +156,7 @@ func (s *goStore) checkRecord(ctx context.Context, record string, expirationTime
|
|||
granted = true
|
||||
}
|
||||
|
||||
s.client.Set(record, value, time.Second*time.Duration(expirationTime))
|
||||
s.client.Set(record, value, expirationTime)
|
||||
}
|
||||
return present, granted
|
||||
}
|
||||
|
@ -173,7 +173,7 @@ func (s *redisStore) CheckACLRecord(ctx context.Context, username, topic, client
|
|||
return s.checkRecord(ctx, record, s.aclExpiration)
|
||||
}
|
||||
|
||||
func (s *redisStore) checkRecord(ctx context.Context, record string, expirationTime int64) (bool, bool) {
|
||||
func (s *redisStore) checkRecord(ctx context.Context, record string, expirationTime time.Duration) (bool, bool) {
|
||||
|
||||
present, granted, err := s.getAndRefresh(ctx, record, expirationTime)
|
||||
if err == nil {
|
||||
|
@ -198,14 +198,14 @@ func (s *redisStore) checkRecord(ctx context.Context, record string, expirationT
|
|||
return present, granted
|
||||
}
|
||||
|
||||
func (s *redisStore) getAndRefresh(ctx context.Context, record string, expirationTime int64) (bool, bool, error) {
|
||||
func (s *redisStore) getAndRefresh(ctx context.Context, record string, expirationTime time.Duration) (bool, bool, error) {
|
||||
val, err := s.client.Get(ctx, record).Result()
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
|
||||
//refresh expiration
|
||||
_, err = s.client.Expire(ctx, record, time.Duration(expirationTime)*time.Second).Result()
|
||||
_, err = s.client.Expire(ctx, record, expirationTime).Result()
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
|
@ -245,7 +245,7 @@ func (s *redisStore) SetACLRecord(ctx context.Context, username, topic, clientid
|
|||
return s.setRecord(ctx, record, granted, s.authExpiration)
|
||||
}
|
||||
|
||||
func (s *redisStore) setRecord(ctx context.Context, record, granted string, expirationTime int64) error {
|
||||
func (s *redisStore) setRecord(ctx context.Context, record, granted string, expirationTime time.Duration) error {
|
||||
err := s.set(ctx, record, granted, expirationTime)
|
||||
|
||||
if err == nil {
|
||||
|
@ -266,6 +266,6 @@ func (s *redisStore) setRecord(ctx context.Context, record, granted string, expi
|
|||
return err
|
||||
}
|
||||
|
||||
func (s *redisStore) set(ctx context.Context, record string, granted string, expirationTime int64) error {
|
||||
return s.client.Set(ctx, record, granted, time.Duration(expirationTime)*time.Second).Err()
|
||||
func (s *redisStore) set(ctx context.Context, record string, granted string, expirationTime time.Duration) error {
|
||||
return s.client.Set(ctx, record, granted, expirationTime).Err()
|
||||
}
|
||||
|
|
|
@ -9,14 +9,14 @@ import (
|
|||
)
|
||||
|
||||
func TestGoStore(t *testing.T) {
|
||||
authSeconds := int64(1)
|
||||
aclSeconds := int64(1)
|
||||
store := NewGoStore(authSeconds, aclSeconds)
|
||||
authExpiration := 100 * time.Millisecond
|
||||
aclExpiration := 100 * time.Millisecond
|
||||
store := NewGoStore(authExpiration, aclExpiration)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
assert.Equal(t, authSeconds, store.authExpiration)
|
||||
assert.Equal(t, aclSeconds, store.aclExpiration)
|
||||
assert.Equal(t, authExpiration, store.authExpiration)
|
||||
assert.Equal(t, aclExpiration, store.aclExpiration)
|
||||
|
||||
assert.True(t, store.Connect(ctx, false))
|
||||
|
||||
|
@ -35,7 +35,7 @@ func TestGoStore(t *testing.T) {
|
|||
assert.True(t, granted)
|
||||
|
||||
// Wait for it to expire.
|
||||
time.Sleep(1 * time.Second)
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
|
||||
present, granted = store.CheckAuthRecord(ctx, username, password)
|
||||
|
||||
|
@ -51,7 +51,7 @@ func TestGoStore(t *testing.T) {
|
|||
assert.True(t, granted)
|
||||
|
||||
// Wait for it to expire.
|
||||
time.Sleep(1 * time.Second)
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
|
||||
present, granted = store.CheckACLRecord(ctx, username, password, topic, acc)
|
||||
|
||||
|
@ -68,7 +68,7 @@ func TestGoStore(t *testing.T) {
|
|||
assert.False(t, granted)
|
||||
|
||||
// Wait for it to expire.
|
||||
time.Sleep(1 * time.Second)
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
|
||||
present, granted = store.CheckAuthRecord(ctx, username, password)
|
||||
|
||||
|
@ -84,7 +84,7 @@ func TestGoStore(t *testing.T) {
|
|||
assert.False(t, granted)
|
||||
|
||||
// Wait for it to expire.
|
||||
time.Sleep(1 * time.Second)
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
|
||||
present, granted = store.CheckACLRecord(ctx, username, password, topic, acc)
|
||||
|
||||
|
@ -92,3 +92,127 @@ func TestGoStore(t *testing.T) {
|
|||
assert.False(t, granted)
|
||||
|
||||
}
|
||||
|
||||
func TestRedisSingleStore(t *testing.T) {
|
||||
authExpiration := 1000 * time.Millisecond
|
||||
aclExpiration := 1000 * time.Millisecond
|
||||
store := NewSingleRedisStore("localhost", "6379", "", 3, authExpiration, aclExpiration)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
assert.Equal(t, authExpiration, store.authExpiration)
|
||||
assert.Equal(t, aclExpiration, store.aclExpiration)
|
||||
|
||||
assert.True(t, store.Connect(ctx, false))
|
||||
|
||||
username := "test-user"
|
||||
password := "test-password"
|
||||
topic := "test/topic"
|
||||
acc := 1
|
||||
|
||||
// Test granted access.
|
||||
err := store.SetAuthRecord(ctx, username, password, "true")
|
||||
assert.Nil(t, err)
|
||||
|
||||
present, granted := store.CheckAuthRecord(ctx, username, password)
|
||||
|
||||
assert.True(t, present)
|
||||
assert.True(t, granted)
|
||||
|
||||
// Wait for it to expire. For Redis we do this just once since the package used (or Redis itself, not sure) doesn't
|
||||
// support less than 1s expiration times: "specified duration is 100ms, but minimal supported value is 1s"
|
||||
time.Sleep(1050 * time.Millisecond)
|
||||
|
||||
present, granted = store.CheckAuthRecord(ctx, username, password)
|
||||
|
||||
assert.False(t, present)
|
||||
assert.False(t, granted)
|
||||
|
||||
err = store.SetACLRecord(ctx, username, password, topic, acc, "true")
|
||||
assert.Nil(t, err)
|
||||
|
||||
present, granted = store.CheckACLRecord(ctx, username, password, topic, acc)
|
||||
|
||||
assert.True(t, present)
|
||||
assert.True(t, granted)
|
||||
|
||||
// Test not granted access.
|
||||
err = store.SetAuthRecord(ctx, username, password, "false")
|
||||
assert.Nil(t, err)
|
||||
|
||||
present, granted = store.CheckAuthRecord(ctx, username, password)
|
||||
|
||||
assert.True(t, present)
|
||||
assert.False(t, granted)
|
||||
|
||||
err = store.SetACLRecord(ctx, username, password, topic, acc, "false")
|
||||
assert.Nil(t, err)
|
||||
|
||||
present, granted = store.CheckACLRecord(ctx, username, password, topic, acc)
|
||||
|
||||
assert.True(t, present)
|
||||
assert.False(t, granted)
|
||||
}
|
||||
|
||||
func TestRedisClusterStore(t *testing.T) {
|
||||
authExpiration := 1000 * time.Millisecond
|
||||
aclExpiration := 1000 * time.Millisecond
|
||||
|
||||
addresses := []string{"localhost:7000", "localhost:7001", "localhost:7002"}
|
||||
store := NewRedisClusterStore("", addresses, authExpiration, aclExpiration)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
assert.Equal(t, authExpiration, store.authExpiration)
|
||||
assert.Equal(t, aclExpiration, store.aclExpiration)
|
||||
|
||||
assert.True(t, store.Connect(ctx, false))
|
||||
|
||||
username := "test-user"
|
||||
password := "test-password"
|
||||
topic := "test/topic"
|
||||
acc := 1
|
||||
|
||||
// Test granted access.
|
||||
err := store.SetAuthRecord(ctx, username, password, "true")
|
||||
assert.Nil(t, err)
|
||||
|
||||
present, granted := store.CheckAuthRecord(ctx, username, password)
|
||||
|
||||
assert.True(t, present)
|
||||
assert.True(t, granted)
|
||||
|
||||
// Wait for it to expire. For Redis we do this just once since the package used (or Redis itself, not sure) doesn't
|
||||
// support less than 1s expiration times: "specified duration is 100ms, but minimal supported value is 1s"
|
||||
time.Sleep(1050 * time.Millisecond)
|
||||
|
||||
present, granted = store.CheckAuthRecord(ctx, username, password)
|
||||
|
||||
assert.False(t, present)
|
||||
assert.False(t, granted)
|
||||
|
||||
err = store.SetACLRecord(ctx, username, password, topic, acc, "true")
|
||||
assert.Nil(t, err)
|
||||
|
||||
present, granted = store.CheckACLRecord(ctx, username, password, topic, acc)
|
||||
|
||||
assert.True(t, present)
|
||||
assert.True(t, granted)
|
||||
|
||||
// Test not granted access.
|
||||
err = store.SetAuthRecord(ctx, username, password, "false")
|
||||
assert.Nil(t, err)
|
||||
|
||||
present, granted = store.CheckAuthRecord(ctx, username, password)
|
||||
|
||||
assert.True(t, present)
|
||||
assert.False(t, granted)
|
||||
|
||||
err = store.SetACLRecord(ctx, username, password, topic, acc, "false")
|
||||
assert.Nil(t, err)
|
||||
|
||||
present, granted = store.CheckACLRecord(ctx, username, password, topic, acc)
|
||||
|
||||
assert.True(t, present)
|
||||
assert.False(t, granted)
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"plugin"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
bes "github.com/iegomez/mosquitto-go-auth/backends"
|
||||
"github.com/iegomez/mosquitto-go-auth/cache"
|
||||
|
@ -424,7 +425,7 @@ func setCache(authOpts map[string]string) {
|
|||
addresses[i] = strings.TrimSpace(addresses[i])
|
||||
}
|
||||
|
||||
authPlugin.cache = cache.NewRedisClusterStore(password, addresses, authCacheSeconds, aclCacheSeconds)
|
||||
authPlugin.cache = cache.NewRedisClusterStore(password, addresses, time.Duration(authCacheSeconds)*time.Second, time.Duration(aclCacheSeconds)*time.Second)
|
||||
|
||||
} else {
|
||||
if cacheHost, ok := authOpts["cache_host"]; ok {
|
||||
|
@ -444,11 +445,11 @@ func setCache(authOpts map[string]string) {
|
|||
}
|
||||
}
|
||||
|
||||
authPlugin.cache = cache.NewSingleRedisStore(host, port, password, db, authCacheSeconds, aclCacheSeconds)
|
||||
authPlugin.cache = cache.NewSingleRedisStore(host, port, password, db, time.Duration(authCacheSeconds)*time.Second, time.Duration(aclCacheSeconds)*time.Second)
|
||||
}
|
||||
|
||||
default:
|
||||
authPlugin.cache = cache.NewGoStore(authCacheSeconds, aclCacheSeconds)
|
||||
authPlugin.cache = cache.NewGoStore(time.Duration(authCacheSeconds)*time.Second, time.Duration(aclCacheSeconds)*time.Second)
|
||||
}
|
||||
|
||||
if !authPlugin.cache.Connect(authPlugin.ctx, reset) {
|
||||
|
|
Loading…
Reference in New Issue