AlphaGo Zero你也来造一只,PyTorch实现五脏俱全| 附代码

1月前 361

遥想当年,AlphaGo的Master版本,在完胜柯洁九段之后不久,就被后辈AlphaGo Zero (简称狗零) 击溃了。

从一只完全不懂围棋的AI,到打败Master,狗零只用了21天。



而且,它不需要用人类知识来喂养,成为顶尖棋手全靠自学。

如果能培育这样一只AI,即便自己不会下棋,也可以很骄傲吧。

于是,来自巴黎的少年Dylan Djian (简称小笛) ,就照着狗零的论文去实现了一下。

image

他给自己的AI棋手起名SuperGo,也提供了代码 (传送门见文底) 。

除此之外,还有教程——

一个身子两个头

智能体分成三个部分:

一是特征提取器 (Feature Extractor) ,二是策略网络 (Policy Network) ,三是价值网络 (Value Network) 。

于是,狗零也被亲切地称为“双头怪”。特征提取器是身子,其他两个网络是脑子。

特征提取器

特征提取模型,是个残差网络 (ResNet) ,就是给普通CNN加上了跳层连接 (Skip Connection) , 让梯度的传播更加通畅。



image


跳跃的样子,写成代码就是:
1class BasicBlock(nn.Module):
 2    """
 3    Basic residual block with 2 convolutions and a skip connection
 4    before the last ReLU activation.
 5    """ 
 6
 7    def __init__(self, inplanes, planes, stride=1, downsample=None):
 8        super(BasicBlock, self).__init__()
 9
10        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=3,
11                        stride=stride, padding=1, bias=False)
12        self.bn1 = nn.BatchNorm2d(planes)
13
14        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
15                        stride=stride, padding=1, bias=False)
16        self.bn2 = nn.BatchNorm2d(planes)
17
18
19    def forward(self, x):
20        residual = x
21
22        out = self.conv1(x)
23        out = F.relu(self.bn1(out))
24
25        out = self.conv2(out)
26        out = self.bn2(out)
27
28        out += residual
29        out = F.relu(out)
30
31        return out

然后,把它加到特征提取模型里面去:

1class Extractor(nn.Module):
2    def __init__(self, inplanes, outplanes):
3        super(Extractor, self).__init__()
4        self.conv1 = nn.Conv2d(inplanes, outplanes, stride=1,
5                        kernel_size=3, padding=1, bias=False)
6        self.bn1 = nn.BatchNorm2d(outplanes)
7
8        for block in range(BLOCKS):
9            setattr(self, "res{}".format(block), \
10                BasicBlock(outplanes, outplanes))
11
12
13    def forward(self, x):
14        x = F.relu(self.bn1(self.conv1(x)))
15        for block in range(BLOCKS - 1):
16            x = getattr(self, "res{}".format(block))(x)
17
18        feature_maps = getattr(self, "res{}".format(BLOCKS - 1))(x)
19        return feature_maps

策略网络

策略网络就是普通的CNN了,里面有个批量标准化 (Batch Normalization) ,还有一个全连接层,输出概率分布。

<p style="text-align:center">![image](https://yqfile.alicdn.com/70c7e67978ad20f5587ad71e5b68dad2d7fddb9f.png)</p>
1class PolicyNet(nn.Module):
 2    def __init__(self, inplanes, outplanes):
 3        super(PolicyNet, self).__init__()
 4        self.outplanes = outplanes
 5        self.conv = nn.Conv2d(inplanes, 1, kernel_size=1)
 6        self.bn = nn.BatchNorm2d(1)
 7        self.logsoftmax = nn.LogSoftmax(dim=1)
 8        self.fc = nn.Linear(outplanes - 1, outplanes)
 9
10
11    def forward(self, x):
12        x = F.relu(self.bn(self.conv(x)))
13        x = x.view(-1, self.outplanes - 1)
14        x = self.fc(x)
15        probas = self.logsoftmax(x).exp()
16
17        return probas

价值网络

这个网络稍微复杂一点。除了标配之外,还要再多加一个全连接层。最后,用双曲正切 (Hyperbolic Tangent) 算出 (-1,1) 之间的数值,来表示当前状态下的赢面多大。

image

代码长这样——

1class ValueNet(nn.Module):
 2    def __init__(self, inplanes, outplanes):
 3        super(ValueNet, self).__init__()
 4        self.outplanes = outplanes
 5        self.conv = nn.Conv2d(inplanes, 1, kernel_size=1)
 6        self.bn = nn.BatchNorm2d(1)
 7        self.fc1 = nn.Linear(outplanes - 1, 256)
 8        self.fc2 = nn.Linear(256, 1)
 9
10
11    def forward(self, x):
12        x = F.relu(self.bn(self.conv(x)))
13        x = x.view(-1, self.outplanes - 1)
14        x = F.relu(self.fc1(x))
15        winning = F.tanh(self.fc2(x))
16        return winning

未雨绸缪的树

狗零,还有一个很重要的组成部分,就是蒙特卡洛树搜索 (MCTS) 。

它可以让AI棋手提前找出,胜率最高的落子点。

在模拟器里,模拟对方的下一手,以及再下一手,给出应对之策,所以提前的远不止是一步。

节点 (Node)

树上的每一个节点,都代表一种不同的局势,有不同的统计数据:

每个节点被经过的次数n,总动作值w,经过这一点的先验概率p,平均动作值q (q=w/n) ,还有从别处来到这个节点走的那一步,以及从这个节点出发、所有可能的下一步。

1class Node:
2    def __init__(self, parent=None, proba=None, move=None):
3        self.p = proba
4        self.n = 0
5        self.w = 0
6        self.q = 0
7        self.children = []
8        self.parent = parent
9        self.move = move

部署 (Rollout)

第一步是PUCT (多项式上置信树) 算法,选择能让PUCT函数 (下图) 的某个变体 (Variant) 最大化,的走法。

image


写成代码的话——

1def select(nodes, c_puct=C_PUCT):
2    " Optimized version of the selection based of the PUCT formula "
3
4    total_count = 0
5    for i in range(nodes.shape[0]):
6        total_count += nodes[i][1]
7
8    action_scores = np.zeros(nodes.shape[0])
9    for i in range(nodes.shape[0]):
10        action_scores[i] = nodes[i][0] + c_puct * nodes[i][2] * \
11                (np.sqrt(total_count) / (1 + nodes[i][1]))
12
13    equals = np.where(action_scores == np.max(action_scores))[0]
14    if equals.shape[0] > 0:
15        return np.random.choice(equals)
16    return equals[0]

结束 (Ending)

选择在不停地进行,直至到达一个叶节点 (Leaf Node) ,而这个节点还没有往下生枝。

1def is_leaf(self):
2    """ Check whether a node is a leaf or not """
3
4    return len(self.children) == 0

到了叶节点,那里的一个随机状态就会被评估,得出所有“下一步”的概率。

所有被禁的落子点,概率会变成零,然后重新把总概率归为1。

然后,这个叶节点就会生出枝节 (都是可以落子的位置,概率不为零的那些) 。代码如下——

1def expand(self, probas):
2    self.children = [Node(parent=self, move=idx, proba=probas[idx]) \
3                for idx in range(probas.shape[0]) if probas[idx] > 0]

更新一下

枝节生好之后,这个叶节点和它的妈妈们,身上的统计数据都会更新,用的是下面这两串代码。

1def update(self, v):
2    """ Update the node statistics after a rollout """
3
4    self.w = self.w + v
5    self.q = self.w / self.n if self.n > 0 else 0

1while current_node.parent:
2    current_node.update(v)
3    current_node = current_node.parent

选择落子点

模拟器搭好了,每个可能的“下一步”,都有了自己的统计数据。

按照这些数据,算法会选择其中一步,真要落子的地方。

选择有两种,一就是选择被模拟的次数最多的点。试用于测试和实战。

另外一种,随机 (Stochastically) 选择,把节点被经过的次数转换成概率分布,用的是以下代码——

1total = np.sum(action_scores)
2probas = action_scores / total
3move = np.random.choice(action_scores.shape[0], p=probas)

后者适用于训练,让AlphaGo探索更多可能的选择。

三位一体的修炼

狗零的修炼分为三个过程,是异步的。

一是自对弈 (Self-Play) ,用来生成数据。

1def self_play():
 2    while True:
 3        new_player, checkpoint = load_player()
 4        if new_player:
 5            player = new_player
 6
 7        ## Create the self-play match queue of processes
 8        results = create_matches(player, cores=PARALLEL_SELF_PLAY,
 9                                         match_number=SELF_PLAY_MATCH) 
10        for _ in range(SELF_PLAY_MATCH):
11            result = results.get()
12            db.insert({
13                "game": result,
14                "id": game_id
15            })
16            game_id += 1

二是训练 (Training) ,拿新鲜生成的数据,来改进当前的神经网络。

1def train():
 2    criterion = AlphaLoss()
 3    dataset = SelfPlayDataset()
 4    player, checkpoint = load_player(current_time, loaded_version) 
 5    optimizer = create_optimizer(player, lr,
 6                                    param=checkpoint['optimizer'])
 7    best_player = deepcopy(player)
 8    dataloader = DataLoader(dataset, collate_fn=collate_fn, \
 9                batch_size=BATCH_SIZE, shuffle=True)
10
11    while True:
12        for batch_idx, (state, move, winner) in enumerate(dataloader):
13
14            ## Evaluate a copy of the current network
15            if total_ite % TRAIN_STEPS == 0:
16                pending_player = deepcopy(player)
17                result = evaluate(pending_player, best_player)
18
19                if result:
20                    best_player = pending_player
21
22            example = {
23                'state': state,
24                'winner': winner,
25                'move' : move
26            }
27            optimizer.zero_grad()
28            winner, probas = pending_player.predict(example['state'])
29
30            loss = criterion(winner, example['winner'], \
31                            probas, example['move'])
32            loss.backward()
33            optimizer.step()
34
35            ## Fetch new games
36            if total_ite % REFRESH_TICK == 0:
37                last_id = fetch_new_games(collection, dataset, last_id)

训练用的损失函数表示如下:

1class AlphaLoss(torch.nn.Module):
 2    def __init__(self):
 3        super(AlphaLoss, self).__init__()
 4
 5    def forward(self, pred_winner, winner, pred_probas, probas):
 6        value_error = (winner - pred_winner) ** 2
 7        policy_error = torch.sum((-probas * 
 8                                (1e-6 + pred_probas).log()), 1)
 9        total_error = (value_error.view(-1) + policy_error).mean()
10        return total_error

三是评估 (Evaluation) ,看训练过的智能体,比起正在生成数据的智能体,是不是更优秀了 (最优秀者回到第一步,继续生成数据) 。

1def evaluate(player, new_player):
 2    results = play(player, opponent=new_player)
 3    black_wins = 0
 4    white_wins = 0
 5
 6    for result in results:
 7        if result[0] == 1:
 8            white_wins += 1
 9        elif result[0] == 0:
10            black_wins += 1
11
12    ## Check if the trained player (black) is better than
13    ## the current best player depending on the threshold
14    if black_wins >= EVAL_THRESH * len(results):
15        return True
16    return False

第三部分很重要,要不断选出最优的网络,来不断生成高质量的数据,才能提升AI的棋艺。

三个环节周而复始,才能养成强大的棋手。

年幼的SuperGo

小笛用学校的服务器训练了AI棋手一星期。

SuperGo还年幼,是在9x9棋盘上训练的。

小笛说,他的AI现在好像还不懂生死一类的事,但应该已经知道围棋是个抢地盘的游戏了。

虽然,没有训练出什么超神的棋手,但这次尝试依然值得庆祝。

Reddit上面也有同仁发来贺电。

image

△ 有前途的意思
有志于AI围棋的各位,也可以试一试这个PyTorch实现。

代码实现传送门:
https://github.com/dylandjian/SuperGo

教程原文传送门:
https://dylandjian.github.io/alphago-zero/

AlphaGo Zero论文传送门:
https://www.nature.com/articles/nature24270.epdf

js Image AlphaGo

作者

技术小能手
TA的文章

相关文章