跳至主要內容

go 泛型尝鲜,实现一个流式处理库

pedrogaogogogenerics大约 13 分钟

go 泛型尝鲜,实现一个流式处理库

背景

熟悉 Java、Rust 的同学,肯定都非常喜欢其 Stream API,以 Java 为例,筛选一个简单的数字列表,可以有如下实现:

boolean ok = Stream.of(1, 2, 3, 4, 5).
            map(item -> item + 100).
            filter(item -> item > 102).
            allMatch(item -> item > 102);

Streamopen in new window API 能让我们以链式调用和 函数式编程的方式来处理数据,数据在函数之间流转、映射,最后汇总得到结果。

Stream 是流式处理的一种实现,Stream 是数据的通道,简称数据流,数据流进入(无状态)函数后,返回一个新的数据流。

而在 Go 中,目前标准库是没有提供相关的实现,既眼馋于 Stream API 的易用与炫酷,又恰逢 Go 泛型的即将发布(go1.18 发布),
想着将二者结合,体验一把 Go 泛型,于是就有了这篇文章。

我们的目标很明确,使用 Go 泛型实现一个流式处理库,实现后的效果如下:

ok := stream.Just(1, 2, 3, 4, 5).
    Concat(stream.Just(6, 9, 10)).
    Map(func (item int) int {
        return item + 100
    }).
    Filter(func (item int) bool {
        return item > 105
    }).
    Sort(func (a, b int) bool {
        return a > b
    }).
    AllMatch(func (item int) bool {
        return item > 105
    })

下面,就来实现它吧。

泛型

泛型是 Go 近几年最大的一次变动,考虑到很多同学对其不熟悉,这里会先介绍一下 Go 泛型,如果你对 Go 泛型非常了解,可以跳过。

泛型样例 1

在 Go 中,对于一个加法函数:

func add(a, b int) int {
	return a + b
}

由于 Go 是强类型语言,因此在编译器就必须确定 a,b 的类型,一旦确定 a,b 是int类型后,就无法改变其类型。

因此,函数 add 就只能用于 int 类型数据的加法,如果此时还需要一个用于浮点数的加法函数,
那么就只能新写一个函数了:

// Bad
func add(a, b float64) float64 {
	return a + b
}

而 Go 本身是不支持函数重载的,因此两个 add 函数重名,是不能编译通过的,所以我们需要更改函数名:

func addi(a, b int) int {
	return a + b
}

func addf(a, b float64) float64 {
	return a + b
}

如果再新增一个用于 complex64 加法函数,就需要再写一个相同功能的函数,而这些函数除了数据类型不同,其它的逻辑却一样。

在 Go1.18 之前,对于这些重复的逻辑,我们可以通过 interface{} 和反射来将其提取为一个函数,如下:

// Bad, need to be improved
func add(a, b interface{}) interface{} {
	typ := reflect.TypeOf(a)
	if typ.Kind() == reflect.Int {
		return a.(int) + b.(int)
	} else {
		// TODO
	}
	return nil
}

虽然这样可以传入任意类型的参数,但是却给程序的安全性和可维护性带来了问题,并且每次都需要类型断言,给函数使用带来的麻烦。

而在 Go1.18 引入了泛型后,我们可以实现泛型来解决这个问题:

func add[T int | float64](a, b T) T {
	return a + b
}

函数 add 的参数 a,b 类型为T,而T并不是一个固定的类型,它既可以为int类型, 也可以为float64类型,因此将其称为泛型。

有了泛型T后,函数 add 只需定义一次,就可以接受两种类型的参数,并且完成加法,
而如果需要再支持一个新类型,也只需给T添加一个新的类型约束即可:

func add[T int | float64 | complex64](a, b T) T {
	return a + b
}

考虑到,如果T的类型约束过多,那么写在函数上面也不太好看,因此 Go 支持我们将泛型定义为接口再使用,如下:

// main.go
type Addable interface {
	int | int64 | int32 | float64 | float32 | string
}

func add[T Addable](a, b T) T {
	return a + b
}

这样,泛型T就能支持 6 种类型的加法,而无需为每种类型都写一个 add 函数。

泛型原理

那么泛型T是如何实现支持多种数据类型的呢?

使用反汇编工具来查看一下生成的汇编代码:

$ go tool compile -N -l -S ./main.go
$ go tool objdump ./main.o

对应的汇编代码如下(截取部分):

TEXT "".add[go.shape.int_0](SB) gofile../add/main.go
  main.go:11            SUBQ $0x10, SP
  main.go:11            MOVQ BP, 0x8(SP)
  main.go:11            LEAQ 0x8(SP), BP
  main.go:11            MOVQ AX, 0x18(SP)
  main.go:11            MOVQ BX, 0x20(SP)
  main.go:11            MOVQ CX, 0x28(SP)
  main.go:11            MOVQ $0x0, 0(SP)
  main.go:12            MOVQ 0x20(SP), AX
  main.go:12            ADDQ 0x28(SP), AX
  main.go:12            MOVQ AX, 0(SP)
  main.go:12            MOVQ 0x8(SP), BP
  main.go:12            ADDQ $0x10, SP
  main.go:12            RET

TEXT "".add[go.shape.float64_0](SB) gofile../add/main.go
  main.go:11            SUBQ $0x10, SP
  main.go:11            MOVQ BP, 0x8(SP)
  main.go:11            LEAQ 0x8(SP), BP
  main.go:11            MOVQ AX, 0x18(SP)
  main.go:11            MOVSD_XMM X0, 0x20(SP)
  main.go:11            MOVSD_XMM X1, 0x28(SP)
  main.go:11            XORPS X1, X1
  main.go:11            MOVSD_XMM X1, 0(SP)
  main.go:12            MOVSD_XMM 0x20(SP), X0
  main.go:12            ADDSD 0x28(SP), X0
  main.go:12            MOVSD_XMM X0, 0(SP)
  main.go:12            MOVQ 0x8(SP), BP
  main.go:12            ADDQ $0x10, SP
  main.go:12            RET

从汇编代码上可以看到,Go add 函数被编译成了多个 add[go.shape.xxx] 函数,不同类型
对应的着不同的指令实现,如 int_0int 的函数实现,通过 ADDQ 指令来实现加法,
float64_0 通过 ADDSD 指令来实现加法。

Go 泛型可以理解为:编译器帮忙写不同类型的实现代码,编译器通过泛型定义来生成不同的指令实现。

泛型样例 2

下面,来看一个稍微复杂一点的泛型样例。在正式的流式处理实现之前,我们先来看看 Map、Filter 函数的泛型是如何实现的?

首先,给出类型定义:

type (
	Item interface {
		comparable
	}
	// FilterFunc 筛选函数
	FilterFunc[T Item] func(T) bool
	// MapFunc 数据映射函数
	MapFunc[T Item, R Item] func(item T) R
)

此处定义了三个类型,Item接口,作为泛型约束,内部实际是一个 comparable 类型。

comparable 是 Go1.18 后新加入至标准库的内置类型:

comparable is an interface that is implemented by all comparable types, (booleans, numbers, strings, pointers, channels, interfaces, arrays of comparable types, structs whose fields are all comparable types).

FilterFunc函数类型用于过滤数据,该函数有一个泛型参数,即 T,泛型约束则是 Item,代表着 FilterFunc 可以接受一切实现了 comparable 的参数。

MapFunc函数类型用于数据映射,该函数拥有两个泛型参数,即TR,泛型约束均为Item,所以 MapFunc 可以接受一个类型的参数,
但返回另一个类型的结果,但这两个类型均需实现 comparable 接口。

有了 FilterFunc 和 MapFunc 函数类型后,我们还需实现对应的函数来使用它们,如下:

func Map[T Item, R Item](a []T, m MapFunc[T, R]) []R {
	var n []R
	for _, e := range a {
		v := m(e)
		n = append(n, v)
	}
	return n
}

func Filter[T Item](a []T, f FilterFunc[T]) []T {
	var n []T
	for _, e := range a {
		if f(e) {
			n = append(n, e)
		}
	}
	return n
}

有了泛型后,MapFilter函数的实现可以说是既简洁又明确。对于 Map 函数,接受两个参数,分别是数据切片a和 MapFunc m
函数内部使用 range 得到每个元素后调用 m 函数得到新的元素,最后将其 append 到新的切片返回即可。

Filter 函数实现类似,就不做过多介绍了。

有了 Map 和 Filter 两个函数就,我们可以实现简单的数据过滤和映射。如下:

  1. vi是一个简单的整数切片;
  2. vi通过 Filter 函数会过滤掉>=4 的数字;
  3. 然后vi会通过 Map 函数得到最后的 float 切片;
  4. 输出最后的结果。
// main.go
func main() {
	vi := []int{1, 2, 3, 4, 5, 6}

	vi = Filter(vi, func(v int) bool {
		return v < 4
	})

	bi := Map(vi, func(v int) float32 {
		return float32(v) + 100.01
	})
	fmt.Println(bi)
}
// output: [101.01 102.01 103.01]

在这个例子中,可以清晰的看到泛型的作用。因为泛型的存在,使 Map 和 Filter 函数足够简单,每当需要新的类型支持时,
不用去兼容老的代码逻辑,而是直接在调用时指定泛型参数的类型即可。

Stream 设计

在对 Go 泛型和函数式编程有了一定了解后,我们再来看看如何设计和实现 Stream。流式处理与工厂中的流水线作业非常类似,可以
将其拆解为如下三个阶段:

  1. 创建阶段:获取原始数据;
  2. 加工阶段:数据处理、映射;
  3. 汇总阶段:得到数据处理后的结果。

创建阶段

按照这三个阶段来设计 Stream 的 API。首先是创建阶段,即 Stream 的构造函数:

// Stream computing
Stream[T Item] struct {
    source <-chan T // 只读 channel,不能写
}

func Range[T Item](source <-chan T) Stream[T]

func Just[T Item](items ...T) Stream[T]

func From[T Item](generate GenerateFunc[T]) Stream[T]

func (s Stream[T]) Concat(steams ...Stream[T]) Stream[T]

为了支持链式调用,将 Stream 定义为一个结构体,Stream 有一个泛型参数,用于编译时指定
数据流的类型;Stream 内部通过 source 字段来存储数据流。

提供了 Range、Just、From、Concat 四个函数来初始化、合并数据流:

  • Range:通过 channel 来初始化 Stream;
  • Just:通过切片来初始化;
  • From:通过生成函数来初始化;
  • Concat:通过 Stream 拼接得到新的 Stream。

加工阶段

数据处理映射阶段可以定义出非常复杂和丰富的 API,这里只给出部分的定义:

func (s Stream[T]) Distinct(keyFunc KeyFunc[T, T]) Stream[T]

func (s Stream[T]) Filter(filterFunc FilterFunc[T], opts ...Option) Stream[T]

func (s Stream[T]) Walk(fn WalkFunc[T, T], opts ...Option) Stream[T]

func (s Stream[T]) Head(n int64) Stream[T]

func (s Stream[T]) Tail(n int64) Stream[T]

func (s Stream[T]) Map(fn MapFunc[T, T], opts ...Option) Stream[T]

func (s Stream[T]) Sort(fn LessFunc[T]) Stream[T]

数据处理不会停止流,而是彼此组合,越丰富的 API 越拥有强大的数据处理能力。

其对应的功能分别如下:

  • Distinct:数据去重;
  • Filter:数据过滤;
  • Walk:数据遍历;
  • Head:截取流头部数据;
  • Tail:截取流尾部数据:
  • Map:数据映射;
  • Sort:数据排序。

汇总阶段

数据流经处理后,可由汇总函数得到最后的处理结果,汇总函数调用后,流处理就会终止。

其部分 API 定义如下:

func (s Stream[T]) AllMatch(fn PredicateFunc[T]) bool

func (s Stream[T]) AnyMatch(fn PredicateFunc[T]) bool

func (s Stream[T]) NoneMatch(fn func(item T) bool) bool

func (s Stream[T]) Count() int

func (s Stream[T]) ForAll(fn ForAllFunc[T])

func (s Stream[T]) ForEach(fn ForEachFunc[T])

汇总函数不会再返回新的数据流,而是一个处理结果:

  • AllMatch:每个数据项均需满足条件后返回结果;
  • AnyMatch:任何一个数据项需满足条件后返回结果;
  • NoneMatch:无任何数据项需满足条件后返回结果;
  • Count:得到数据项总数;
  • ForAll:对整个数据流执行操作;
  • ForEach:对数据流中的每一项都执行操作。

Stream 实现

在设计完备的基础上,借助 Go channel 和泛型的强大能力,我们能够非常方便的实现 Stream。

这里只给出部分 API 的实现,其它基本类似。

创建阶段

Range 函数为例:

func Range[T Item](source <-chan T) Stream[T] {
	return Stream[T]{
		source: source,
	}
}

Range 函数实则是一个简单的构造函数,从参数读取 source 后赋值给 Stream 返回即可。

对于稍微复杂一点的 Concat 函数:

func (s Stream[T]) Concat(steams ...Stream[T]) Stream[T] {
	// 创建新的无缓冲channel
	source := make(chan T)
	go func() {
		// 创建一个waiGroup对象
		group := task.NewRoutineGroup()
		// 异步从原channel读取数据
		group.Run(func() {
			for item := range s.source {
				source <- item
			}
		})
		// 异步读取待拼接Stream的channel数据
		for _, stream := range steams {
			// 每个Stream开启一个协程
			group.Run(func() {
				for item := range stream.channel() {
					source <- item
				}
			})
		}
		// 阻塞等待读取完成
		group.Wait()
		close(source)
	}()
	// 返回新的Stream
	return Range[T](source)
}

Concat 函数会拼接多个流的数据,然后汇总到 source 通道中,最后返回。

加工阶段

数据处理函数是实现的一个难点,以 Distinct 为例:

// Distinct 去重,使用 map 来实现去重
func (s Stream[T]) Distinct(keyFunc KeyFunc[T, T]) Stream[T] {
	source := make(chan T)
	common.GoSafe(func() { // 新建协程写数据
		// channel记得关闭是个好习惯
		defer close(source)
		keys := make(map[T]common.PlaceholderType)

		for item := range s.source {
			// 自定义去重逻辑
			key := keyFunc(item) // 这里的key类型是R
			// 如果key不存在,则将数据写入新的channel
			if _, ok := keys[key]; !ok {
				source <- item
				keys[key] = common.Placeholder
			}
		}
	})
	return Range[T](source)
}

Distinct 函数的核心在于去重,这里我们借助 map 来混存数据映射关系,以数据项作为 key,
去掉重复的 key 从而达到去重的效果。

对于 Tail 函数,由于 channel 的特性,我们必须遍历完 channel 才能得到最后的几项,因此
我们可以借助环形切片来实现:

func (s Stream[T]) Tail(n int64) Stream[T] {
	if n < 1 {
		panic("n must be greater than 1")
	}
	source := make(chan T)
	go func() {
		ring := collection.NewRing[T](int(n))
		// 读取全部元素,如果数量>n环形切片能实现新数据覆盖旧数据
		// 保证获取到的一定最后n个元素
		for item := range s.source {
			ring.Add(item)
		}
		for _, item := range ring.Take() {
			source <- item
		}
		close(source)
	}()
	return Range[T](source)
}

Ring是一个带有固定缓存池的环形切片,它只能保存固定n个数据项,我们依次遍历 source
并向 Ring 中添加,待遍历完毕后,就能得到最后的 n 个数据。

Head 函数就比较简单了,只需遍历 n 次停止即可,这里就不做过多介绍了。

汇总阶段

在数据汇总阶段,实则是对数据流的一个检查与判断,以AnyMatch为例:

func (s Stream[T]) AnyMatch(fn PredicateFunc[T]) bool {
	for item := range s.source {
		if fn(item) {
			// 需要排空 s.source,否则前面的goroutine可能阻塞
			go drain(s.source)
			return true
		}
	}

	return false
}

AnyMatch接受一个断言函数fn,在遍历 source 的时候,只需一个满足条件的项出现,就能立马返回 true。

AllMatchNoneMatch与之类似,内部处理逻辑稍有不同。

而对于ForEachCount就必须全部遍历,并对每一个数据项进行处理:

func (s Stream[T]) ForEach(fn ForEachFunc[T]) {
	for item := range s.source {
		fn(item)
	}
}

func (s Stream[T]) Count() int {
	var count int
	for range s.source {
		count++
	}
	return count
}

ForEach 接受一个 fn 参数,用于对每一个数据项处理,而 Count 函数只需增加数据项的总数返回即可。

总结

完成 Stream 后,就能方便的使用 Stream API 对数据进行处理与检查,如下:

// main.go

func main() {
	ok := stream.Just(1, 2, 3, 4, 5).
		Concat(stream.Just(6, 9, 10)).
		Map(func(item int) int {
			return item + 100
		}).
		Filter(func(item int) bool {
			return item > 105
		}).
		Sort(func(a, b int) bool {
			return a > b
		}).
		AllMatch(func(item int) bool {
			return item > 105
		})
	fmt.Println(ok)
}

// output: true

Stream 处理数据优雅又易读,在有了泛型后,就无需过多关系数据类型的问题,相较于使用 for range 的
语法来处理数据流,Stream 无疑更加合适。

而在整个设计、实现的过程中,借助 channel 和泛型是颇为简单的,当然前提是你得对数据结构有一定的理解,
上面我们就借助哈希表和环形切片来快速实现了数据去重、数据尾项等功能。

至此,一个基于泛型的流式处理库已实现完毕,当然这个库只能称为 tiny Stream,因为 Stream API 实则非常丰富,
比如我们未涉及到的 Reduce 等,这就待我们后续去实践和完善了。

参考资料

  • https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html
  • https://mp.weixin.qq.com/s/ocI6TWGI9f5c11uAKBumew
  • https://mp.weixin.qq.com/s/t3INtSfFSmv-nsJqLmdPew
  • https://github.com/zeromicro/go-zero
  • https://github.com/akutz/go-generics-the-hard-way
  • https://github.com/mattn/go-generics-example
  • https://github.com/zyedidia/generic
  • https://github.com/golang/go