何万回,何億回と繰り返されるループ内で実行されるプログラムは,プログラム全体の実行速度に大きな影響を与えます.ループ内プログラムの高速化は,プログラムが遅いと思った時の有効な手段です.
この記事では,ループを何億回と繰り返すプログラムを高速化した話を紹介します.
あるItem
(商品)をどのようなGroup
(倉庫/工場/広告媒体など)に振り分けるのが良いかを考えます.
ここでは,振り分けして得られた結果の分析を高速化することを考えます.
Go言語で説明をします.以下のデータ構造が与えられたとします.
type Item struct {
Id string `json:"id"`
AllocatedGroupId string `json:"allocatedGroupId"`
Attributes map[string]string `json:"attributes"`
}
type Group struct {
Id string `json:"id"`
Attributes map[string]string `json:"attributes"`
}
type Condition struct {
Id string `json:"id"`
CalcMethod string `json:"calcMethod"`
Target string `json:"target"`
Filters map[string]string `json:"filters"`
}
Item
, Group
の配列と Condition
があります.データ例は,以下の通りです.
{
"id": "item001",
"allocatedGroupId": "group022",
"attributes": {
"name": "三色ボールペン",
"category": "ペン",
"price": "80",
"weight": "34.4",
"release": "2021",
"seller": "A社",
"discount": "不可",
...
}
}
{
"id": "group022",
"attributes": {
"name": "東京第3倉庫2F",
"affiliation": "東京第3倉庫",
"address": "東京都XXX",
"capacity": "1800",
"useOfShips": "false",
"heightLimit": "230",
"weightLimit": "500",
...
}
}
{
"id": "condition001",
"calcMethod": "mean",
"target": "price",
"filters": {
"category": "ペン",
"affiliation": "東京第3倉庫"
}
}
入力データ構造は,他の機能との兼ね合いや汎用性を考慮したものになっています.
東京第3倉庫にあるペンItem
について,その平均価格を高速に求めるのがミッションです.
前処理なしのプログラムを実装します.
func calcTime01(items []Item, groups []Group, condition Condition) {
now := time.Now()
for i := 0; i < 1e5; i++ {
calc01(items, groups, condition)
}
fmt.Println(time.Since(now))
}
func calc01(items []Item, groups []Group, condition Condition) float64 {
numbers := make([]float64, 0, len(items))
Loop:
for _, item := range items {
// itemとgroupの紐づけ
f := func(x Group) bool { return item.AllocatedGroupId == x.Id }
group, _ := lo.Find(groups, f)
maps.Copy(item.Attributes, group.Attributes)
// condition.Filtersの適用
for key, value := range condition.Filters {
if item.Attributes[key] != value {
continue Loop
}
}
// condition.Targetの処理
value := item.Attributes[condition.Target]
number, _ := strconv.ParseFloat(value, 64)
numbers = append(numbers, number)
}
// switchの分岐
switch condition.CalcMethod {
case "sum":
return floats.Sum(numbers)
case "product":
return floats.Prod(numbers)
case "max":
return floats.Max(numbers)
case "min":
return floats.Min(numbers)
case "mean":
return stat.Mean(numbers, nil)
case "std":
return stat.StdDev(numbers, nil)
}
return 0
}
プログラムの実行時間は,8.7301438秒でした.問題点がたくさんあります.改良していきましょう.
switchの分岐をループの外側で実行します.合わせて,スカラー値の計算もcalc関数の外側に移動します.
func calcTime02(items []Item, groups []Group, condition Condition) {
now := time.Now()
// switchの分岐
var function func([]float64) float64
switch condition.CalcMethod {
case "sum":
function = func(x []float64) float64 { return floats.Sum(x) }
case "product":
function = func(x []float64) float64 { return floats.Prod(x) }
case "max":
function = func(x []float64) float64 { return floats.Max(x) }
case "min":
function = func(x []float64) float64 { return floats.Min(x) }
case "mean":
function = func(x []float64) float64 { return stat.Mean(x, nil) }
case "std":
function = func(x []float64) float64 { return stat.StdDev(x, nil) }
}
for i := 0; i < 1e5; i++ {
numbers := calc02(items, groups, condition)
function(numbers)
}
fmt.Println(time.Since(now))
}
func calc02(items []Item, groups []Group, condition Condition) []float64 {
numbers := make([]float64, 0, len(items))
Loop:
for _, item := range items {
f := func(x Group) bool { return item.AllocatedGroupId == x.Id }
group, _ := lo.Find(groups, f)
maps.Copy(item.Attributes, group.Attributes)
for key, value := range condition.Filters {
if item.Attributes[key] != value {
continue Loop
}
}
value := item.Attributes[condition.Target]
number, _ := strconv.ParseFloat(value, 64)
numbers = append(numbers, number)
}
return values
}
プログラムの実行時間は,8.6701029秒でした.この調子で.改良していきましょう.
group
の検索処理を変更します.
func calcTime03(items []Item, groups []Group, condition Condition) {
now := time.Now()
// switchの分岐 省略
// newGroupsの作成
f := func(x Group) (string, map[string]string) { return x.Id, x.Attributes }
newGroups := lo.Associate(groups, f)
for i := 0; i < 1e5; i++ {
numbers := calc03(items, newGroups, condition)
function(numbers)
}
fmt.Println(time.Since(now))
}
func calc03(items []Item, groups map[string]map[string]string, condition Condition) []float64 {
numbers := make([]float64, 0, len(items))
Loop:
for _, item := range items {
// itemとgroupの紐づけ
maps.Copy(item.Attributes, groups[item.AllocatedGroupId])
// condition.Filtersの適用
for key, value := range condition.Filters {
if item.Attributes[key] != value {
continue Loop
}
}
// condition.Targetの処理
value := item.Attributes[condition.Target]
number, _ := strconv.ParseFloat(value, 64)
numbers = append(numbers, number)
}
return numbers
}
プログラムの実行時間は,7.4015557秒でした.groups
には,まだ改良の余地がありそうです.
group.Attributes
の不要なデータを前処理で削減します.
func calcTime04(items []Item, groups []Group, condition Condition) {
now := time.Now()
// switchの分岐 省略
// newGroupsの作成
keys := append(lo.Keys(condition.Filters), condition.Target)
f := func(x Group) (string, map[string]string) { return x.Id, lo.PickByKeys(x.Attributes, keys) }
newGroups := lo.Associate(groups, f)
for i := 0; i < 1e5; i++ {
numbers := calc04(items, newGroups, condition)
function(numbers)
}
fmt.Println(time.Since(now))
}
func calc04(items []Item, groups map[string]map[string]string, condition Condition) []float64 {
numbers := make([]float64, 0, len(items))
Loop:
for _, item := range items {
// itemとgroupの紐づけ
maps.Copy(item.Attributes, groups[item.AllocatedGroupId])
// condition.Filtersの適用
for key, value := range condition.Filters {
if item.Attributes[key] != value {
continue Loop
}
}
// condition.Targetの処理
value := item.Attributes[condition.Target]
number, _ := strconv.ParseFloat(value, 64)
numbers = append(numbers, number)
}
return numbers
}
プログラムの実行時間は,0.9466403秒でした.groups
の改良で,24行目のコピー処理が高速になりました.実行時間は1秒を切りましたが,更なる高速化を目指します.
item
とgroup
の対応関係は変化するデータであり,前処理で対処できません.しかし,それ以外のデータはitems
も変更できるものとします.前処理でitems
にcondition.Filters
を適用します.
func calcTime05(items []Item, groups []Group, condition Condition) {
now := time.Now()
// switchの分岐 省略
// newItemsの作成
newItems := lo.Filter(items, func(x Item, _ int) bool {
for key, value := range condition.Filters {
value2, exist := x.Attributes[key]
if exist && value2 != value {
return false
}
}
return true
})
// newGroupsの作成
keys := append(lo.Keys(condition.Filters), condition.Target)
f := func(x Group) (string, map[string]string) { return x.Id, lo.PickByKeys(x.Attributes, keys) }
newGroups := lo.Associate(groups, f)
for i := 0; i < 1e5; i++ {
numbers := calc05(newItems, newGroups, condition)
function(numbers)
}
fmt.Println(time.Since(now))
}
func calc05(items []Item, groups map[string]map[string]string, condition Condition) []float64 {
numbers := make([]float64, 0, len(items))
Loop:
for _, item := range items {
// itemとgroupの紐づけ
maps.Copy(item.Attributes, groups[item.AllocatedGroupId])
// condition.Filtersの適用
for key, value := range condition.Filters {
if item.Attributes[key] != value {
continue Loop
}
}
// condition.Targetの処理
value := item.Attributes[condition.Target]
number, _ := strconv.ParseFloat(value, 64)
numbers = append(numbers, number)
}
return numbers
}
プログラムの実行時間は,0.4960819秒でした.前処理によるデータ削減が効いています.
前処理でgroups
に対してもcondition.Filters
を適用します.
func calcTime06(items []Item, groups []Group, condition Condition) {
now := time.Now()
// switchの分岐 省略
// newItemsの作成
newItems := lo.Filter(items, func(x Item, _ int) bool {
for key, value := range condition.Filters {
value2, exist := x.Attributes[key]
if exist && value2 != value {
return false
}
}
return true
})
// newGroupsの作成
newGroups := lo.Associate(groups, func(x Group) (string, map[string]string) {
for key, value := range condition.Filters {
value2, exist := x.Attributes[key]
if exist && value2 != value {
return x.Id, nil
}
}
return x.Id, lo.PickByKeys(x.Attributes, []string{condition.Target})
})
for i := 0; i < 1e5; i++ {
numbers := calc06(newItems, newGroups, condition.Target)
function(numbers)
}
fmt.Println(time.Since(now))
}
func calc06(items []Item, groups map[string]map[string]string, target string) []float64 {
numbers := make([]float64, 0, len(items))
for _, item := range items {
// itemとgroupの紐づけとフィルター
groupAttributes := groups[item.AllocatedGroupId]
if groupAttributes == nil {
continue
}
maps.Copy(item.Attributes, groupAttributes)
// condition.Targetの処理
value := item.Attributes[target]
number, _ := strconv.ParseFloat(value, 64)
numbers = append(numbers, number)
}
return numbers
}
プログラムの実行時間は,0.0749873秒でした.44行目のコピー実行回数の減少が高速化に効果的です.
ループ内でconditionを呼び出さないようにします.
func calcTime07(items []Item, groups []Group, condition Condition) {
now := time.Now()
// switchの分岐 省略
// newItemsの作成 省略
// itemsTargetの作成
itemsTarget := lo.Map(newItems, func(x Item, _ int) float64 {
value := x.Attributes[condition.Target]
number, _ := strconv.ParseFloat(value, 64)
return number
})
// groupsTargetの作成
groupsTarget := lo.Associate(groups, func(x Group) (string, *float64) {
for key, value := range condition.Filters {
value2, exist := x.Attributes[key]
if exist && value2 != value {
return x.Id, nil
}
}
value := x.Attributes[condition.Target]
number, _ := strconv.ParseFloat(value, 64)
return x.Id, &number
})
for i := 0; i < 1e5; i++ {
numbers := calc07(newItems, itemsTarget, groupsTarget)
function(numbers)
}
fmt.Println(time.Since(now))
}
func calc07(items []Item, itemsTarget []float64, groupsTarget map[string]*float64) []float64 {
numbers := make([]float64, 0, len(items))
for i, item := range items {
groupTarget := groupsTarget[item.AllocatedGroupId]
if groupTarget == nil {
continue
}
numbers = append(numbers, itemsTarget[i]+*groupTarget)
}
return numbers
}
プログラムの実行時間は,0.0551922秒でした.ループ内の処理が少なくなりました.condition.Target
がitem
とgroup
のどちらのAttributes
でも対応できる形式です.
42行目は,condition.Target
が指定するAttributes
の数値 + 0を計算しています.
item
のallocatedGroupId
を配列にできると,プログラムは更に高速化できます.
func calcTime08(items []Item, groups []Group, condition Condition) {
now := time.Now()
// switchの分岐 省略
// newItemsの作成 省略
// allocatedGroupIdsの作成
f := func(x Item, _ int) string { return x.AllocatedGroupId }
allocatedGroupIds := lo.Map(newItems, f)
// itemsTargetの作成 省略
// groupsTargetの作成 省略
for i := 0; i < 1e5; i++ {
numbers := calc08(allocatedGroupIds, itemsTarget, groupsTarget)
function(numbers)
}
fmt.Println(time.Since(now))
}
func calc08(groupIds []string, itemsTarget []float64, groupsTarget map[string]*float64) []float64 {
numbers := make([]float64, 0, len(groupIds))
for i := range groupIds {
groupTarget := groupsTarget[groupIds[i]]
if groupTarget == nil {
continue
}
numbers = append(numbers, itemsTarget[i]+*groupTarget)
}
return numbers
}
プログラムの実行時間は,0.0512611秒でした.改修もあと少しです.
今まではitem
とgroup
の紐づけにidを使っていましたが,idをindexに変えると高速化できます.
func calcTime09(items []Item, groups []Group, condition Condition) {
now := time.Now()
// switchの分岐 省略
// newItemsの作成 省略
// allocatedGroupIndexesの作成
allocatedGroupIndexes := lo.Map(newItems, func(item Item, _ int) int {
f := func(group Group) bool { return item.AllocatedGroupId == group.Id }
_, index, _ := lo.FindIndexOf(groups, f)
return index
})
// itemsTargetの作成 省略
// groupsTargetの作成
groupsTarget := lo.Map(groups, func(x Group, _ int) *float64 {
for key, value := range condition.Filters {
value2, exist := x.Attributes[key]
if exist && value2 != value {
return nil
}
}
value := x.Attributes[condition.Target]
number, _ := strconv.ParseFloat(value, 64)
return &number
})
for i := 0; i < 1e5; i++ {
numbers := calc09(allocatedGroupIndexes, itemsTarget, groupsTarget)
function(numbers)
}
fmt.Println(time.Since(now))
}
func calc09(groupIndexes []int, itemsTarget []float64, groupsTarget []*float64) []float64 {
numbers := make([]float64, 0, len(groupIndexes))
for i := range groupIndexes {
groupTarget := groupsTarget[groupIndexes[i]]
if groupTarget == nil {
continue
}
numbers = append(numbers, itemsTarget[i]+*groupTarget)
}
return numbers
}
プログラムの実行時間は,0.0123349秒でした.最後に,プログラムをキレイに整理します
func calcTime10(items []Item, groups []Group, condition Condition) {
now := time.Now()
// switchの分岐
var function func([]float64) float64
switch condition.CalcMethod {
case "sum":
function = func(x []float64) float64 { return floats.Sum(x) }
case "product":
function = func(x []float64) float64 { return floats.Prod(x) }
case "max":
function = func(x []float64) float64 { return floats.Max(x) }
case "min":
function = func(x []float64) float64 { return floats.Min(x) }
case "mean":
function = func(x []float64) float64 { return stat.Mean(x, nil) }
case "std":
function = func(x []float64) float64 { return stat.StdDev(x, nil) }
}
// newItemsの作成
newItems := lo.Filter(items, func(x Item, _ int) bool {
for key, value := range condition.Filters {
value2, exist := x.Attributes[key]
if exist && value2 != value {
return false
}
}
return true
})
// allocatedGroupIndexesの作成
allocatedGroupIndexes := lo.Map(newItems, func(item Item, _ int) int {
f := func(group Group) bool { return item.AllocatedGroupId == group.Id }
_, index, _ := lo.FindIndexOf(groups, f)
return index
})
// itemsTargetの作成
itemsTarget := lo.Map(newItems, func(x Item, _ int) float64 {
value := x.Attributes[condition.Target]
number, _ := strconv.ParseFloat(value, 64)
return number
})
// groupsTargetの作成
groupsTarget := lo.Map(groups, func(x Group, _ int) *float64 {
for key, value := range condition.Filters {
value2, exist := x.Attributes[key]
if exist && value2 != value {
return nil
}
}
value := x.Attributes[condition.Target]
number, _ := strconv.ParseFloat(value, 64)
return &number
})
for i := 0; i < 1e5; i++ {
numbers := calc10(allocatedGroupIndexes, itemsTarget, groupsTarget)
function(numbers)
}
fmt.Println(time.Since(now))
}
func calc10(groupIndexes []int, itemsTarget []float64, groupsTarget []*float64) []float64 {
numbers := make([]float64, 0, len(groupIndexes))
for i := range groupIndexes {
if groupTarget := groupsTarget[groupIndexes[i]]; groupTarget != nil {
numbers = append(numbers, itemsTarget[i]+*groupTarget)
}
}
return numbers
}
プログラムの実行時間は,0.0100372秒でした.プログラムを870倍高速化できました.
今回の問題設定では,Item
とGroup
の対応関係のみ変化する想定です.プログラムの高速化では,変化する部分のみを抜き出し,それに影響しない部分を予め計算しておくことが大事です.
変化する部分 (items[].AllocatedGroupId
, allocatedGroupIds
, groupIndexes
) は,組み合わせ最適化の分野では設計変数ベクトル,決定変数ベクトル,遺伝子,解などの言葉で表現します.
設計変数ベクトルは,プログラムを高速に動かすため,単純な数値の配列で管理することをお勧めします.
最適化アルゴリズムや最適化ライブラリは高速に動くので,高速化の余地は少ないです.
最適化プログラムが遅いと感じた人は,設計変数ベクトルの表現と評価関数を見直してみてください.
プログラムを高速化した話を紹介しました.
コードを数十行書き換えることで,数百倍高速に動くことが明らかになりました.
コードの読みやすさも考慮しつつ,高速なプログラムの実装を目指しましょう.