7.6 卷积网络分类过程可视化

7.6.1 导入库

网络结构由network.py提供network.py
第一个隐层为含有三个节点的全连接层
第二个隐层为含有两个1x2卷积的卷积层
第三个隐层为池化层(最大池化) image.png
[1]:
import network
import matplotlib.pyplot  as plt
from matplotlib.colors import ListedColormap
from matplotlib import cm,colors
from mpl_toolkits.mplot3d import Axes3D
import numpy as np
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
np.random.seed(100)

7.6.2 定义各类激活函数

[2]:
activation = 'relu'
if activation=='relu':
    activation_fn = network.relu_activation
elif activation == 'sigmoid':
    activation_fn = network.sigmoid_activation
elif activation == 'tanh':
    activation_fn = network.tanh_activation
else:
    print(activation+' function not implemented')

7.6.3 导入数据开始训练

根据需要设定合适的参数进行训练,net.SGD()函数参数如下:

training_data #训练数据

epochs #训练次数

mini_batch_size #batch大小

eta #学习率

lmbda = 0.0 #正则化系数

evaluation_data=None #验证数据

monitor_evaluation_cost=False #监测验证损失

monitor_evaluation_accuracy=True #监测验证精度

monitor_training_cost=True #监测训练损失

monitor_training_accuracy=True #监测训练精度

early_stopping_n = 0 #早停阈值

verbose = 0 # 是否开启冗余输出,开启之后才能显示上述监测内容

save_loss=False # 保存损失

save_delta = False # 保存误差

save_grad = False # 保存梯度

save_weights = False # 保存训练过程中的权重

save_Pl_Pa = False #保存各层偏导

[3]:
X, y = datasets.make_circles(n_samples=2000, factor=0.3, noise=.1,random_state=123)
X, X_test, y, y_test = train_test_split(X, y, test_size=0.33)
c,r = np.mgrid[[slice(X.min()- .2,X.max() + .2,50j)]*2]
p = np.c_[c.flat,r.flat]
#归一化
ss = StandardScaler().fit(X)
X = ss.transform(X)
p_0 = ss.transform(p)
X_test = ss.transform(X_test)
p = list(np.expand_dims(p_0,2))
#调整数据数据
training_data = list([[np.expand_dims(feature,axis=1),label] for feature,label in zip(X,y)])
test_data =  list([[np.expand_dims(feature,axis=1),label] for feature,label in zip(X_test,y_test)])
#"""全连接与卷积输入如[2,3,[3,2],1],其中的[3,2]代表卷积核为1*2,个数为卷积核数量为3"""
net = network.Network([2,3,[2,2],1],activation_fn=activation_fn,cost=network.BinaryLogCost,layers_type=['FC','C','FC'])
#是否已经训练且保存中间参数
have_trained = False
if have_trained:
    #如果已经训练了就直接加载
    weights_log = np.load('log/weights_log.npy',allow_pickle=True)
    bias_log = np.load('log/bias_log.npy',allow_pickle=True)
    net.set_weights(weights_log[800],bias_log[800])
else:
    #否则重新训练
    net.default_weight_initializer()
    _ = net.SGD(training_data, 2000, len(training_data), 0.08, evaluation_data=test_data,verbose=1,
            save_Pl_Pa=True,
            save_weights=True,
            save_grad=True,
            save_delta=True,
            save_loss=True,
    monitor_evaluation_accuracy=True,
    monitor_training_accuracy=True,
    monitor_training_cost=True)


7.6.4 可视化实验数据及实验结果

[9]:
fig = plt.figure(figsize = (9,3))
#自定义cmap
top = cm.get_cmap('Oranges_r', 512)
bottom = cm.get_cmap('Blues', 512)
newcolors = np.vstack((top(np.linspace(0.55, 1, 512)),
                       bottom(np.linspace(0, 0.75, 512))))
cm_bright = ListedColormap(newcolors, name='OrangeBlue')
plt.subplot(121)
m1 = plt.scatter(*X.T,c = y,cmap = cm_bright,edgecolors='white',s = 20,linewidths = 0.5)
plt.title('train samples')
plt.axis('equal')
plt.subplot(122)
m2 = plt.scatter(*X_test.T,c = y_test,cmap = cm_bright,edgecolors='white',s = 20,linewidths = 0.5);
plt.title('test samples')
plt.axis('equal')
ax = fig.get_axes()
plt.colorbar(ax = ax);
plt.show();

prob = np.squeeze(net.predict_pro(p))
p1 = np.array([np.squeeze(pp) for pp in p])
fig, (ax1,ax2) = plt.subplots(1,2, figsize=(9, 3),subplot_kw = {'aspect':'equal'})
ax1.scatter(*p1.T,c = prob,cmap = cm_bright)
ax1.scatter(*X.T,c = y,cmap = cm_bright,edgecolors='white',s = 20,linewidths = 0.5)
ax1.set_title('train score:%.5f'%net.accuracy(training_data,convert=True))
mp = ax2.scatter(*p1.T,c = prob,cmap = cm_bright)
ax2.scatter(*X_test.T,c = y_test,cmap = cm_bright,edgecolors='white',s = 20,linewidths = 0.5)
ax2.set_title('test score:%.5f'%net.accuracy(test_data,convert=True));
plt.colorbar(mp,ax = [ax1,ax2]);
../../_images/2ndPart_Chapter7.ImageClassification_7.6_1DCNN_ClassificationProcess_11_0.png
../../_images/2ndPart_Chapter7.ImageClassification_7.6_1DCNN_ClassificationProcess_11_1.png

7.6.5 定义逐层可视化函数

[10]:
def scatter(p, c, X, wb=None, cmap='Set3', value=True, zero_start=True):
    global y
    cols = p.shape[-1]
    #     assert cols in (1,2,3)
    fig = plt.figure(figsize=(6, 4))
    c_u = np.unique(c)

    if cols > 3:
        ax = plt.gca()
        ax.axis('equal')
        if wb is not None:
            a1, a2 = p.min(0)[:2] - 0.2
            b1, b2 = p.max(0)[:2] + 0.2
            (u1, u2, *_), b_ = wb
            y1, y2 = (a1 * u1 + b_) / (-u2), (b1 * u1 + b_) / (-u2)
            ax.plot([a1, b1], [y1, y2], 'k--')
            ax.set_ylim(a2, b2)

        st = ax.scatter(*p[:, :2].T, c=c, cmap=cmap)
        ax.scatter(*X[:, :2].T, c=y, alpha=0.7, cmap=cm_bright, edgecolors='white', s=20, linewidths=0.5)
        if value:
            fig.colorbar(st,shrink=0.8)
        else:
            if zero_start:
                fig.colorbar(st, ticks=np.linspace(0, c_u.shape[0] - 1, c_u.shape[0]),shrink=0.8)
            else:
                fig.colorbar(st, ticks=c_u,shrink=0.8)

        ax.set_xlabel('X')
        ax.set_ylabel('Y')

        return ax


    elif cols == 3:
        ax3d = Axes3D(fig)
        if wb is not None:
            (u1, u2, u3), b_ = wb
            a1, a2, a3 = p.min(0)[:3]
            b1, b2, b3 = p.max(0)[:3]
            if u3!=0:
                a, b = np.mgrid[a1 - 1:b1:10j, a2 - 1:b2:10j]
                z_ = (a * u1 + b * u2 + b_) / (-u3)
            else:
                if np.abs(u1)>np.abs(u2):
                    b, z_ = np.mgrid[a2-1:b2:10j,a3-1:b3:10j]
                    a = (b*u2+b_)/(-u1)
                else:
                    a,z_ = np.mgrid[a1-1:b1:10j,a3-1:b3:10j]
                    b = (a*u1+b_)/(-u2)
            wf = ax3d.plot_wireframe(a, b, z_)
            wf.set_color('k')

        mp = ax3d.scatter(*(p.T), c=c, cmap=cmap)
        ax3d.scatter(*X.T, c=y, cmap=cm_bright, edgecolors='white', s=40, linewidths=0.5)

        if value:
            fig.colorbar(mp, shrink=0.8)
        else:
            if zero_start:
                fig.colorbar(mp, shrink=0.8, ticks=np.linspace(0, c_u.shape[0] - 1, c_u.shape[0]))
            else:
                fig.colorbar(mp, ticks=c_u,shrink=0.8)

        ax3d.set_xlabel('X')
        ax3d.set_ylabel('Y')
        ax3d.set_zlabel('Z')
        return ax3d

    elif cols == 2:
        ax = plt.gca()
        ax.axis('equal')
        if wb is not None:
            a1, a2 = p.min(0) - 0.2
            b1, b2 = p.max(0) + 0.2
            (u1, u2), b_ = wb
            y1, y2 = (a1 * u1 + b_) / (-u2), (b1 * u1 + b_) / (-u2)
            ax.plot([a1, b1], [y1, y2], 'r--')
            ax.set_ylim(a2, b2)

        st = ax.scatter(*p.T, c=c, cmap=cmap)
        ax.scatter(*X.T, c=y, alpha=0.7, cmap=cm_bright, edgecolors='white', s=20, linewidths=0.5)
        if value:
            fig.colorbar(st,shrink=0.8)
        else:
            if zero_start:
                fig.colorbar(st, ticks=np.linspace(0, c_u.shape[0] - 1, c_u.shape[0]),shrink=0.8)
            else:
                fig.colorbar(st, ticks=c_u,shrink=0.8)

        ax.set_xlabel('X')
        ax.set_ylabel('Y')


    else:
        ax = plt.gca()
        t, tt = np.zeros_like(p.flat), np.zeros_like(X.flat)
        st = plt.scatter(p.flat, t, c=c, cmap=cmap)
        ax.scatter(X.flat, tt, c=y, alpha=0.7, cmap=cm_bright, edgecolors='white', s=20, linewidths=0.5)
        if value:
            fig.colorbar(st,shrink=0.8)
        else:
            if zero_start:
                fig.colorbar(st, ticks=np.linspace(0, c_u.shape[0] - 1, c_u.shape[0]),shrink=0.8)
            else:
                fig.colorbar(st, ticks=c_u,shrink=0.8)
    return ax

def add_hyperplane(ax,p,wb,color='gray'):
    #添加超平面
    cols = p.shape[-1]
    assert cols in [2,3]
    if cols==3:
        a1, a2 = p.min(0)[:2]
        b1, b2 = p.max(0)[:2]
        a, b = np.mgrid[a1 - 1:b1:10j, a2 - 1:b2:10j]
        (u1, u2, u3), b_ = wb
        z_ = (a * u1 + b * u2 + b_) / (-u3)
        ax.plot_wireframe(a,b,z_,color=color)
    else:
        a1, a2 = p.min(0) - 0.2
        b1, b2 = p.max(0) + 0.2
        (u1, u2), b_ = wb
        y1, y2 = (a1 * u1 + b_) / (-u2), (b1 * u1 + b_) / (-u2)
        ax.plot([a1, b1], [y1, y2], color=color)
    return ax

def mapping(code):
    # 转化为01编码的类型
    numMap = np.zeros(code.shape[0])
    uniq = np.unique(code, axis=0)
    for i, arr in enumerate(uniq):
        m = (np.sum(code == arr, axis=1) == code.shape[-1])
        numMap[m] = i
    return numMap

color_list = ['aquamarine', 'b', 'blueviolet','c', 'chartreuse',
 'darkcyan', 'darkgreen', 'darkkhaki', 'deeppink', 'deepskyblue',
 'gold', 'indigo', 'lightcoral', 'maroon', 'navy', 'olivedrab', 'peru',
 'pink', 'rosybrown', 'saddlebrown', 'slategray', 'steelblue', 'teal',
 'thistle', 'violet', 'y', 'yellow']
cmap = colors.ListedColormap(color_list)

7.6.6 卷积网络分类过程逐层可视化

[11]:
%matplotlib notebook
W,B = [w.T for w in net.weights],[b.squeeze(1) for b in net.biases]
actf = net.activation_fn.fn
X = X[:200]
y = y[:200]
# plt.close('all')
inV, inX = p_0, X
layersBinCode = None
for layer,(w, b) in enumerate(zip(W, B)):
    l_num =layer+1
    if net.layers_type[layer]=='FC':
        if l_num < len(W):
            activation = 'relu'
            actf = net.activation_fn.fn
            actf_P = net.activation_fn.prime
        else:
            activation = 'sigmoid'
            actf = network.sigmoid_activation.fn
            actf_P = network.sigmoid_activation.prime

        transV = inV @ w + b
        transX = inX @ w + b
        actV = actf(transV)
        actX = actf(transX)

        # 第k层各个节点的划分(第k层的二进制编码)
        if activation == 'relu':
            layerBinCode = np.where(actV > 0, 1, 0)
        elif activation == 'sigmoid':
            #         layerBinCode = actV
            layerBinCode = np.where(actV > 0.5, 1, 0)
        elif activation == 'tanh':
            layerBinCode = np.where(actV > 0, 1, 0)
    else:
        conv_size = net.sizes[layer+1]
        activation='relu'
        actf = net.activation_fn.fn
        transV_ = np.zeros((inV.shape[0],conv_size[0],conv_size[1]))
        transX_ = np.zeros((inX.shape[0],conv_size[0],conv_size[1]))
        for c_num in range(inV.shape[1]-conv_size[1]+1):
            transV_[:,:,c_num]=inV[:,c_num:c_num+conv_size[1]].reshape(-1,conv_size[1])@w+b
            transX_[:,:,c_num]=inX[:,c_num:c_num+conv_size[1]].reshape(-1,conv_size[1])@w+b
        actV_ = actf(transV_)
        actX_ = actf(transX_)
        actX_max_pooling = np.max(actX_,axis=2)
        actV_max_pooling = np.max(actV_,axis=2)
        actX_max_ind = np.argmax(transX_,axis=2)
        actV_max_ind = np.argmax(transV_,axis=2)
#         actX_max_ind = np.argmax(actX_,axis=2)
#         actV_max_ind = np.argmax(actV_,axis=2)
        # 第k层各个节点的划分(第k层的二进制编码)
        if activation == 'relu':
            layerBinCode = np.where(actV_max_pooling > 0, 1, 0)
            layerBinCode_noP = np.where(actV_>0, 1, 0)
        elif activation == 'sigmoid':
            layerBinCode = np.where(actV_max_pooling > 0.5, 1, 0)
            layerBinCode_noP = np.where(actV_>0.5,1,0)
        elif activation == 'tanh':
            layerBinCode = np.where(actV_max_pooling > 0, 1, 0)
        actX = actX_max_pooling
        actV = actV_max_pooling
#         print(np.min(actV))
        transX = np.squeeze([t_x[list(range(transX_.shape[1])),a_i] for t_x,a_i in zip(transX_,actX_max_ind)])
        transV = np.squeeze([t_v[list(range(transV_.shape[1])),a_i] for t_v,a_i in zip(transV_,actV_max_ind)])


    layerNumCode = mapping(layerBinCode)
    # 第k层的激活神经元的数量
    layerNumCode2 = np.sum(layerBinCode, 1)

    # 前k层的二进制编码
    layersBinCode = layerBinCode if layersBinCode is None else np.hstack((layersBinCode, layerBinCode))
    # 前k层的数字编码
    layersNumCode = mapping(layersBinCode)
    # 前k层激活神经元的数量
    layersNumCode2 = np.sum(layersBinCode, 1)

    n = actV.shape[-1]

    l = np.vstack((w, b)).T.astype('<U5').tolist()
    sl = [';'.join(z) for z in l]
    projIn = '3d' if inV.shape[-1] == 3 else None
    projOut = '3d' if transV.shape[-1] == 3 else None
    for i in range(n):
        # 在输入空间形成超平面
        if net.layers_type[layer]=='FC':
            w_draw  = w[:, i]
            b_draw = b[i]
        else:
            w_draw = np.append(w[:,i],0)
            b_draw = b[i]
            transV_noP = transV_[:,i]
            actV_noP = actV_[:,i]
            tranX_noP = transX_[:,i]
            actX_noP = actX_[:,i]
#             ax = scatter(inV,transV_noP.T[0],inX,(np.append(w[:,i],0),b[i]),cmap='summer')
#             ax.set_title("卷积层的第0个线性变化及超平面")
#             ax = scatter(inV,transV_noP.T[1],inX,(np.append(0,w[:,i]),b[i]),cmap='summer')
#             ax.set_title("卷积层的第1个线性变化及超平面")
#             ax = scatter(inV[:,:2],transV_noP.T[0],inX[:,:2],(w[:,i],b[i]),cmap='summer')
#             ax.set_title("卷积层的第0个线性变化及超平面")
#             ax = scatter(inV[:,1:3],transV_noP.T[1],inX[:,1:3],(w[:,i],b[i]),cmap='summer')
#             ax.set_title("卷积层的第1个线性变化及超平面")
            ax = scatter(inV[:,:2],actV_noP.T[0],inX[:,:2],(w[:,i],b[i]),cmap='summer')
            ax.set_title("卷积层的第0个激活及超平面")
            ax = scatter(inV[:,1:3],actV_noP.T[1],inX[:,1:3],(w[:,i],b[i]),cmap='summer')
            ax.set_title("卷积层的第1个激活及超平面")
            ax = scatter(inV,layerBinCode_noP[:,i,:].T[0,:],inX,cmap='coolwarm',value=False)
            ax.set_title("卷积层的第0个特征图对输入空间的划分")
            ax = scatter(p_0,layerBinCode_noP[:,i,:].T[0,:],X, cmap='coolwarm', value=False)
            ax.set_title("卷积层的第0个特征图对原始空间的划分")
            ax = scatter(inV,layerBinCode_noP[:,i,:].T[1,:],inX,cmap='coolwarm',value=False)
            ax.set_title("卷积层的第1个特征图对输入空间的划分")
            ax = scatter(p_0,layerBinCode_noP[:,i,:].T[1,:],X,cmap='coolwarm',value=False)
            ax.set_title("卷积层的第1个特征图对原始空间的划分")

#             ax = scatter(inV,actV_max_ind.T[i],inX,cmap='rainbow',value=False)
#             ax.set_title('第%d个卷积层在输入空间最大池化'%(i))
            ax = scatter(p_0,actV_max_ind.T[i],X,cmap='Set1',value=False)
            ax.set_title('第%d个卷积层在原始空间最大池化'%(i))
        ax = scatter(inV, transV.T[i], inX, (w_draw, b_draw), cmap='summer')
        if net.layers_type[layer]=='C':
            add_hyperplane(ax,inV,(np.append(0,w[:,i]),b[i]))
        ax.set_title('第%d个节点形成超平面:' % (i) + str(sl[i]))
        # 后层每个节点对输入空间进行激活的值
        ax = scatter(inV, actV.T[i], inX, (w_draw, b_draw), cmap='summer')
        if net.layers_type[layer]=='C':
            add_hyperplane(ax,inV,(np.append(0,w[:,i]),b[i]))
        ax.set_title('第%d个节点激活' % (i))
        # 后层每个节点对输入空间的划分
        ax = scatter(inV, layerBinCode.T[i], inX, (w_draw, b_draw), cmap='coolwarm', value=False,zero_start=False)
        if net.layers_type[layer]=='C':
            add_hyperplane(ax,inV,(np.append(0,w[:,i]),b[i]))
        ax.set_title('第%d个节点对输入空间划分' % (i))
        # 在原始特征空间进行划分
        ax = scatter(p_0, layerBinCode.T[i], X, cmap='coolwarm', value=False,zero_start=False)
        ax.set_title('第%d个节点对原始特征空间划分' % (i))



    ax = scatter(inV, layerNumCode, inX, value=False,cmap='coolwarm')
    ax.set_title('第%d层对输入空间的总体划分' % (l_num))
    # 输入空间进行仿射变换后在后层空间中的情况
    ax = scatter(transV, layerNumCode2, transX, cmap='summer', value=False, zero_start=False)
    ax.set_title('第%d层对输入空间线性变化' % (l_num))
    # 输入空间在进行非线性变换后在后层空间中的情况
    ax = scatter(actV, layerNumCode2, actX, cmap='summer', value=False, zero_start=False)
    ax.set_title('第%d层对输入空间激活后编码情况' % (l_num))
    # 第k层对原始特征空间的总体划分(叠加?)
    ax = scatter(p_0, layerNumCode, X, value=False,zero_start=False,cmap='coolwarm')
    ax.set_title('第%d层对原始特征空间的总体划分' % (l_num))
    # 前k层对原始特征空间胞腔分解情况(叠加?)
    ax = scatter(p_0, layersNumCode, X, value=False, zero_start=False,cmap=cmap)
    ax.set_title('前%d层对原始特征空间的总体划分' % (l_num))

    inX = actX
    inV = actV
    plt.show();
D:\anaconda\lib\site-packages\ipykernel_launcher.py:5: RuntimeWarning: More than 20 figures have been opened. Figures created through the pyplot interface (`matplotlib.pyplot.figure`) are retained until explicitly closed and may consume too much memory. (To control this warning, see the rcParam `figure.max_open_warning`).
  """