Like errgroup/waitgroup, but only runs a maximum of tasks at any time.

  • By Fatih Arslan
  • Last update: Nov 25, 2022
  • Comments: 3

semgroup PkgGoDev

semgroup provides synchronization and error propagation, for groups of goroutines working on subtasks of a common task. It uses a weighted semaphore implementation to make sure that only a number of maximum tasks can be run at any time.

Unlike golang.org/x/sync/errgroup, it doesn't return the first non-nil error, rather it accumulates all errors and returns a set of errors, allowing each task to fullfil their task.

Install

go get github.com/fatih/semgroup

Example

With no errors:

package main

import (
	"context"
	"fmt"

	"github.com/fatih/semgroup"
)

func main() {
	const maxWorkers = 2
	s := semgroup.NewGroup(context.Background(), maxWorkers)

	visitors := []int{5, 2, 10, 8, 9, 3, 1}

	for _, v := range visitors {
		v := v

		s.Go(func() error {
			fmt.Println("Visits: ", v)
			return nil
		})
	}

	// Wait for all visits to complete. Any errors are accumulated.
	if err := s.Wait(); err != nil {
		fmt.Println(err)
	}

	// Output:
	// Visits: 2
	// Visits: 10
	// Visits: 8
	// Visits: 9
	// Visits: 3
	// Visits: 1
	// Visits: 5
}

With errors:

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/fatih/semgroup"
)

func main() {
	const maxWorkers = 2
	s := semgroup.NewGroup(context.Background(), maxWorkers)

	visitors := []int{1, 1, 1, 1, 2, 2, 1, 1, 2}

	for _, v := range visitors {
		v := v

		s.Go(func() error {
			if v != 1 {
				return errors.New("only one visitor is allowed")
			}
			return nil
		})
	}

	// Wait for all visits to complete. Any errors are accumulated.
	if err := s.Wait(); err != nil {
		fmt.Println(err)
	}

	// Output:
	// 3 error(s) occurred:
	// * only one visitor is allowed
	// * only one visitor is allowed
	// * only one visitor is allowed
}

Download

semgroup.zip

Comments(3)

  • 1

    Increased the code coverage from 92.6% to 96%

    Why?

    There was less code coverage because of one if condition that was never reached.

    What?

    Was trying to add the test cases but getting deadlocks like the https://github.com/fatih/semgroup/pull/2

    How?

    • Just moved the multiple if condition to OR operator
    • Fixed a small spelling mistake.
  • 2

    Fix deadlock

    A deadlock can occur if Acquire fails as wg.Done will not be called in the goroutine but we called wg.Add before Acquier. Link to related lines.

    This can be eliminated by either moving wg.Add just before creating goroutine or by calling wg.Done also before returning error. I am not sure which one is better?

    Another question: Should we append error returned from g.Go to multiError?

    By the way sorry for questions in PR but i thought it would be better to send test code with the questions so asking from PR instead of an issue.

  • 3

    Implements error methods

    Implements Is(target error) and As(target interface{}) so that errors.Is and errors.As can examine errors by passing err which is returned from semgroup.Wait().