Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

一周一package(2024版本) #172

Open
lzh2nix opened this issue Jul 15, 2024 · 3 comments
Open

一周一package(2024版本) #172

lzh2nix opened this issue Jul 15, 2024 · 3 comments

Comments

@lzh2nix
Copy link
Owner

lzh2nix commented Jul 15, 2024

Content

@lzh2nix lzh2nix changed the title 一周一package(v2024) 一周一package(2024版本) Jul 15, 2024
@lzh2nix
Copy link
Owner Author

lzh2nix commented Jul 27, 2024

001 failsafe-go(2024W30)

https://github.com/failsafe-go/failsafe-go

熔断, 限流,降级 可谓是后端高可用的三件套, 故障复盘基本都会归结到这三点+告警缺失. failsafe-go 是个把相关组件封装的比较好的packege, 主要提供了以下几个package.

推荐的执行顺序是:

image

其也符合高可用的目标: 熔断—>重试 —> 降级. 从上面的结构可以看出这里的关键是最内层的func执行完之后基于一定的policy去做上层的执行, 具体 Policy 的接口如下:

type FailurePolicyBuilder[S any, R any] interface {
	// HandleErrors specifies the errors to handle as failures. Any errors that evaluate to true for errors.Is and the
	// execution error will be handled.
	HandleErrors(errors ...error) S

	// HandleResult specifies the results to handle as failures. Any result that evaluates to true for reflect.DeepEqual and
	// the execution result will be handled. This method is only considered when a result is returned from an execution, not
	// when an error is returned.
	HandleResult(result R) S

	// HandleIf specifies that a failure has occurred if the predicate matches the execution result or error.
	HandleIf(predicate func(R, error) bool) S

	// OnSuccess registers the listener to be called when the policy determines an execution attempt was a success.
	OnSuccess(listener func(ExecutionEvent[R])) S

	// OnFailure registers the listener to be called when the policy determines an execution attempt was a failure, and may
	// be handled.
	OnFailure(listener func(ExecutionEvent[R])) S
}

通过定制特定的policy 可以实现很灵活的定制. 下面就对每种模式做一个简单的整理.

Retry

重试这里最关键的一点是避免无脑的定时重试, 这种很容易演变成ddos 攻击. 在retry policy 可以通过

WithMaxRetries, WithBackoff 来指定最大重试次数和重试策略(这里比较推荐的是backoff算法).

// Retry on ErrConnecting up to 3 times with a 1 second delay between attempts
retryPolicy := retrypolicy.Builder[Connection]().
  HandleErrors(ErrConnecting).
  WithDelay(time.Second).
  WithMaxRetries(3).
  Build()
  
// Get with retries
connection, err := failsafe.Get(Connect, retryPolicy)

Circuit Breaker

熔断的目的就是在系统超过负载(已经返回了错误)的时候在请求侧做一层保护,避免系统进一步的恶化, failsafe-go 提供了两种模式的熔断 time basedcounter based. 当最近请求失败超过阈值进入熔断器 Opened 的状态, 再过一段时间之后进入half-open 状态, 这种状态下会处理一部分请求, 如果继续成功就进入closed 状态(全量放行), 如果在half-open 状态检测到失败就继续回到open 状态.

Closed——> Opened —delay—> Half-opened

// Opens after 5 failures, half-opens after 1 minute, closes after 2 successes
breaker := circuitbreaker.Builder[any]().
  HandleErrors(ErrSending).
  WithFailureThreshold(5).
  WithDelay(time.Minute).
  WithSuccessThreshold(2).
  Build()
  
// Run with circuit breaking
err := failsafe.Run(SendMessage, breaker)

failsafe-go 也是提供了比较灵活的配置:

// 最近5个请求,3个失败进入open 状态
builder.WithFailureThresholdRatio(3, 5)

// 一分钟内有三个失败进入open 状态
builder.WithFailureThresholdPeriod(3, time.Minute)

// 1分钟内 20个请求, 其中5个失败, 进入open 状态
builder.WithFailureRateThreshold(20, 5, time.Minute)

// Half-open 状态, 持续30s
builder.WithDelay(30*time.Second)

//在half-open 状态 进入close 状态的配置, 否则再次进入open
builder.WithSuccessThreshold(5)
builder.WithSuccessThresholdRatio(3, 5)

Rate Limiter

熔断是检测到异常之后出于保护系统的目的俩降低请求数, 限流的目的则是避免系统进入overload 状态. failsafe-go 提供了两种模式的ratelimit, smooth 模式和bursty 模式.

// 1s 100个请求, 也就是10ms 1个请求, 保证100个请求在1s内相对平滑, 10ms 之内超过1个则进行限制
limiter := ratelimiter.Smooth(100, time.Second)

// 1s 最多处理10个请求(可以是瞬时的), 超过10个则限制
limiter := ratelimiter.Bursty(10, time.Second)

默认情况下达到ratelimit 之后会直接返回 ratelimiter.ErrExceeded, 不过你也可以配置一个最大wait间隔, 使其后面有机会执行:

// Wait up to 1 second for execution permission
builder.WithMaxWaitTime(time.Second)

TimeOut

timeout 机制基本就是context 的Timeout机制差不多,不同点在用 failsafe里你可以和其他机制打配合.

// Timeout after 10 seconds
timeout := timeout.With[any](10*time.Second)
err := failsafe.Run(Connect, timeout)

Fallback

在有主备的地方使用Fallback 就比较合适, 主挂了, 使用备

connection, err := failsafe.Get(Connect, fallback)

Hedge

类似赛马机制, 同时放n个请求到后端, 以第一个返回左右结果, 其他的request 直接cancel掉, 这种模式虽然会提高相应速度但是会增加后端整体的负载, 在使用时还需谨慎.

// Hedge up to 2 times with a 1 second delay between attempts
hedgePolicy := hedgepolicy.BuilderWithDelay[any](time.Second).
  WithMaxHedges(2).
  Build()
  
// Run with hedges
err := failsafe.Run(SendRequest, hedgePolicy)

Bulkhead

并发控制, 尤其是go func() xxx 只需要控制在10个 goroutine的场景

// Permit 10 concurrent executions
bulkhead := bulkhead.With[any](10)
err := failsafe.Run(SendRequest, bulkhead)

// 默认超过10个之后会立马返回错误(ErrFull), 当然这里可以设置wait time
bulkhead := bulkhead.Builder[any](10).
  WithMaxWaitTime(time.Second).
  Build()

这里maxWaitTimeout 也是很容易遇到坑, for loop 限制 go routine 并发的数量多场景还是推荐其他package.

Cache

不是特别推荐, 有更好的package.

总结

整理来看除 Cache, Bulkhead 之外的几个机制都可以尝试. 整体代码使用范型, 通用性也比较强.

@lzh2nix
Copy link
Owner Author

lzh2nix commented Aug 10, 2024

002 conc(2024W32)

https://github.com/sourcegraph/conc

golang 确实是入门很快, 精通很难的语言. 尤其是在并发场景下很容易写出go routine泄漏的代码, souregraph 的这个库的目标也是和这个相关:

  • Not cleaning up resources correctly.
  • Causing deadlocks.
  • Crashing the entire program because of a panic in a single goroutine.

这三点老手写代码的时候也是很容易犯错, 通过一个 package 来规范这些也是日常所需.
conc在处理这块儿代码确实很简洁(少即是多, 减少犯错的空间):

image

通过pool来控制并发就更方便了:

func fetchLastNames_pool(ctx context.Context, firstNames []string) ([]string, error) {
	p := pool.NewWithResults[string]().WithContext(ctx)
	for _, firstName := range firstNames {
		firstName := firstName
		p.Go(func(ctx context.Context) (string, error) {
			return fetchLastName(ctx, firstName)
		})
	}
	return p.Wait()
}

使用Iter 就更丝滑:

func fetchLastNames2(ctx context.Context, firstNames []string) ([]string, error) {
	return iter.MapErr(firstNames, func(firstName *string) (string, error) {
		return fetchLastName(ctx, *firstName)
	})
}

不用太多冗余的代码(控制写入的并发控制), 也可以通过自定义MapIter的方式控制并发数量:

input := []int{1, 2, 3, 4}
mapper := Mapper[int, bool]{
	MaxGoroutines: len(input) / 2,
}

results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 })
fmt.Println(results)

另外也提供了通过stream模式来请求数据(异步网络请求+尽快将结果显示给用户的场景):

func streamFileContents(ctx context.Context, fileNames <-chan string, fileContents chan<- string) {
	s := stream.New()
	for fileName := range fileNames {
		fileName := fileName
		s.Go(func() stream.Callback {
			contents := fetchFileContents(ctx, fileName)
			return func() { fileContents <- contents }
		})
	}
	s.Wait()
}

总结:
pool, iter, stream 基本能满足go routine的并发控制

@lzh2nix
Copy link
Owner Author

lzh2nix commented Aug 19, 2024

003 otter(2024W33)

https://github.com/maypok86/otter

缓存作为一种提高性能的利器, 在各种性能优化场合都是广泛提及. 也是有各种实现:

缓存淘汰方面基本上也是依赖LRU做缓存淘汰策略. 而LRU基本都是通过一个双链表+map 实现, 其在多读的情况性能相对比较差(缓存命时移动到表头, 也伴随着锁):

image

于是有了基于FIFO的各种改进(ARC,2Q,LIRS, TinyLFU), s3-fifo 基于观测数据(大部份缓存数据实际上只访问了一次)重新设计的一种缓存淘汰策略. 将这部分key快速从缓存中淘汰掉.

image

通过FIFO 避免了链表的复杂操作, 元素在插入时先放small FIFO这样可以快速淘汰(基于之前的统计72%的元素只访问一次).

otter 就是其中一个实现, 其在75% read, 25% writer 的场景下性能很很多:

image

在接口层面使用也是比较简单(构建/读写):

 package main

import (
    "fmt"
    "time"

    "github.com/maypok86/otter"
)

func main() {
    // create a cache with capacity equal to 10000 elements
    cache, err := otter.MustBuilder[string, string](10_000).
        CollectStats().
        Cost(func(key string, value string) uint32 {
            return 1
        }).
        WithTTL(time.Hour).
        Build()
    if err != nil {
        panic(err)
    }

    // set item with ttl (1 hour) 
    cache.Set("key", "value")

    // get value from cache
    value, ok := cache.Get("key")
    if !ok {
        panic("not found key")
    }
    fmt.Println(value)

    // delete item from cache
    cache.Delete("key")

    // delete data and stop goroutines
    cache.Close()
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant