基本理论
什么是OCR
- OCR (Optical Character Recognition,光学字符识别)是指对图片中的文字进行查找、提取、识别的一种技术,通过检测暗、亮的模式确定其形状,然后用字符识别方法将形状翻译成计算机文字的过程。
一般步骤
- 文字检测:解决的问题是哪里有文字,文字的范围有多少
- 文字识别:对定位好的文字区域进行识别,主要解决的问题是每个文字是什么,将图像中的文字区域进转化为字符信息。
OCR的难点
- 文本检测,尤其是复杂自然场景下的文本检测,非常具有挑战性,主要难点有:
- 文本存在多种分布,文本排布形式多样;
- 文本大小、长度不固定;
- 文本存在多个方向;
- 多种语言混合;
- 在实际项目中,检测精度要求极高。
OCR与目标检测的区别
- 文本大多数以长矩形形式存在,即长宽比一般较大或较小,这与普通的目标检测中的物体不一样(这些长宽比较接近1)
- 普通物体(比如猫)存在明显的闭合边缘轮廓,而文本没有
- 文本中包含多个文字,而文字之间是有间隔的,如果检测做得不好,我们就会把每个字都当成文本行给框出来而非整行作为文本框,这与我们的期望不一样
评估指标
- 拒识率:把应该识别的文字,当成不能识别的文字
- 误识率:不应该作为文字的作为文字来识别
- 识别速度:一般可接受范围在50~500ms
- 稳定性:识别结果稳定性
应用
- 文档/书籍扫描、车牌识别、证件识别、卡识别、票据识别
- 教育场景文字识别(例如拍照搜题)
- 文字识别笔
- 旅游翻译APP
- 盲人相机
- 自动导航
常用数据集
ICDAR
ICDAR是进行场景文本检测和识别(Scene Text Detection and Recognition)任务最知名和常用的数据集。ICDAR 2017 RCTW,由Baoguang Shi等学者提出。RCTW主要是中文,共12263张图像,其中8034作为训练集,4229作为测试集。使用四边形框标注文本行。数据集绝大多数是相机拍的自然场景,一些是屏幕截图;包含了大多数场景,如室外街道、室内场景、手机截图等等。官网链接:http://rctw.vlrlab.net/dataset/
- 分辨率。图像分辨率大小不等,小则300+,大则3000+;图像清晰程度不一,绝大多数背景和文字很清晰,极少数模糊
场景
- 街道场景:主要是建筑、标志牌、条幅等带有文字的图像,这类场景占据大多数
- 截图:主要是网络上带有文字的图像截图、手机上带有文字的图像截图、带文字的表情包图像
- 室内:主要是商场、墙壁等带有文字的图像
- 证件类:主要是车牌照、驾驶证、身份证等带有文字的图像
文本
- 方向(multi-oriented):绝大多数是水平方向,垂直、倾斜均有,弯曲方向极少数
- 大小(multi-scale):由于图像拍照远近,图像有近距离或远距离的,因此文本大小不等,有较多的长文
- 质量:少数文本存在模糊、光照不均匀、低分辨率等情况
- 字体:绝大多数字体为楷书,极少数艺术字,几乎没有手写字体
- 遮挡:绝大多数文本没有遮挡,极少数有遮挡
标注:数据集的标注保存在
<image_nam>.txt
文件中,其中格式如下:
x1,y1,x2,y2,x3,y3,x4,y4,<识别难易程度>,<"文本">
x1,y1,x2,y2,x3,y3,x4,y4,<识别难易程度>,<"文本">
x1,y1,x2,y2,x3,y3,x4,y4,<识别难易程度>,<"文本">
- 八个坐标点分别是:左上、右上、右下、左下。示例图片如下:
CTW
CTW(Chinese Text in the Wild,自然场景下中文字符数据集)是腾讯和清华大学一起制作的数据集,包含32,285 张高分辨率图像,1,018,402 个字符实例,3,850 种字符种类,6 种属性
"occluded(被遮挡的)" | "bgcomplex(复杂背景)" | "distorted(扭曲)" | "raised(3D浮雕)" | "wordart(艺术字)" | "handwritten(手写体)
- 数据集划分。训练集75%,验证集5%,分类测试集10%,检测测试集10%
- 主页:https://ctwdataset.github.io/
- 论文:A Large Chinese Text Dataset in the Wild
- 示例图片
MSRA-TD
MSRA-TD是一个小型、支持中英文的倾斜文本检测的数据集,包含500幅自然图像,使用袖珍相机从室内(办公室和商场)和室外(街道)场景中拍摄。室内图像主要是标志牌、门牌和警示牌,室外图像主要是复杂背景下的引导牌和广告牌。图像的分辨率从1296x864到1920x1280不等。
- 数据集分为两部分:训练集和测试集。训练集包含从原始数据集中随机选择的300幅图像,其余200幅图像构成测试集。该数据集中的所有图像都进行了完全注释。该数据集中的基本单位是文本行,而不是ICDAR数据集中使用的单词,因为很难根据间距将中文文本行划分为单个单词;即使对于英文文本行,在没有高级信息的情况下执行单词分割也是非常重要的。以下是示例图片与标注方式:
文字检测技术
CTPN(2016)
概述
- CTPN全称Detecting Text in Natural Image with Connectionist Text Proposal Network(基于连接文本提议网络的自然图像文本检测),是发表于2016年的用于OCR的一篇著名论文。直到今天这个网络框架一直是OCR系统中做文本检测的一个常用网络,极大地影响了后面文本检测算法的方向。该模型在自然环境下水平文字的检测方面有着良好的表现。其基本思想是先使用固定宽度(16像素)的小文本框对图像进行检测,得到一系列含有文字的区域,然后对这些区域进行合并,合并成大的、完整的文本框。
具体步骤
- CTPN主要包含以下几个步骤:
- 检测文本。使用固定宽度为16像素的小区域(proposal)在原图像上移动检测,每个proposal使用10个锚点高度在11~273之间(每次除以0.7)。检测器在每个窗口位置输出k个锚点的文本/非文本分数和预测的y轴坐标(v);
- 左:RPN提议。右:细粒度的文本提议。
- 利用RNN连接多个proposal。检测出文本区域后,将这些小的文本区域进行连接。为了避免对与文本模式类似的非文本目标(窗口,砖块,树叶等)的误检,使用了双向LSTM(LSTM是RNN变种)利用前后两个方向上的信息对proposal进行连接。引入RNN进行连接操作,大大减少了错误检测,同时还能够恢复很多包含非常弱的文本信息的遗漏文本proposal;
- 边沿细化。完成连接后,对边沿进行细化处理,当两个水平边的proposal没有完全被实际文本行区域覆盖,或者某些边的提议被丢弃。通过连接其文本/非文本分数为>0.7的连续文本提议,文本行的构建非常简单。
- 文本行构建如下:首先,我们为提议\(B_j\)定义一个配对邻居作\(B_i\)为\(B_j−>B_i\),当(i)是最接\(B_j\)近\(B_i\)的水平距离,(ii)该距离小于50像素,并且(iii)它们的垂直重叠是>0.7时。其次,如果\(B_j−>B_i\)和\(B_i−>B_j\),则将两个提议分组为一对。然后通过顺序连接具有相同提议的对来构建文本行;
- CTPN检测有(红色框)和没有(黄色虚线框)边缘细化。细粒度提议边界框的颜色表示文本/非文本分数。
网络结构
- VGG16+Conv5:CTPN的基础网络使用了VGG16用于特征提取,在VGG的最后一个卷积层Conv5,CTPN用了3×3的卷积核来对该feature map做卷积,这个Conv5 特征图的尺寸由输入图像来决定,而卷积时的步长却限定为16,感受野被固定为228个像素;
- 卷积后的特征将送入BLSTM继续学习,最后接上一层全连接层FC输出我们要预测的参数:2K个纵向坐标y,2k个分数,k个x的水平偏移量。
损失函数
- CTPN有三个输出共同连接到最后的FC层,这三个输出同时预测文本/非文本分数
s
,垂直坐标\(v=\lbrace v_c,v_h \rbrace\)和边缘细化偏移o
。损失函数形式为:
- 其中每个锚点都是一个训练样本,i是一个小批量数据中一个锚点的索引。\(s_i\)是预测的锚点i作为实际文本的预测概率。\(s_i^*= \lbrace 0,1 \rbrace\)是真实值。j是y坐标回归中有效锚点集合中锚点的索引,定义如下。有效的锚点是定义的正锚点\(s_j^*=1\),如下所述,或者与实际文本提议重叠的交并比(IoU)>0.5。\(v_j\)和\(v_j^*\)是与第j个锚点关联的预测的和真实的y坐标。k是边缘锚点的索引,其被定义为在实际文本行边界框的左侧或右侧水平距离(例如32个像素)内的一组锚点。\(o_k\)和\(o_k^*\)是与第k个锚点关联的x轴的预测和实际偏移量\(L^{cl}_s\)是我们使用Softmax损失区分文本和非文本的分类损失。\(L^{re}_v\)和\(L^{re}_o\)是回归损失。\(\lambda_1\)和\(\lambda_2\)是损失权重,用来平衡不同的任务,将它们经验地设置为1.0和2.0。\(N_s,N_v,N_o\)是标准化参数,表示\(L^{cl}_s\),\(L^{re}_v\),\(L^{re}_o\)分别使用的锚点总数。
时间性能
- 使用单个GPU,CTPN(用于整个检测处理)的执行时间为每张图像大约0.14s。没有RNN连接的CTPN每张图像GPU时间大约需要0.13s。因此,所提出的网内循环机制稍微增加了模型计算,并获得了相当大的性能增益。
准确率
- CTPN在自然环境下的文字检测中取得了优异的效果。如下图所示:
- CTPN在五个基准数据集上进行了全面评估。在ICDAR 2013上,它的性能优于最近的TextFlow和FASText,将F-measure从0.80提高到了0.88。精确度和召回率都有显著提高,改进分别超过+5%和+7%。CTPN在检测小文本方面也有较好表现。在多个数据集下评估效果如下表所示:
缺陷
- 针对极小尺度文本检测有遗漏。如下图所示:
- 在极小尺度的情况下(红色框内)CTPN检测结果,其中一些真实边界框被遗漏。黄色边界箱是真实值。
- 对于非水平的文本的检测效果并不好。
SegLink(2017)
概述
- 对于普通目标检测,我们并不需要对其做所谓的多方向目标检测。但文本检测任务则不一样,文本的特点就是高宽比特别大或特别小,而且文本通常存在一定的旋转角度,如果我们对于带角度的文本仍然使用通用目标检测思路,通过四个参数(x,y,w,h)来指定一个目标的位置(如下图红色框),显然误差比较大,而绿色框才是理想的检测效果。那如何才能实现带角度的文本检测呢?让模型再学习一个表示角度的参数θ,即模型要回归的参数从原来的(x,y,w,h)变成(x,y,w,h,θ)。
- Seglink是一种多方向文本检测方法,该方法既融入CTPN小尺度候选框的思路,又加入了SSD算法的思路,达到了自然场景下文本检测较好的效果。Seglink核心是将文本检测转换成两个局部元素的检测:segment和link。segment 是一个有方向的box,覆盖文本内容的一部分,而link则连接了两个相邻的segments,表达了这两个segment是否属于同一个文本。该算法通过在多尺度上进行segment和link的检测,最终按照links的表达将相关的segment合并成最终的bounding box。如下图所示。
网络结构
- 网络使用预先训练的VGG-16网络作为主干(从conv1到pool5)。之后,VGG-16的全连接层被替换为卷积层(fc6替换为conv6;fc7替换为conv7)。接着是一些额外的卷积层(conv8_1到conv11),用于进行多尺度检测。结构如下图所示。
- 检测到的定向框称为Segment,用\(s=(x_s,y_s,w_s,h_s,θ_s)\)表示。预测器产生7个通道segment检测。其中,2个通道用来判断有没有文本(分类),其余5个用来计算定向框的几何偏移(回归)。
link(链接)
- 在检测到segment之后,会进行link,将segment合在一起。
- 层内链接(with-in layer link):每个segment检测与其统一特征层周围的8个segment是否同属于一个字,如果属于则链接在一起。
- 跨层链接(cross layer link):跨层link使用相邻索引连接两个特征图层上的segment。
- 层内链接和跨层链接示意图如下图所示:
预测参数表示
- 预测器针对每个feature map输出参数总数为(2+5+16+8=31)。假设当前的feature map的尺度为(w,h),那么该层卷积后输出为w×h×31。这些参数包括:
- 每个segment内的分类分数,即判断框内有字符还是无字符的分数(2分类),共2个参数;
- segment的位置信息\(x,y,w,h,θ\),共5个参数;
- 同层(within-layer)的每个segment的link的分数,表示该方向有link还是没link(2分类问题),而一个segment有八邻域所以有八个方向,参数一共有2×8=16;
- 相邻层(cross-layer)之间也存在link,同样是该方向有link还是没link(2分类问题),而link的个数是4个,所以参数总数为2×4=8。如下图所示:
和并segment和link
- 网络会生成许多segment和link(数量取决于图像大小),需要将这些segment和link进行合并。合并之前,先根据置信度得分进行过滤。以过滤后的segment为节点,过滤后的link为边,在其上构造一个图。合并算法如下表所示:
合并算法
- 设有一个集合B,里面有很多相关联的segment待合并;
- 每一个segment都有角度θ,求集合B中所有segment角度的平均值\(θ_b\);
- 求一条直线L,使得所有segment的中心到这条直线的距离最小(最小二乘法线性回归);
- 每个segment的中心向直线L做垂直投影;
- 从所有投影点中选出相距最远的两个点,记做\(x_p,y_p\)和\(x_q,y_q\);
- 最终合并好的文本框的位置参数记为\(x_b,y_b,w_b,h_b,θ_b\),则
$$ x_b=\frac{x_p + x_q}{2} \ y_b=\frac{y_q + y_q}{2} $$
- 文本行的宽度\(w_b\)就是两个最远点的距离,即\(x_p,y_p\)和\(x_q,y_q\)再加上最远两个点所处的segment的宽度的一半\(W_p\)和\(W_q\);
- 文本行高度\(h_b\)就是所有segment高度求平均值。
- 如下图所示,橙色直线是拟合出的最佳直线,红色点表示segment的中心,黄点表示红点在直线上的投影,绿框就是合并后的完整本文框:
损失函数
- SegLink所使用的损失函数由三个部分构成,是否是text的二分类的softmax损失,box的smooth L1 regression损失,是否link的二类的softmax损失。\(\lambda_1\)和\(\lambda_2\)是损失权重控制权重,最后都设为1。
性能
- 英语单语文本检测
- 英语单语文本检测效果明显好于其它模型。如下表:
- 即使在杂乱的背景下也有较好的表现。如图:
- 多种语言文本检测
- SegLink在多语种场景检测中准确率、速度都有较好表现。如下表所示:
局限
- 水平文字检测效果不及CTPN
- 无法检测到字符间距非常大的文本和弯曲文本
DB(2020)
- DB全称是Differentiable Binarization(可微分二值化),是近年提出的利用图像分割方法进行文字检测的模型。前文所提到的模型,使用一个水平矩形框或带角度的矩形框对文字进行定位,这种定位方式无法应用于弯曲文字和不规范分布文字的检测。DB模型利用图像分割方法,预测出每个像素的类别(是文字/不是文字),可以用于任意形状的文字检测。如下图所示:
- 左图:原图;右图:检测结果,红色部分为预测成文字的像素区域,蓝色为非文字像素区域
基本流程
DB之前的一些基于图像分割的文字检测模型,识别原理如上图蓝色箭头所标记流程:
- 第一步,对原图进行分割,预测出每个像素的属于文本/非文本区域的概率;
- 第二步,根据第一步生成的概率,和某个固定阈值进行比较,产生一个二值化图;
- 第三步,采用一些启发式技术(例如像素聚类)将像素分组为文本示例。
DB模型的流程如上图红色箭头所示流程:
- 第一步,对原图进行分割,预测出每个像素的属于文本/非文本区域的概率。同时,预测一个threshold map(阈值图)
- 第二步,采用第一步预测的概率和预测的阈值进行比较(不是直接和阈值比较,而是通过构建一个公式进行计算),根据计算结果,得到二值化图。在计算二值化图过程中,采用了一种二值化的近似函数,称为可微分二值化(Differentiable Binarization),在训练过程中,该函数完全可微分;
- 第三步,根据二值化结果生成分割结果。
标签值生成
- 对于每个经过原始标记的样本(上图中第一张图像),采用Vatti clipping algorithm算法(一种用于计算多边形裁剪的算法)对多边形进行缩放,得到缩放后的多边形作为文字边沿(如上图中第二张图像绿色、蓝色多边形所示)。计算公式:
$$ D = \frac{A(1 - r^2)}{L} $$
- 其中,D是收缩放量,A为多边形面积,L为多边形周长,r是缩放系数,设置为0.4. 根据计算出的偏移量D进行缩小,得到缩小的多边形(第二张图像蓝色边沿所示);根据偏移量D放大,得到放大的多边形(第二张图像绿色边沿所示),两个边沿间的部分就是文字边界。
模型结构
- Differentiable Binarization模型结构如下图所示:
- 模型经过卷积,得到不同降采样比率的特征图,经过特征融合后,产生一组分割概率图、一组阈值预测图,然后微分二值化算法做近似二值化处理,得到预测二值化图。传统的二值化方法一般采用阈值分割法,计算公式为:
$$ B_{i, j} = \begin{cases} 1,\quad if \ P_{i,j} \ge t \ 0, \quad otherwise \end{cases} \tag{1} $$
- 上式描述的二值化方法是不可微分的,导致在训练期间无法与分割网络部分一起优化,为了解决这个问题,DB模型采用了近似阶跃函数的、可微分二值化函数。函数定义如下:
$$ \hat B_{i, j} = \frac{1}{1+e^{-k(P_{i,j} - T_{i, j})}} $$
- 其中,\(P{i,j}\)表示预测概率,\(T{i, j}\)表示阈值,两个值相减后经过系数\(K\)放大,当预测概率越大于阈值,则输出值越逼近1。
- 标准二值化函数与可微分二值化函数比较。SB:standard binarization其梯度在0值被截断无法进行有效地回传。DB:differentiable binarization是一个可微分的曲线
# 可谓分二值化函数示例
import math
P1 = 0.6 # 预测概率1
P2 = 0.4 # 预测概率2
T = 0.5 # 阈值
K = 50
B1 = 1.0 / (1 + pow(math.e, -K * (P1 - T)))
print("B1:", B1) # B1:0.9933 趋近于1
B2 = 1.0 / (1 + pow(math.e, -K * (P2 - T)))
print("B2:", B2) # B2:0.00669 趋近于0
损失函数
- DB模型损失函数如下所示:
$$ L = L_s + \alpha \times L_b + \beta \times L_t $$
- 其中,\(L_s\)是预测概率图的loss部分,\(L_b\)是二值图的loss部分,\(\alpha\)和\(\beta\)值分别设置为1和10.\(L_s\)和\(L_b\)均采用二值交叉熵:
$$ L_s = L_b = \sum_{i \in S_l} y_i log x_i + (1 - y_i) log(1-x_i) $$
- 上式中\(S_l\)是样本集合,正负样本比例为1:3.
- \(Lt\)指经过膨胀后的多边形区域中的像素预测结果和标签值之间的\(L1\)距离之和:
$$ L_t = \sum_{i \in R_d} |y_i ^* - x_i ^*| $$
- \(R_d\)值膨胀区域\(G_d\)内的像素索引, \(y_i^*\)是阈值图的标签值。
涉及到的数据集
- 模型在以下6个数据集下进行了实验:
- SynthText:合成数据集,包含80万张图像,用于模型训练
- MLT-2017:多语言数据集,包含9种语言,7200张训练图像,1800张验证图像及9000张测试图像,用于模型微调
- ICDAR 2015:包含1000幅训练图像和500幅测试图像,分辨率720*1280,提供了单词级别标记
- MSRA-TD500:包含中英文的多语言数据集,300张训练图像及200张测试图像
- CTW1500:专门用于弯曲文本的数据集,1000个训练图像和500个测试图像,文本行级别标记
- Total-Text:包含各种形状的文本,及水平、多方向和弯曲文字,1255个训练图像和300个测试图像,单词级别标记
- 为了扩充数据量,论文采用了随机旋转(-10°~10°角度内)、随机裁剪、随机翻转等策略进行数据增强。
- 对各种形状的文本实例的一些可视化结果,包括弯曲文本、多向文本、垂直文本和长文本行。对于每个单元,右上角是阈值映射;右下角是概率图。
效果
- 不同设置结果比较,“DConv”表示可变形卷积。“P”、“R”和“F”分别表示精度、召回率和F度量。
- Total-Text数据集下测试结果,括号中的值表示输入图像的高度,“*”表示使用多尺度进行测试,“MTS”和“PSE”是Mask TextSpotter和PSENet的缩写
- CTW1500数据集下测试结果。括号中的值表示输入图像的高度。
- ICDAR 2015数据集下测试结果。括号中的值表示输入图像的高度,“TB”和“PSE”是TextBoxes++和PSENet的缩写。
- MSRA-TD500数据集下测试结果。括号中的值表示输入图像的高度。
- MLT-2017数据集下测试结果。“PSE”是PSENet的缩写。
结论
- 能有效检测弯曲文本、不规范分布文本
- 具有较好的精度和速度
- 局限:不能处理文本中包含文本的情况
文字识别技术
CRNN+CTC(2015)
- CRNN(Convolutional Recurrent Neural Network)即卷积递归神经网络,是DCNN和RNN的组合,专门用于识别图像中的序列式对象。与CTC loss配合使用,进行文字识别,可以直接从文本词级或行级的标注中学习,不需要详细的字符级的标注。
特点
- 与大多数现有的组件需要单独训练和协调的算法相比,它是端对端训练的。
- 它自然地处理任意长度的序列,不涉及字符分割或水平尺度归一化。
- 它不仅限于任何预定义的词汇,并且在无词典和基于词典的场景文本识别任务中都取得了显著的表现。
- 它产生了一个有效而小得多的模型,这对于现实世界的应用场景更为实用。
网络结构
- 卷积层:从输入图像中提取特征序列;
- 循环层:预测每一帧的标签分布;
- 转录层:将每一帧的预测变为最终的标签序列。
- 网络架构。架构包括三部分:
- 卷积层,从输入图像中提取特征序列;
- 循环层,预测每一帧的标签分布;
- 转录层,将每一帧的预测变为最终的标签序列。
特征提取
- 在CRNN模型中,通过采用标准CNN模型(去除全连接层)中的卷积层和最大池化层来构造卷积层的组件。这样的组件用于从输入图像中提取序列特征表示。在进入网络之前,所有的图像需要缩放到相同的高度。然后从卷积层组件产生的特征图中提取特征向量序列,这些特征向量序列作为循环层的输入。具体地,特征序列的每一个特征向量在特征图上按列从左到右生成。这意味着第i个特征向量是所有特征图第i列的连接。在我们的设置中每列的宽度固定为单个像素。由于卷积层,最大池化层和元素激活函数在局部区域上执行,因此它们是平移不变的。因此,特征图的每列对应于原始图像的一个矩形区域(称为感受野),并且这些矩形区域与特征图上从左到右的相应列具有相同的顺序。如图2所示,特征序列中的每个向量关联一个感受野,并且可以被认为是该区域的图像描述符。
- 感受野。提取的特征序列中的每一个向量关联输入图像的一个感受野,可认为是该区域的特征向量。
序列标注
- 一个深度双向循环神经网络是建立在卷积层的顶部,作为循环层。循环层预测特征序列\(x = x_1,…,x_T\)中每一帧\(x_t\)的标签分布\(y_t\)。循环层的优点是三重的。首先,RNN具有很强的捕获序列内上下文信息的能力。对于基于图像的序列识别使用上下文提示比独立处理每个符号更稳定且更有帮助。以场景文本识别为例,宽字符可能需要一些连续的帧来完全描述(参见图2)。此外,一些模糊的字符在观察其上下文时更容易区分,例如,通过对比字符高度更容易识别“il”而不是分别识别它们中的每一个。其次,RNN可以将误差差值反向传播到其输入,即卷积层,从而允许我们在统一的网络中共同训练循环层和卷积层。第三,RNN能够从头到尾对任意长度的序列进行操作。
- 传统的RNN单元在其输入和输出层之间具有自连接的隐藏层。每次接收到序列中的帧\(x_t\)时,它将使用非线性函数来更新其内部状态\(h_t\),该非线性函数同时接收当前输入\(xt\)和过去状态\(h{t−1}\)作为其输入:\(h_t = g(xt, h{t−1})\)。那么预测\(y_t\)是基于\(ht\)的。以这种方式,过去的上下文\(\lbrace x{t\prime} \rbrace _{t \prime < t}\)被捕获并用于预测。然而,传统的RNN单元有梯度消失的问题,这限制了其可以存储的上下文范围,并给训练过程增加了负担。长短时记忆(LSTM)是一种专门设计用于解决这个问题的RNN单元。LSTM(图3所示)由一个存储单元和三个多重门组成,即输入,输出和遗忘门。在概念上,存储单元存储过去的上下文,并且输入和输出门允许单元长时间地存储上下文。同时,单元中的存储可以被遗忘门清除。LSTM的特殊设计允许它捕获长距离依赖,这经常发生在基于图像的序列中。
(a) 基本的LSTM单元的结构。LSTM包括单元模块和三个门,即输入门,输出门和遗忘门。 (b) 我们论文中使用的深度双向LSTM结构。合并前向(从左到右)和后向(从右到左)LSTM的结果到双向LSTM中。在深度双向LSTM中堆叠多个双向LSTM结果。
- LSTM是定向的,它只使用过去的上下文。然而,在基于图像的序列中,两个方向的上下文是相互有用且互补的。因此,将两个LSTM,一个向前和一个向后组合到一个双向LSTM中。此外,可以堆叠多个双向LSTM,得到如图3.b所示的深双向LSTM。深层结构允许比浅层抽象更高层次的抽象,并且在语音识别任务中取得了显著的性能改进。
转录
转录是将RNN所做的每帧预测转换成标签序列的过程。数学上,转录是根据每帧预测找到具有最高概率的标签序列。在实践中,存在两种转录模式,即无词典转录和基于词典的转录。词典是一组标签序列,预测受拼写检查字典约束。在无词典模式中,预测时没有任何词典。在基于词典的模式中,通过选择具有最高概率的标签序列进行预测。
标签序列的概率
- 采用”联接时间分类“(CTC)层中定义的条件概率。按照每帧预测\(y=y_1,…,y_T\)对标签序列\(l\)定义概率,并忽略\(l\)中每个标签所在的位置。因此,当我们使用这种概率的负对数似然作为训练网络的目标函数时,我们只需要图像及其相应的标签序列,避免了标注单个字符位置的劳动。
- 条件概率的公式简要描述如下:输入是序列\(y = y_1,…,y_T\),其中\(T\)是序列长度。这里,每个\(y_t \in\Re^{|{\cal L}’|}\)是在集合\({\cal L}’ = {\cal L} \cup\)上的概率分布,其中\({\cal L}\)包含了任务中的所有标签(例如,所有英文字符),以及由
-
表示的“空白”标签。序列到序列的映射函数\({\cal B}\)定义在序列\(\boldsymbol{\pi}\in{\cal L}’^{T}\)上,其中\(T\)是长度。\({\cal B}\)将\(\boldsymbol{\pi}\)映射到\(\mathbf{l}\)上,首先删除重复的标签,然后删除blank
。例如,\({\cal B}\)将“–hh-e-l-ll-oo–”(-
表示blank
)映射到“hello”。然后,条件概率被定义为由\({\cal B}\)映射到\(\mathbf{l}\)上的所有\(\boldsymbol{\pi}\)的概率之和:
$$ \begin{equation} p(\mathbf{l}|\mathbf{y})=\sum_{\boldsymbol{\pi}:{\cal B}(\boldsymbol{\pi})=\mathbf{l}}p(\boldsymbol{\pi}|\mathbf{y}) \end{equation} $$
- \(\boldsymbol{\pi}\)的概率定义为\(p(\boldsymbol{\pi}|\mathbf{y})=\prod_{t=1}^{T}y_{\pi_{t}}^{t}\),\(y_{\pi_{t}}^{t}\)是时刻\(t\)时有标签\(\pi_{t}\)的概率。由于存在指数级数量的求和项,直接计算方程1在计算上是不可行的。然而,使用CTC中描述的前向算法可以有效计算方程。
无字典转录
- 在这种模式下,将具有方程1中定义的最高概率的序列\(\mathbf{l}^{*}\)作为预测。由于不存在用于精确找到解的可行方法,我们采用CTC中的策略。序列\(\mathbf{l}^{*}\)通过\(\mathbf{l}^{*}\approx{\cal B}(\arg\max_{\boldsymbol{\pi}}p(\boldsymbol{\pi}|\mathbf{y}))\)近似发现,即在每个时间戳\(t\)采用最大概率的标签\(\pi_{t}\),并将结果序列映射到\(\mathbf{l}^{*}\)。
基于词典的转录
- 在基于字典的模式中,每个测试采样与词典\({\cal D}\)相关联。基本上,通过选择词典中具有方程1中定义的最高条件概率的序列来识别标签序列,即\(\mathbf{l}^{*}=\arg\max_{\mathbf{l}\in{\cal D}}p(\mathbf{l}|\mathbf{y})\)。然而,对于大型词典,例如5万个词的Hunspell拼写检查词典,对词典进行详尽的搜索是非常耗时的,即对词典中的所有序列计算方程1,并选择概率最高的一个。为了解决这个问题,我们观察到,通过无词典转录预测的标签序列通常在编辑距离度量下接近于实际结果。这表示我们可以将搜索限制在最近邻候选目标\({\cal N}_{\delta}(\mathbf{l}')\),其中\(\delta\)是最大编辑距离,\(\mathbf{l}'\)是在无词典模式下从\(\mathbf{y}\)转录的序列:
$$ \begin{equation} \mathbf{l}^{*}=\arg\max_{\mathbf{l}\in{\cal N}_{\delta}(\mathbf{l}')}p(\mathbf{l}|\mathbf{y}) \end{equation} $$
- 可以使用BK树数据结构有效地找到候选目标\({\cal N}_{\delta}(\mathbf{l}’)\),这是一种专门适用于离散度量空间的度量树。BK树的搜索时间复杂度为\(O(\log|{\cal D}|)\),其中\(|{\cal D}|\)是词典大小。因此,这个方案很容易扩展到非常大的词典。在我们的方法中,一个词典离线构造一个BK树。然后,我们使用树执行快速在线搜索,通过查找具有小于或等于\(\delta\)编辑距离来查询序列。
网络训练
- \({\cal X}= \lbrace I_i,\mathbf{l}_i \rbrace _i \)表示训练集,\(I_{i}\)是训练图像,\(\mathbf{l}_{i}\)是真实的标签序列。目标是最小化真实条件概率的负对数似然:
$$ \begin{equation} {\cal O}=-\sum_{I_{i},\mathbf{l}_{i}\in{\cal X}}\log p(\mathbf{l}_{i}|\mathbf{y}_{i}) \end{equation} $$
- \(\mathbf{y}_{i}\)是循环层和卷积层从\(I_{i}\)生成的序列。目标函数直接从图像和它的真实标签序列计算代价值。因此,网络可以在成对的图像和序列上进行端对端训练,去除了在训练图像中手动标记所有单独组件的过程。
- 网络使用随机梯度下降(SGD)进行训练。梯度由反向传播算法计算。特别地,在转录层中,误差使用前向算法进行反向传播。在循环层中,应用随时间反向传播(BPTT)来计算误差。
- 为了优化,使用ADADELTA自动计算每维的学习率。与传统的动量方法相比,ADADELTA不需要手动设置学习率。更重要的是,我们发现使用ADADELTA的优化收敛速度比动量方法快。
结论
- 该模型在4个公共测试数据集上取得了较好的成绩,跟其它基于深度学习模型相比,具有明显提升。如下表所示:
- IIIT5k,SVT,IC03,IC13表示4个数据集
- 50,1k,50k和Full表示使用的字典,None表示识别没有字典
- 识别图像中的乐谱被称为光学音乐识别(OMR)问题。在乐谱识别方面,CRNN大大优于两个商业系统。Capella Scan和PhotoScore系统在干净的数据集上表现相当不错,但是它们的性能在合成和现实世界数据方面显著下降。主要原因是它们依赖于强大的二值化来检五线谱和音符,但是由于光线不良,噪音破坏和杂乱的背景,二值化步骤经常会在合成数据和现实数据上失败。另一方面,CRNN使用对噪声和扭曲具有鲁棒性的卷积特征。此外,CRNN中的循环层可以利用乐谱中的上下文信息。每个音符不仅自身被识别,而且被附近的音符识别。因此,通过将一些音符与附近的音符进行比较可以识别它们,例如对比他们的垂直位置。
文字识别-CRNN
数据集
# 解压数据集
!cd /home/aistudio/data/data6927 && unzip -qo word-recognition.zip
# 解压预训练参数
!cd /home/aistudio/data/data6963 && unzip -qo pretrained-model.zip -d /home/aistudio
预处理数据
- 数据集格式为一行对应一个文字切片,切片格式为:
- 文件名\t标签 例如:
- 1_1.jpg xybLoveCharlotte
- 以下代码将数据拆分成训练集和验证集,生成训练用的 train.txt 和验证用的 eval.txt。并生成 label_list.txt,记录所有支持的字符
# 预处理数据,将其转化为标准格式。同时将数据拆分成两份,以便训练和计算预估准确率
import codecs
import os
import random
import shutil
from PIL import Image
train_ratio = 9 / 10 # 训练集大小
all_file_dir = "data/data6927/word-recognition" # 数据文件路径
image_path_pre = os.path.join(all_file_dir, "imageSet") # 路径
# 训练数据集路径
train_image_dir = os.path.join(all_file_dir, "trainImageSet")
if not os.path.exists(train_image_dir):
os.makedirs(train_image_dir)
# 评估数据集路径
eval_image_dir = os.path.join(all_file_dir, "evalImageSet")
if not os.path.exists(eval_image_dir):
os.makedirs(eval_image_dir)
# 训练集、评估集、标签文件
train_file = codecs.open(os.path.join(all_file_dir, "train.txt"), 'w')
eval_file = codecs.open(os.path.join(all_file_dir, "eval.txt"), 'w')
label_list = os.path.join(all_file_dir, "image_label.txt")
train_count = 0
eval_count = 0
class_set = set()
# 标注文件进行处理
with open(label_list) as f:
for line in f:
parts = line.strip().split()
file, label = parts[0], parts[1]
# 标点符号跳过
if '/' in label or '\'' in label or '.' in label or '!' in label or '-' in label or '$' in label or '&' in label or '@' in label or '?' in label or '%' in label or '(' in label or ')' in label or '~' in label:
continue
# 将标签中的文字加入集合
for e in label:
class_set.add(e)
# 分测试集、评估集
if random.uniform(0, 1) <= train_ratio:
shutil.copyfile(os.path.join(image_path_pre, file), os.path.join(train_image_dir, file))
train_file.write("{0}\t{1}\n".format(os.path.join(train_image_dir, file), label))
train_count += 1
else:
shutil.copyfile(os.path.join(image_path_pre, file), os.path.join(eval_image_dir, file))
eval_file.write("{0}\t{1}\n".format(os.path.join(eval_image_dir, file), label))
eval_count += 1
print("train image count: {0} eval image count: {1}".format(train_count, eval_count))
class_list = list(class_set)
class_list.sort()
print("class num: {0}".format(len(class_list)))
print(class_list)
with codecs.open(os.path.join(all_file_dir, "label_list.txt"), "w") as label_list:
label_id = 0
for c in class_list:
label_list.write("{0}\t{1}\n".format(c, label_id))
label_id += 1
模型搭建与训练
# -*- coding: UTF-8 -*-
"""
训练常基于crnn-ctc的网络,文字行识别
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import uuid
import numpy as np
import time
import six
import math
import random
import paddle
import paddle.fluid as fluid
import logging
import xml.etree.ElementTree
import codecs
import json
from paddle.fluid.initializer import MSRA
from paddle.fluid.param_attr import ParamAttr
from paddle.fluid.regularizer import L2Decay
from PIL import Image, ImageEnhance, ImageDraw
logger = None
train_params = {
"input_size": [1, 48, 512], # 输入数据维度
"data_dir": "data/data6927/word-recognition", # 数据集路径
"train_dir": "trainImageSet", # 训练数据目录
"eval_dir": "evalImageSet", # 评估数据目录
"train_list": "train.txt", # 训练集文件
"eval_list": "eval.txt", # 评估集文件
"label_list": "label_list.txt", # 标签文件
"class_dim": -1,
"label_dict": {}, # 标签字典
"image_count": -1,
"continue_train": True, #是否增量训练
"pretrained": True, # 预训练
"pretrained_model_dir": "./pretrained-model", # 预训练模型目录
"save_model_dir": "./crnn-model", # 模型保存目录
"num_epochs": 40, # 训练轮次
"train_batch_size": 256, # 训练批次大小
"use_gpu": True, # 是否使用gpu
"ignore_thresh": 0.7, # 阈值
"mean_color": 127.5, #
"mode": "train", # 模式
"multi_data_reader_count": 4, # reader数量
"apply_distort": True, # 是否进行扭曲
"image_distort_strategy": { # 扭曲策略
"expand_prob": 0.5, # 放大比率
"expand_max_ratio": 2, # 最大放大比率
"hue_prob": 0.5, # 色调
"hue_delta": 18,
"contrast_prob": 0.5, # 对比度
"contrast_delta": 0.5,
"saturation_prob": 0.5, # 饱和度
"saturation_delta": 0.5,
"brightness_prob": 0.5, # 亮度
"brightness_delta": 0.125
},
"rsm_strategy": { # 梯度下降配置
"learning_rate": 0.0005,
"lr_epochs": [70, 120, 170, 220, 270, 320], # 学习率衰减分段(6个数字分为7段)
"lr_decay": [1, 0.5, 0.1, 0.05, 0.01, 0.005, 0.001], # 每段采用的学习率,对应lr_epochs参数7段
},
"early_stop": { # 控制训练停止条件
"sample_frequency": 50,
"successive_limit": 5,
"min_instance_error": 0.1
}
}
# CRNN网络模型
#################### 自己写的代码这里开始 ###################
class CRNN(object):
def __init__(self,
num_classes, # 类别数量
label_dict): # 标签字典
self.outputs = None # 输出
self.label_dict = label_dict # 标签字典
self.num_classes = num_classes # 类别数量
def name(self):
return "crnn"
def conv_bn_pool(self, input, group, # 输入,组
out_ch, # 输入通道数
act="relu", # 激活函数
param=None, bias=None, # 参数、权重初始值
param_0=None, is_test=False,
pooling=True, # 是否执行池化
use_cudnn=False): # 是否对cuda加速
tmp = input
for i in six.moves.xrange(group):
# for i in range(group): # 也可以
# 卷积层
tmp = fluid.layers.conv2d(
input=tmp, # 输入
num_filters=out_ch[i], # 卷积核数量, 等于悴通道数
filter_size=3,
padding=1,
param_attr=param if param_0 is None else param_0,
act=None,
use_cudnn=use_cudnn)
# 批量归一化
tmp = fluid.layers.batch_norm(
input=tmp, # 前面卷基层输出作为输入
act=act, # 激活函数
param_attr=param, # 参数初始值
bias_attr=bias, # 偏置初始值
is_test=is_test) # 测试模型
# 根据传入的参数决定是否做池化操作
if pooling:
tmp = fluid.layers.pool2d(
input=tmp, # 前一层的输出作为输入
pool_size=2, # 池化区域
pool_type="max", # 池化类型
pool_stride=2, # 步长
use_cudnn=use_cudnn,
ceil_mode=True) # 输出高度计算公式
return tmp
# 包含4个卷积层操作
def ocr_convs(self, input,
regularizer=None, # 正则化
gradient_clip=None, # 梯度裁剪,防止梯度过大
is_test=False, use_cudnn=False):
b = fluid.ParamAttr(
regularizer=regularizer,
gradient_clip=gradient_clip,
initializer=fluid.initializer.Normal(0.0, 0.0))
w0 = fluid.ParamAttr(
regularizer=regularizer,
gradient_clip=gradient_clip,
initializer=fluid.initializer.Normal(0.0, 0.0005))
w1 = fluid.ParamAttr(
regularizer=regularizer,
gradient_clip=gradient_clip,
initializer=fluid.initializer.Normal(0.0, 0.01))
tmp = input
# 第一组卷积池化
tmp = self.conv_bn_pool(tmp,
2, [16, 16], # 组数量及卷积核数量
param=w1,
bias=b,
param_0=w0,
is_test=is_test,
use_cudnn=use_cudnn)
# 第二组卷积池化
tmp = self.conv_bn_pool(tmp,
2, [32, 32], # 组数量及卷积核数量
param=w1,
bias=b,
is_test=is_test,
use_cudnn=use_cudnn)
# 第三组卷积池化
tmp = self.conv_bn_pool(tmp,
2, [64, 64], # 组数量及卷积核数量
param=w1,
bias=b,
is_test=is_test,
use_cudnn=use_cudnn)
# 第四组卷积池化
tmp = self.conv_bn_pool(tmp,
2, [128, 128], # 组数量及卷积核数量
param=w1,
bias=b,
is_test=is_test,
pooling=False, # 不做池化
use_cudnn=use_cudnn)
return tmp
# 组网
def net(self, images,
rnn_hidden_size=200, # 隐藏层输出值数量
regularizer=None, # 正则化
gradient_clip=None, # 梯度裁剪,防止梯度过大
is_test=False,
use_cudnn=True):
# 卷积池化
conv_features = self.ocr_convs(
images,
regularizer=regularizer,
gradient_clip=gradient_clip,
is_test=is_test,
use_cudnn=use_cudnn)
# 将特征图转为序列
sliced_feature = fluid.layers.im2sequence(
input=conv_features, # 卷积得到的特征图作为输入
stride=[1, 1],
# 卷积核大小(高度等于原高度,宽度1)
filter_size=[conv_features.shape[2], 1])
# 两个全连接层
para_attr = fluid.ParamAttr(
regularizer=regularizer, # 正则化
gradient_clip=gradient_clip,
initializer=fluid.initializer.Normal(0.0, 0.02))
bias_attr = fluid.ParamAttr(
regularizer=regularizer, # 正则化
gradient_clip=gradient_clip,
initializer=fluid.initializer.Normal(0.0, 0.02))
bias_attr_nobias = fluid.ParamAttr(
regularizer=regularizer, # 正则化
gradient_clip=gradient_clip,
initializer=fluid.initializer.Normal(0.0, 0.02))
fc_1 = fluid.layers.fc(
input=sliced_feature, # 序列化处理的特征图
size=rnn_hidden_size * 3,
param_attr=para_attr,
bias_attr=bias_attr_nobias)
fc_2 = fluid.layers.fc(
input=sliced_feature, # 序列化处理的特征图
size=rnn_hidden_size * 3,
param_attr=para_attr,
bias_attr=bias_attr_nobias)
# 双向GRU(门控循环单元,LSTM变种, LSTM是RNN变种)
gru_foward = fluid.layers.dynamic_gru(
input=fc_1,
size=rnn_hidden_size,
param_attr=para_attr,
bias_attr=bias_attr,
candidate_activation="relu")
gru_backward = fluid.layers.dynamic_gru(
input=fc_2,
size=rnn_hidden_size,
is_reverse=True, # 反向循环神经网络
param_attr=para_attr,
bias_attr=bias_attr,
candidate_activation="relu")
# 输出层
w_attr = fluid.ParamAttr(
regularizer=regularizer,
gradient_clip=gradient_clip,
initializer=fluid.initializer.Normal(0.0, 0.02))
b_attr = fluid.ParamAttr(
regularizer=regularizer,
gradient_clip=gradient_clip,
initializer=fluid.initializer.Normal(0.0, 0.0))
fc_out = fluid.layers.fc(
input=[gru_foward, gru_backward], # 双向RNN输出作为输入
size=self.num_classes + 1, # 输出类别
param_attr=w_attr,
bias_attr=b_attr)
self.outputs = fc_out
return fc_out
def get_infer(self):
# 将CRNN网络输出交给CTC层转录(纠错、去重)
return fluid.layers.ctc_greedy_decoder(
input=self.outputs, # 输入为CRNN网络输出
blank=self.num_classes)
################### 自己编写代码结束 ####################
def init_train_params():
"""
初始化训练参数,主要是初始化图片数量,类别数
:return:
"""
train_list = os.path.join(train_params['data_dir'], train_params['train_list'])
label_list = os.path.join(train_params['data_dir'], train_params['label_list'])
index = 0
with codecs.open(label_list, encoding='utf-8') as flist:
lines = [line.strip() for line in flist]
for line in lines:
parts = line.split()
train_params['label_dict'][parts[0]] = int(parts[1])
index += 1
train_params['class_dim'] = index
with codecs.open(train_list, encoding='utf-8') as flist:
lines = [line.strip() for line in flist]
train_params['image_count'] = len(lines)
# 初始化日志相关配置
def init_log_config():
global logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
log_path = os.path.join(os.getcwd(), 'logs')
if not os.path.exists(log_path):
os.makedirs(log_path)
log_name = os.path.join(log_path, 'train.log')
sh = logging.StreamHandler()
fh = logging.FileHandler(log_name, mode='w')
fh.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
fh.setFormatter(formatter)
sh.setFormatter(formatter)
logger.addHandler(sh)
logger.addHandler(fh)
# 重设图像大小
def resize_img(img, input_size):
target_size = input_size
percent_h = float(target_size[1]) / img.size[1]
percent_w = float(target_size[2]) / img.size[0]
percent = min(percent_h, percent_w)
resized_width = int(round(img.size[0] * percent))
resized_height = int(round(img.size[1] * percent))
w_off = (target_size[2] - resized_width) / 2
h_off = (target_size[1] - resized_height) / 2
img = img.resize((resized_width, resized_height), Image.ANTIALIAS)
array = np.ndarray((target_size[1], target_size[2], 3), np.uint8)
array[:, :, 0] = 127
array[:, :, 1] = 127
array[:, :, 2] = 127
ret = Image.fromarray(array)
ret.paste(img, (np.random.randint(0, w_off + 1), int(h_off)))
return ret
# 调节亮度
def random_brightness(img):
prob = np.random.uniform(0, 1)
if prob < train_params['image_distort_strategy']['brightness_prob']:
brightness_delta = train_params['image_distort_strategy']['brightness_delta']
delta = np.random.uniform(-brightness_delta, brightness_delta) + 1
img = ImageEnhance.Brightness(img).enhance(delta)
return img
# 对比度
def random_contrast(img):
prob = np.random.uniform(0, 1)
if prob < train_params['image_distort_strategy']['contrast_prob']:
contrast_delta = train_params['image_distort_strategy']['contrast_delta']
delta = np.random.uniform(-contrast_delta, contrast_delta) + 1
img = ImageEnhance.Contrast(img).enhance(delta)
return img
# 饱和度
def random_saturation(img):
prob = np.random.uniform(0, 1)
if prob < train_params['image_distort_strategy']['saturation_prob']:
saturation_delta = train_params['image_distort_strategy']['saturation_delta']
delta = np.random.uniform(-saturation_delta, saturation_delta) + 1
img = ImageEnhance.Color(img).enhance(delta)
return img
def random_hue(img):
prob = np.random.uniform(0, 1)
if prob < train_params['image_distort_strategy']['hue_prob']:
hue_delta = train_params['image_distort_strategy']['hue_delta']
delta = np.random.uniform(-hue_delta, hue_delta)
img_hsv = np.array(img.convert('HSV'))
img_hsv[:, :, 0] = img_hsv[:, :, 0] + delta
img = Image.fromarray(img_hsv, mode='HSV').convert('RGB')
return img
def distort_image(img):
prob = np.random.uniform(0, 1)
# Apply different distort order
if prob > 0.5:
img = random_brightness(img)
img = random_contrast(img)
img = random_saturation(img)
img = random_hue(img)
else:
img = random_brightness(img)
img = random_saturation(img)
img = random_hue(img)
img = random_contrast(img)
return img
def rotate_image(img):
"""
图像增强,增加随机旋转角度
"""
prob = np.random.uniform(0, 1)
if prob > 0.5:
angle = np.random.randint(-8, 8)
img = img.rotate(angle)
return img
def random_expand(img, keep_ratio=True):
if np.random.uniform(0, 1) < train_params['image_distort_strategy']['expand_prob']:
return img
max_ratio = train_params['image_distort_strategy']['expand_max_ratio']
w, h = img.size
c = 3
ratio_x = random.uniform(1, max_ratio)
if keep_ratio:
ratio_y = ratio_x
else:
ratio_y = random.uniform(1, max_ratio)
oh = int(h * ratio_y)
ow = int(w * ratio_x)
off_x = random.randint(0, ow - w)
off_y = random.randint(0, oh - h)
out_img = np.zeros((oh, ow, c), np.uint8)
for i in range(c):
out_img[:, :, i] = train_params['mean_color']
out_img[off_y: off_y + h, off_x: off_x + w, :] = img
return Image.fromarray(out_img)
def preprocess(img, input_size):
img_width, img_height = img.size
if train_params['apply_distort']:
img = distort_image(img)
img = random_expand(img)
img = rotate_image(img)
# img = resize_img(img, input_size)
# img = img.convert('L')
# img = np.array(img).astype('float32') - train_params['mean_color']
# img *= 0.007843
return img
# reader
def custom_reader(file_list, data_dir, input_size, mode):
def reader():
np.random.shuffle(file_list)
for line in file_list:
# img_name, label
parts = line.split()
image_path = parts[0]
img = Image.open(image_path)
# img = Image.open(os.path.join(data_dir, image_path))
if img.mode != 'RGB':
img = img.convert('RGB')
label = [int(train_params['label_dict'][c]) for c in parts[-1]]
if len(label) == 0:
continue
if mode == 'train':
img = preprocess(img, input_size)
img = resize_img(img, input_size)
img = img.convert('L')
# img.save(image_path)
img = np.array(img).astype('float32') - train_params['mean_color']
# img *= 0.007843
img = img[np.newaxis, ...]
# print("{0} {1}".format(image_path, label))
yield img, label
return reader
def multi_process_custom_reader(file_path, data_dir, num_workers, input_size, mode):
"""
创建多进程reader
:param file_path:
:param data_dir:
:param num_workers:
:param input_size:
:param mode:
:return:
"""
file_path = os.path.join(data_dir, file_path)
readers = []
images = [line.strip() for line in open(file_path)]
n = int(math.ceil(len(images) // num_workers))
image_lists = [images[i: i + n] for i in range(0, len(images), n)]
train_path = os.path.join(train_params['data_dir'], train_params['train_dir'])
for l in image_lists:
reader = paddle.batch(custom_reader(l, train_path, input_size, mode),
batch_size=train_params['train_batch_size'])
readers.append(paddle.reader.shuffle(reader, train_params['train_batch_size']))
return paddle.reader.multiprocess_reader(readers, False) # 返回多进程读取器
# 评估reader
def create_eval_reader(file_path, data_dir, input_size, mode):
file_path = os.path.join(data_dir, file_path)
images = [line.strip() for line in open(file_path)]
eval_path = os.path.join(train_params['data_dir'], train_params['eval_dir'])
return paddle.batch(custom_reader(images, eval_path, input_size, mode),
batch_size=train_params['train_batch_size'])
def optimizer_rms_setting():
batch_size = train_params["train_batch_size"]
iters = train_params["image_count"] // batch_size # 计算总批次
learning_strategy = train_params['rsm_strategy']
lr = learning_strategy['learning_rate']
boundaries = [i * iters for i in learning_strategy["lr_epochs"]]
values = [i * lr for i in learning_strategy["lr_decay"]]
# 均方根传播(RMSProp)法
optimizer = fluid.optimizer.RMSProp(learning_rate=fluid.layers.piecewise_decay(boundaries, values),
regularization=fluid.regularizer.L2Decay(0.00005))
return optimizer
def build_train_program_with_async_reader(main_prog, startup_prog):
"""
定义异步读取器、预测、构建损失函数及优化器
:param main_prog:
:param startup_prog:
:return:
"""
# 将main_prog, startup_prog设置为默认主program, startup_program
with fluid.program_guard(main_prog, startup_prog):
img = fluid.layers.data(name='img', shape=train_params['input_size'], dtype='float32')
gt_label = fluid.layers.data(name='gt_label', shape=[1], dtype='int32', lod_level=1)
# 创建reader
data_reader = fluid.layers.create_py_reader_by_data(capacity=train_params['train_batch_size'],
feed_list=[img, gt_label],
name='train')
# 创建多进程reader
multi_reader = multi_process_custom_reader(train_params['train_list'],
train_params['data_dir'],
train_params['multi_data_reader_count'],
train_params['input_size'],
'train')
data_reader.decorate_paddle_reader(multi_reader)
with fluid.unique_name.guard(): # 更换namespace
img, gt_label = fluid.layers.read_file(data_reader)
model = CRNN(train_params['class_dim'], train_params['label_dict']) # 实例化
fc_out = model.net(img) # 预测
cost = fluid.layers.warpctc(input=fc_out, label=gt_label, blank=train_params['class_dim'],
norm_by_times=True) # 计算CTC损失函数
loss = fluid.layers.reduce_sum(cost) # 损失函数求和
optimizer = optimizer_rms_setting()
optimizer.minimize(loss)
# 执行CTC去重
decoded_out = fluid.layers.ctc_greedy_decoder(input=fc_out,
blank=train_params['class_dim'])
casted_label = fluid.layers.cast(x=gt_label, dtype='int64')
# 计算字符串的编辑距离
# 编辑距离又称Levenshtein距离,由俄罗斯的数学家Vladimir Levenshtein在1965年提出
# 是指利用字符操作,把字符串A转换成字符串B所需要的最少操作数
# 例如:"kitten" -> "sitten" -> "sittin" -> "sitting"
distances, seq_num = fluid.layers.edit_distance(decoded_out, casted_label)
return data_reader, loss, distances, seq_num, decoded_out
def build_eval_program_with_feeder(main_prog, startup_prog, place):
"""
执行评估
:param main_prog:
:param startup_prog:
:param place:
:return:
"""
with fluid.program_guard(main_prog, startup_prog):
img = fluid.layers.data(name='img', shape=train_params['input_size'], dtype='float32')
gt_label = fluid.layers.data(name='gt_label', shape=[1], dtype='int32', lod_level=1)
feeder = fluid.DataFeeder(feed_list=[img, gt_label], place=place, program=main_prog)
reader = create_eval_reader(train_params['eval_list'],
train_params['data_dir'],
train_params['input_size'],
'eval')
with fluid.unique_name.guard():
model = CRNN(train_params['class_dim'], train_params['label_dict'])
outputs = model.net(img)
return feeder, reader, outputs, gt_label
def load_pretrained_params(exe, program):
# 如果设置了增量训练,则加载之前训练的模型
if train_params['continue_train'] and os.path.exists(train_params['save_model_dir']):
logger.info('load param from retrain model')
fluid.io.load_persistables(executor=exe,
dirname=train_params['save_model_dir'],
main_program=program)
# 如果设置了预训练,则加载预训练模型
elif train_params['pretrained'] and os.path.exists(train_params['pretrained_model_dir']):
logger.info('load param from pretrained model')
def if_exist(var):
return os.path.exists(os.path.join(train_params['pretrained_model_dir'], var.name))
fluid.io.load_vars(exe, train_params['pretrained_model_dir'], main_program=program,
predicate=if_exist)
def train():
"""
训练
:return:
"""
init_log_config()
init_train_params()
logger.info("start train crnn, train params:%s", str(train_params))
logger.info("create place, use gpu:" + str(train_params['use_gpu']))
place = fluid.CUDAPlace(0) if train_params['use_gpu'] else fluid.CPUPlace()
logger.info("build network and program")
train_program = fluid.Program()
start_program = fluid.Program()
eval_program = fluid.Program()
# start_program = fluid.Program() # wdb del 20200322
# 定义异步读取器、预测、构建损失函数及优化器
train_reader, loss, distances, seq_num, decoded_out = \
build_train_program_with_async_reader(train_program, start_program)
# 评估
eval_feeder, eval_reader, output, gt_label = \
build_eval_program_with_feeder(eval_program, start_program, place)
eval_program = eval_program.clone(for_test=True)
logger.info("build executor and init params")
exe = fluid.Executor(place)
exe.run(start_program)
train_fetch_list = [loss.name, distances.name, seq_num.name, decoded_out.name]
eval_fetch_list = [output.name]
load_pretrained_params(exe, train_program)
stop_strategy = train_params['early_stop']
successive_limit = stop_strategy['successive_limit']
sample_freq = stop_strategy['sample_frequency']
min_instance_error = stop_strategy['min_instance_error']
stop_train = False
successive_count = 0
total_batch_count = 0
distance_evaluator = fluid.metrics.EditDistance("edit-distance")
# 执行训练
for pass_id in range(train_params["num_epochs"]):
logger.info("current pass: %d, start read image", pass_id)
batch_id = 0
train_reader.start() # 启动reader线程
distance_evaluator.reset()
try:
while True:
t1 = time.time()
loss, distances, seq_num, decoded_out = exe.run(train_program,
fetch_list=train_fetch_list,
return_numpy=False)
distances = np.array(distances)
seq_num = np.array(seq_num)
distance_evaluator.update(distances, seq_num)
period = time.time() - t1
loss = np.mean(np.array(loss))
batch_id += 1
total_batch_count += 1
if batch_id % 10 == 0:
distance, instance_error = distance_evaluator.eval()
# logger.info(np.array(decoded_out))
logger.info("Pass {0}, trainbatch {1}, loss {2} distance {3} instance error {4} time {5}"
.format(pass_id, batch_id, loss, distance, instance_error, "%2.2f sec" % period))
# 采用简单的定时采样停止办法,可以调整为更精细的保存策略
if total_batch_count % 100 == 0:
logger.info("temp save {0} batch train result".format(total_batch_count))
fluid.io.save_persistables(dirname=train_params['save_model_dir'],
main_program=train_program,
executor=exe)
if total_batch_count % sample_freq == 0:
if instance_error <= min_instance_error:
successive_count += 1
logger.info("instance error {0} successive count {1}".format(instance_error, successive_count))
if successive_count >= successive_limit:
stop_train = True
break
else:
successive_count = 0
except fluid.core.EOFException:
train_reader.reset()
distance, instance_error = distance_evaluator.eval()
logger.info("Pass {0} distance {1} instance error {2}".format(pass_id, distance, instance_error))
if stop_train:
logger.info("early stop")
break
logger.info("training till last, end training")
fluid.io.save_persistables(dirname=train_params['save_model_dir'], main_program=train_program, executor=exe)
if __name__ == '__main__':
train()
将模型转化为固化的模型
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import six
import numpy as np
import random
import time
import codecs
import sys
import functools
import math
import paddle
import paddle.fluid as fluid
from paddle.fluid import core
from paddle.fluid.param_attr import ParamAttr
from PIL import Image, ImageEnhance
# 读取 label_list.txt 文件获取类别数量
class_dim = -1
all_file_dir = "data/data6927/word-recognition"
with codecs.open(os.path.join(all_file_dir, "label_list.txt")) as label_list:
class_dim = len(label_list.readlines())
target_size = [1, 48, 512]
mean_rgb = 127.5
save_freeze_dir = "./crnn-model"
class CRNN(object):
def __init__(self, num_classes, label_dict):
self.outputs = None
self.label_dict = label_dict
self.num_classes = num_classes
def name(self):
return 'crnn'
def conv_bn_pool(self, input, group, out_ch, act="relu", param=None, bias=None, param_0=None, is_test=False, pooling=True, use_cudnn=False):
tmp = input
for i in six.moves.xrange(group):
tmp = fluid.layers.conv2d(
input=tmp,
num_filters=out_ch[i],
filter_size=3,
padding=1,
param_attr=param if param_0 is None else param_0,
act=None, # LinearActivation
use_cudnn=use_cudnn)
tmp = fluid.layers.batch_norm(
input=tmp,
act=act,
param_attr=param,
bias_attr=bias,
is_test=is_test)
if pooling:
tmp = fluid.layers.pool2d(
input=tmp,
pool_size=2,
pool_type='max',
pool_stride=2,
use_cudnn=use_cudnn,
ceil_mode=True)
return tmp
def ocr_convs(self, input, regularizer=None, gradient_clip=None, is_test=False, use_cudnn=False):
b = fluid.ParamAttr(
regularizer=regularizer,
gradient_clip=gradient_clip,
initializer=fluid.initializer.Normal(0.0, 0.0))
w0 = fluid.ParamAttr(
regularizer=regularizer,
gradient_clip=gradient_clip,
initializer=fluid.initializer.Normal(0.0, 0.0005))
w1 = fluid.ParamAttr(
regularizer=regularizer,
gradient_clip=gradient_clip,
initializer=fluid.initializer.Normal(0.0, 0.01))
tmp = input
tmp = self.conv_bn_pool(
tmp,
2, [16, 16],
param=w1,
bias=b,
param_0=w0,
is_test=is_test,
use_cudnn=use_cudnn)
tmp = self.conv_bn_pool(
tmp,
2, [32, 32],
param=w1,
bias=b,
is_test=is_test,
use_cudnn=use_cudnn)
tmp = self.conv_bn_pool(
tmp,
2, [64, 64],
param=w1,
bias=b,
is_test=is_test,
use_cudnn=use_cudnn)
tmp = self.conv_bn_pool(
tmp,
2, [128, 128],
param=w1,
bias=b,
is_test=is_test,
pooling=False,
use_cudnn=use_cudnn)
return tmp
def net(self, images, rnn_hidden_size=200, regularizer=None,
gradient_clip=None, is_test=False, use_cudnn=True):
conv_features = self.ocr_convs(
images,
regularizer=regularizer,
gradient_clip=gradient_clip,
is_test=is_test,
use_cudnn=use_cudnn)
sliced_feature = fluid.layers.im2sequence(
input=conv_features,
stride=[1, 1],
filter_size=[conv_features.shape[2], 1])
para_attr = fluid.ParamAttr(
regularizer=regularizer,
gradient_clip=gradient_clip,
initializer=fluid.initializer.Normal(0.0, 0.02))
bias_attr = fluid.ParamAttr(
regularizer=regularizer,
gradient_clip=gradient_clip,
initializer=fluid.initializer.Normal(0.0, 0.02))
bias_attr_nobias = fluid.ParamAttr(
regularizer=regularizer,
gradient_clip=gradient_clip,
initializer=fluid.initializer.Normal(0.0, 0.02))
fc_1 = fluid.layers.fc(input=sliced_feature,
size=rnn_hidden_size * 3,
param_attr=para_attr,
bias_attr=bias_attr_nobias)
fc_2 = fluid.layers.fc(input=sliced_feature,
size=rnn_hidden_size * 3,
param_attr=para_attr,
bias_attr=bias_attr_nobias)
gru_forward = fluid.layers.dynamic_gru(
input=fc_1,
size=rnn_hidden_size,
param_attr=para_attr,
bias_attr=bias_attr,
candidate_activation='relu')
gru_backward = fluid.layers.dynamic_gru(
input=fc_2,
size=rnn_hidden_size,
is_reverse=True,
param_attr=para_attr,
bias_attr=bias_attr,
candidate_activation='relu')
w_attr = fluid.ParamAttr(
regularizer=regularizer,
gradient_clip=gradient_clip,
initializer=fluid.initializer.Normal(0.0, 0.02))
b_attr = fluid.ParamAttr(
regularizer=regularizer,
gradient_clip=gradient_clip,
initializer=fluid.initializer.Normal(0.0, 0.0))
fc_out = fluid.layers.fc(input=[gru_forward, gru_backward],
size=self.num_classes + 1,
param_attr=w_attr,
bias_attr=b_attr)
self.outputs = fc_out
return fc_out
def get_loss(self, label):
cost = fluid.layers.warpctc(input=self.outputs, label=label, blank=self.num_classes, norm_by_times=True)
sum_cost = fluid.layers.reduce_sum(cost)
return sum_cost
def get_infer(self):
return fluid.layers.ctc_greedy_decoder(input=self.outputs, blank=self.num_classes)
def freeze_model():
exe = fluid.Executor(fluid.CPUPlace())
image = fluid.layers.data(name='image', shape=target_size, dtype='float32')
model = CRNN(class_dim, {})
pred = model.net(image)
out = model.get_infer()
freeze_program = fluid.default_main_program()
fluid.io.load_persistables(exe, save_freeze_dir, freeze_program)
freeze_program = freeze_program.clone(for_test=True)
fluid.io.save_inference_model("./freeze-model", ['image'], out, exe, freeze_program)
if __name__ == '__main__':
freeze_model()
print("保存模型成功.")
在验证集合上测试单词的准确率
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import numpy as np
import random
import time
import codecs
import sys
import functools
import math
import paddle
import paddle.fluid as fluid
from paddle.fluid import core
from paddle.fluid.param_attr import ParamAttr
from PIL import Image, ImageEnhance
import matplotlib.pyplot as plt
target_size = [1, 48, 512]
mean_rgb = 127.5
data_dir = 'data/data6927/word-recognition'
eval_file = "eval.txt"
label_list = "label_list.txt"
use_gpu = True
label_dict = {}
save_freeze_dir = "./freeze-model"
place = fluid.CPUPlace()
exe = fluid.Executor(place)
# 加载模型
[inference_program, feed_target_names, fetch_targets] = \
fluid.io.load_inference_model(dirname=save_freeze_dir, executor=exe)
# print(fetch_targets)
def init_eval_parameters():
"""
初始化训练参数,主要是初始化图片数量,类别数
:return:
"""
label_list_path = os.path.join(data_dir, label_list)
index = 0
# 读取样本文件内容,并存入字典
with codecs.open(label_list_path, encoding='utf-8') as flist:
lines = [line.strip() for line in flist]
for line in lines:
parts = line.split()
label_dict[int(parts[1])] = parts[0]
def resize_img(img):
"""
重设图像大小
:param img:
:return:
"""
percent_h = float(target_size[1]) / img.size[1]
percent_w = float(target_size[2]) / img.size[0]
percent = min(percent_h, percent_w)
resized_width = int(round(img.size[0] * percent))
resized_height = int(round(img.size[1] * percent))
w_off = (target_size[2] - resized_width) / 2
h_off = (target_size[1] - resized_height) / 2
img = img.resize((resized_width, resized_height), Image.ANTIALIAS)
array = np.ndarray((target_size[1], target_size[2], 3), np.uint8)
array[:, :, 0] = 127
array[:, :, 1] = 127
array[:, :, 2] = 127
ret = Image.fromarray(array)
ret.paste(img, (np.random.randint(0, w_off + 1), int(h_off)))
return ret
def read_image(img_path):
"""
读取图像
:param img_path:
:return:
"""
img = Image.open(img_path)
if img.mode != 'RGB':
img = img.convert('RGB')
img = resize_img(img)
img = img.convert('L') # 返回一个转换后的副本,L模式进行转换
img = np.array(img).astype('float32') - mean_rgb
img = img[..., np.newaxis]
img = img.transpose((2, 0, 1))
img = img[np.newaxis, :]
return img
def infer(image_path):
"""
执行预测
:param image_path:
:return:
"""
tensor_img = read_image(image_path)
label = exe.run(inference_program,
feed={feed_target_names[0]: tensor_img},
fetch_list=fetch_targets,
return_numpy=False)
label = np.array(label[0])
ret = ""
if label[0] != -1:
ret = ret.join([label_dict[int(c[0])] for c in label])
return ret
def eval_all():
"""
评估所有
:return:
"""
eval_file_path = os.path.join(data_dir, eval_file) # 评估文件路径
total_count = 0
right_count = 0
"""
with codecs.open(eval_file_path, encoding='utf-8') as flist:
lines = [line.strip() for line in flist]
t1 = time.time()
random.shuffle(lines) # 打乱样本
i = 0
for line in lines:
i += 1
if i > 3:
break
total_count += 1
parts = line.strip().split()
result = infer(parts[0]) # 执行推测
img = Image.open(parts[0])
plt.imshow(img)
plt.show()
print("infer result:{0} answer:{1}".format(result, parts[1]))
if str(result) == parts[1]:
right_count += 1
period = time.time() - t1
print("count:{0} time:{1} accuracy:{2}".format(total_count,
"%2.2f sec" % period, right_count / total_count))
"""
#测试自定义图片
img_file ="4.jpg"
result = infer(img_file) # 执行推测
print("infer result:{0}".format(result))
img = Image.open(img_file)
plt.imshow(img)
plt.show()
if __name__ == '__main__':
init_eval_parameters()
eval_all()
人脸检测
!pip install paddlehub==2.2.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
第三方模型测试
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import paddlehub as hub
test_img_path = ["./zhouyu.png", "./yzd.png", "./youhua.png", "./nezha.png"]
img = mpimg.imread(test_img_path[0])
# 展示待预测图片
# plt.figure(figsize=(10, 10))
# plt.imshow(img)
# plt.axis('off')
# plt.show()
# 读取待预测图片文件
with open('test.txt', 'r') as f:
test_img_path=[]
for line in f:
test_img_path.append(line.strip())
print(test_img_path)
# 加载模型
#在预测时会将图片输入缩放为640 * 480
module = hub.Module(name="ultra_light_fast_generic_face_detector_1mb_640")
input_dict = {"image": test_img_path}
results = module.face_detection(data=input_dict, visualization=True)
for result in results:
print(result)
# 预测结果展示
for img_name in test_img_path:
img = mpimg.imread("face_detector_640_predict_output/" + img_name)
plt.figure(figsize=(10,10))
plt.imshow(img)
plt.axis('off')
plt.show()
文字识别
# 安装需要的软件包
!pip install paddlehub==1.6.2
!hub install chinese_ocr_db_crnn_server==1.0.0
!pip install shapely
!pip install pyclipper
第三方模型测试
import paddlehub as hub
import cv2
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
img_list = ['ticket.jpg']
ocr = hub.Module(name="chinese_ocr_db_crnn_server")
# result = ocr.recognize_text(images=[cv2.imread('./book1.png')])
result = ocr.recognize_text(paths=img_list) # 这种写法也可以
print(result)
print("\n========================= 开始解析结果 ==========================\n")
result_dict = dict(result[0])
rects = []
words = result_dict["data"]
for word in words: # word为字典,result
print(word["text"])
print(word["confidence"])
points = word["text_box_position"]
# print(points)
rects.append(points) # 添加到列表
# 在原图上绘制矩形
im = cv2.imread(img_list[0], 1)
for rect in rects:
p1 = (rect[0][0], rect[0][53])
p2 = (rect[1][0], rect[1][54])
p3 = (rect[2][0], rect[2][55])
p4 = (rect[3][0], rect[3][56])
cv2.line(im, p1, p2, (0, 0, 255), 2)
cv2.line(im, p2, p3, (0, 0, 255), 2)
cv2.line(im, p3, p4, (0, 0, 255), 2)
cv2.line(im, p4, p1, (0, 0, 255), 2)
cv2.imwrite("ticket_result.jpg", im)
利用PaddleHub实现人脸检测
安装库命令
!pip install paddlehub==2.2.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
Paddle代码
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import paddlehub as hub
test_img_path = ["./zhouyu.png", "./yzd.png", "./youhua.png", "./nezha.png"]
img = mpimg.imread(test_img_path[0])
# 展示待预测图片
# plt.figure(figsize=(10, 10))
# plt.imshow(img)
# plt.axis('off')
# plt.show()
# 读取待预测图片文件
with open('test.txt', 'r') as f:
test_img_path=[]
for line in f:
test_img_path.append(line.strip())
print(test_img_path)
# 加载模型
#在预测时会将图片输入缩放为640 * 480
module = hub.Module(name="ultra_light_fast_generic_face_detector_1mb_640")
input_dict = {"image": test_img_path}
results = module.face_detection(data=input_dict, visualization=True)
for result in results:
print(result)
# 预测结果展示
for img_name in test_img_path:
img = mpimg.imread("face_detector_640_predict_output/" + img_name)
plt.figure(figsize=(10,10))
plt.imshow(img)
plt.axis('off')
plt.show()
0 条评论