classSimpleDT(): defdt_train(self,dataset,D): #D:样本的权值分布 n_features=len(dataset.iloc[0,:])-1#特征数 y=dataset.iloc[:,-1]#类别标签 min_error=np.inf#初始化最小误差率 best_weak_calssifier={}#弱分类器 best_result=None#最好分类器的分类结果,没啥实际用处,方便debug best_error_inds=None#最好分类器的出错样本下标,没啥实际用处,方便debug #尝试每一个特征current_fea for fea_index inrange(n_features): #首先获取备选阈值 current_fea=dataset.iloc[:,fea_index]#当前特征 min_value,max_value=min(current_fea),max(current_fea) n_threshs=10 step=(max_value-min_value)/n_threshs threshs=[min_value]#存储备选阈值 temp=min_value for i inrange(n_threshs): threshs.append(temp+step) temp+=step #print(threshs)#此时全部备选阈值存储在threshs中
#尝试每一个备选阈值thresh for thresh in threshs: #尝试>或< for sign in ['>','<']: #获取当前弱分类器的分类结果 result=self.cal_result(dataset,fea_index,thresh,sign) print('使用特征{},阈值:{},符号:{},预测类别:{}'.format(fea_index,thresh,sign,result)) #接下来计算当前误差率e_m error_inds=[]#预测错误的样本的下标 for ind,pred inenumerate(result): if pred!=y[ind]: error_inds.append(ind) e_m=0 for index in error_inds: e_m+=D[index] #更新最小误差率 if e_m<min_error: min_error=e_m best_weak_calssifier['fea_index']=fea_index best_weak_calssifier['thresh']=thresh best_weak_calssifier['sign']=sign best_result=result best_error_inds=error_inds print('当前弱分类器训练完成!\n 划分特征为第{}个特征,阈值为{},不等号选择{}\n当前最好的分类误差率为{},出错的样本下标为{}\n预测结果为:{}'\ .format(best_weak_calssifier['fea_index'],best_weak_calssifier['thresh'],\ best_weak_calssifier['sign'],min_error,best_error_inds,best_result)) #返回 最小误差率,最好的弱分类器(含:划分特征下标,阈值,不等号),最好的预测结果 return min_error[0],best_weak_calssifier,best_result
#计算分类结果 defcal_result(self,data_X,fea_index,thresh,sign): fea=data_X.iloc[:,fea_index]#使用的特征 result=[]#预测结果 #对于每一个样本 for i inrange(len(data_X.iloc[:,0])): fea_value=data_X.iloc[i,fea_index]#第i个样本的第fea_index个特征取值 if sign =='>': if fea_value>=thresh: result.append(1) else: result.append(-1) else: if fea_value<=thresh: result.append(1) else: result.append(-1) return result
classAdaBoost(SimpleDT): defadab_train(self,dataset,n_iterations): n_samples=len(dataset.iloc[:,0])#样本量 y=dataset.iloc[:,-1]#真实类别标签 weak_classifiers=[]#存储弱分类器 total_cusum=np.zeros((n_samples,1))#存储各弱分类器加权预测值的累计结果 D=np.ones((n_samples,1))/n_samples#初始化训练样本的权值分布 for it inrange(n_iterations): #获取当前弱分类器及其对应的误差率,预测结果,对应(1)(2) error,weak_classifier,pred=self.dt_train(dataset,D) #丢弃误差率大于0.5的弱分类器 if error<=0.5: #计算系数,对应(3) alpha=0.5*np.log((1-error)/error) weak_classifier['alpha']=alpha #将当前弱分类器添加到弱分类器序列 weak_classifiers.append(weak_classifier) #更新权值分布,对应(4) z=0#(4)中公式的分母部分 temp=[] for i inrange(n_samples): temp.append(D[i]*np.exp(-alpha*y[i]*pred[i])) z+=D[i]*np.exp(-alpha*y[i]*pred[i]) for i,item inenumerate(temp): temp[i]=item/z D=temp#更新后的权值分布D
#联结现有的所有弱分类器==>终极分类器,并计算累计的预测值total_cusum pred=np.array(pred).reshape(-1,1) total_cusum+=alpha*pred #print(total_cusum)#shape:(n_samples,1) #应用符号函数求终极分类器的预测结果 final_pred=[] for i in total_cusum: if i >=0: final_pred.append(1) else: final_pred.append(-1) #计算当前终极分类器的误差 err=0 for i,pred inenumerate(final_pred): if pred!=y[i]: err+=1 error_rate=err/n_samples#误差 print('当前分类器由{}个弱分类器联结而成,当前分类错误率:{}\n'.format(len(weak_classifiers),error_rate)) #弱误差已经为0,则提前终止 if error_rate ==0 : print('已全部分类正确,退出...') break #返回全部弱分类器的序列 return weak_classifiers #对新的样本x进行分类 defmake_classification(self,test_data,weak_classifiers): n_samples=len(test_data.iloc[:,0]) total_cusum=np.zeros((n_samples,1))#存储各弱分类器加权预测值的累计结果 n_classifiers=len(weak_classifiers)#弱分类器的个数 for i inrange(n_classifiers): pred=self.cal_result(test_data,weak_classifiers[i]['fea_index'],weak_classifiers[i]['thresh'],weak_classifiers[i]['sign']) pred=np.array(pred).reshape(-1,1) total_cusum+=weak_classifiers[i]['alpha']*pred #应用符号函数求终极分类器的预测结果 final_pred=[] for i in total_cusum: if i >=0: final_pred.append(1) else: final_pred.append(-1) return final_pred
make_classification用于对传入的待分类数据test_data进行分类,这其实就是我们最终需要的强分类器。该方法用adab_train返回的弱分类器序列中的每一个弱分类器对test_data进行分类,并对其分类结果(1 or -1)进行加权,最后将这些加权后的预测值累加起来,通过符号函数的映射,就得到了强分类器的预测结果。
import pandas as pd import numpy as np classSimpleDT(): defdt_train(self,dataset,D): #D:样本的权值分布 n_features=len(dataset.iloc[0,:])-1#特征数 y=dataset.iloc[:,-1]#类别标签 min_error=np.inf#初始化最小误差率 best_weak_calssifier={}#弱分类器 best_result=None#最好分类器的分类结果,没啥实际用处,方便debug best_error_inds=None#最好分类器的出错样本下标,没啥实际用处,方便debug #尝试每一个特征current_fea for fea_index inrange(n_features): #首先获取备选阈值 current_fea=dataset.iloc[:,fea_index]#当前特征 min_value,max_value=min(current_fea),max(current_fea) n_threshs=10 step=(max_value-min_value)/n_threshs threshs=[min_value]#存储备选阈值 temp=min_value for i inrange(n_threshs): threshs.append(temp+step) temp+=step #print(threshs)#此时全部备选阈值存储在threshs中
#尝试每一个备选阈值thresh for thresh in threshs: #尝试>或< for sign in ['>','<']: #获取当前弱分类器的分类结果 result=self.cal_result(dataset,fea_index,thresh,sign) #print('使用特征{},阈值:{},符号:{},预测类别:{}'.format(fea_index,thresh,sign,result)) #接下来计算当前误差率e_m error_inds=[]#预测错误的样本的下标 for ind,pred inenumerate(result): if pred!=y[ind]: error_inds.append(ind) e_m=0 for index in error_inds: e_m+=D[index] #更新最小误差率 if e_m<min_error: min_error=e_m best_weak_calssifier['fea_index']=fea_index best_weak_calssifier['thresh']=thresh best_weak_calssifier['sign']=sign best_result=result best_error_inds=error_inds """ print('当前弱分类器训练完成!\n 划分特征为第{}个特征,阈值为{},不等号选择{}\n当前最好的分类误差率为{},出错的样本下标为{}\n预测结果为:{}'\ .format(best_weak_calssifier['fea_index'],best_weak_calssifier['thresh'],\ best_weak_calssifier['sign'],min_error,best_error_inds,best_result)) """ #返回 最小误差率,最好的弱分类器(含:划分特征下标,阈值,不等号),最好的预测结果 return min_error[0],best_weak_calssifier,best_result
#计算分类结果 defcal_result(self,data_X,fea_index,thresh,sign): fea=data_X.iloc[:,fea_index]#使用的特征 result=[]#预测结果 #对于每一个样本 for i inrange(len(data_X.iloc[:,0])): fea_value=data_X.iloc[i,fea_index]#第i个样本的第fea_index个特征取值 if sign =='>': if fea_value>=thresh: result.append(1) else: result.append(-1) else: if fea_value<=thresh: result.append(1) else: result.append(-1) return result classAdaBoost(SimpleDT): defadab_train(self,dataset,n_iterations): n_samples=len(dataset.iloc[:,0])#样本量 y=dataset.iloc[:,-1]#真实类别标签 weak_classifiers=[]#存储弱分类器 total_cusum=np.zeros((n_samples,1))#存储各弱分类器加权预测值的累计结果 D=np.ones((n_samples,1))/n_samples#初始化训练样本的权值分布 for it inrange(n_iterations): #获取当前弱分类器及其对应的误差率,预测结果,对应(1)(2) error,weak_classifier,pred=self.dt_train(dataset,D) #丢弃误差率大于0.5的弱分类器 if error<=0.5: #计算系数,对应(3) alpha=0.5*np.log((1-error)/error) weak_classifier['alpha']=alpha #将当前弱分类器添加到弱分类器序列 weak_classifiers.append(weak_classifier) #更新权值分布,对应(4) z=0#(4)中公式的分母部分 temp=[] for i inrange(n_samples): temp.append(D[i]*np.exp(-alpha*y[i]*pred[i])) z+=D[i]*np.exp(-alpha*y[i]*pred[i]) for i,item inenumerate(temp): temp[i]=item/z D=temp#更新后的权值分布D
#联结现有的所有弱分类器==>终极分类器,并计算累计的预测值total_cusum pred=np.array(pred).reshape(-1,1) total_cusum+=alpha*pred #print(total_cusum)#shape:(n_samples,1) #应用符号函数求终极分类器的预测结果 final_pred=[] for i in total_cusum: if i >=0: final_pred.append(1) else: final_pred.append(-1) #计算当前终极分类器的误差 err=0 for i,pred inenumerate(final_pred): if pred!=y[i]: err+=1 error_rate=err/n_samples#误差 print('当前分类器由{}个弱分类器联结而成,当前分类错误率:{}\n'.format(len(weak_classifiers),error_rate)) #弱误差已经为0,则提前终止 if error_rate ==0 : print('已全部分类正确,退出...') break #返回全部弱分类器的序列 return weak_classifiers #对新的样本x进行分类 defmake_classification(self,test_data,weak_classifiers): n_samples=len(test_data.iloc[:,0]) total_cusum=np.zeros((n_samples,1))#存储各弱分类器加权预测值的累计结果 n_classifiers=len(weak_classifiers)#弱分类器的个数 for i inrange(n_classifiers): pred=self.cal_result(test_data,weak_classifiers[i]['fea_index'],weak_classifiers[i]['thresh'],weak_classifiers[i]['sign']) pred=np.array(pred).reshape(-1,1) total_cusum+=weak_classifiers[i]['alpha']*pred #应用符号函数求终极分类器的预测结果 final_pred=[] for i in total_cusum: if i >=0: final_pred.append(1) else: final_pred.append(-1) return final_pred