医疗机器人:语音系统

平台

  Windows/Ubuntu系统
  科大讯飞开发套件iFLYOSMT8516开发套件
  讯飞开发平台
  python

开发套件iFLYOS MT8516

  MT8516开发套件是一款基于MTK平台MT8516处理器和科大讯飞环形 6麦阵列结构的语音整体解决方案。此外还提供红外、串口、ZigBee等多种接口,小巧易扩展,搭载iFLYOS生态,提供海量内容服务与定制化接口,满足多种远、近场语音交互场景。
MT8516整机图

硬件系统

硬件构成和产品

硬件构成和产品清单
产品说明
  整个开发套件分为三部分,最上面一层为功能按键和LED灯板,主要提供MIC禁音键、播放/暂停键、音量键及LED灯效。中间一层为核心开发板,主要包括一个MT8516A芯片和环形6颗麦克风阵列。最下面一层主要包括一个扬声器、配网按键和电源接口(Type-C)。

核心板——MT8516A

  MT8516是联发科技在20175月推出的一款面向智能语音助手设备和智能音响的系统单芯片。支持多达8TDM通道和2PDM输入,以支持来自多个源的音频输入,适用于远场麦克风语音控制和智能音响设备。
MT8516芯片
芯片信息
  其系统设计框图为:
系统设计框图
  蓝色部分为按键和LED灯模块。绿色部分为外设模块,主要包括6个麦克风和1个扬声器。其中AMIC为麦克风,4Ω5W为扬声器。黄色部分为电源模块。
  最下角的2个模块分别为外置128/256MB DDR3和外置128/256MB FLASH内存。

麦克风——环形6颗麦克风阵列

环形麦克风阵列
  产品特性:

  1. 远/近场拾音
      方案前端采用科大讯飞6麦克风阵列,能够实现家居场景5-10m左右的用户指令音频拾取,通过新一代神 经网络降噪算法对拾取音频进行处理,提供高品质降噪后音频给到后端,以保障唤醒、识别效果。
  2. 噪声抑制
      该功能基于科大讯飞6麦克风阵列中的声源定位和波束增强等算法特性,通过采集指令声源(控制设备 的发音人)所在波束范围内的声音,抑制或者不处理其它波束所接收的声音,以提升采集声音的效果,为后续唤醒和命令词识别效果提供保障。
  3. 回声消除
      支持用户交互过程中,实现一次唤醒,多轮交互的控制方式,即用户可以中断播报进程进行下一轮交互,让交互更加自然,流畅。

软件系统

快速使用

  1. 下载小飞在线APP并登录,下载地址
  2. 长按设备配网键5秒,听到语音和黄色呼吸灯提示后松手,进入配网模式。
  3. 在小飞在线APP点击添加设备,点击连接。(需要打开蓝牙、网络和定位)
    连接设备
  4. 输入网络账号密码,等待设备联网即可。
  5. 完成设备联网后,可以说蓝小飞,***进行语音交互。

二次开发

环境搭建

  下载MTK8516交叉编译工具链
  解压缩工具链,如果系统没有7z,请先安装,安装方式:

1
sudo apt-get install p7zip

  解压:

1
7z x yocto_toolchain_7.3.7z

  安装:

1
chmod +x oecore-x86_64-aarch64-toolchain-nodistro.0.sh

1
./oecore-x86_64-aarch64-toolchain-nodistro.0.sh

  推荐安装目录为:/etc/iflyosBoards/mtk8516,等待安装完成。
  设置环境变量:

1
source /etc/iflyosBoards/mtk8516/environment-setup-aarch64-poky-linux

  安装第三方库:地址
  复制到安装目录下的sysroot目录:

1
mv /***/lib/*  /etc/iflyosBoards/mtk8516/sysroots/aarch64-poky-linux/usr/lib/

1
mv /***/lib64/*  /etc/iflyosBoards/mtk8516/sysroots/aarch64-poky-linux/usr/lib64/

下载开源工程项目

  MTK8516开发套件已经内置了工程项目,可以不用下载,但如果想在电脑上看看也可以下载到本地。
  解压后目录如下:

1
2
3
build_mt8516.sh
jsruntime
smartSpeakerApp

  编译

1
source build_mt8516.sh

  目标文件目录: /etc/iflyosBoards/mtk8516-edu/install

1
2
3
4
5
6
7
/etc/iflyosBoards/mt8516-edu/install
├── bin
│ └── iotjs #jsruntime可执行程序
├── iFLYOS.json #主程序启动配置文件
├── lib
│ └── libiotjs.so #jsruntime库
├── smartSpeakerApp #js应用层文件目录

部署

  MTK8516开发套件本质上是一个Liunx系统,可以使用adb工具对其进行简单的调试。
  adb工具即Android Debug Bridge(安卓调试桥)tools。它就是一个命令行窗口,用于通过电脑端与模拟器或者真实设备交互。
  连接至电脑端并输入adb shell即可进入该设备。执行简单的cdls命令即可查看里面的文件。
  adb工具有2个比较重要的文件传输命令,即adb push sourcefile targetfileadb pull sourcefile targetfile,一个是将本地文件发送至设备,一个是将设备文件拉取至本地。
  例如将设备里面的配置文件信息拉取至本地:

1
adb pull /data/iflyos/iFLYOS.json <保存本地路径>

  然后打开文件即可。
配置文件说明
  通常,可以先将需要更改的文件先拉取至本地,然后再发送至设备即可。

自定义唤醒词

  1. 生成唤醒词资源
      登录唤醒词平台,输入唤醒词并下载,引擎版本为1566
  2. 修改唤醒词资源
      将生成好的唤醒词,push/usr/share/iflyos/ivw/目录下,并在/data/iflyos/iFLYOS.json里修改caeResPath的对应资源路径。
      替换完成后,执行adb shell sync生效。
      断点重开即可查看效果。

更换提示音

  在TTS合成页面选择对应的发音人,生成专属开机欢迎语,推送至设备即可替换。
  注:

  1. 设备的音频文件位置为:/usr/share/iflyos/ring
  2. 完成替换后需执行adb shell sync命令生效
  3. 注意音频文件需转换为m4a格式
  4. 替换ring目录下的音频后,需保持音频文件名前后一致

云端操作系统——iFLYOS

  iFLYOS MT8516套件是通过EVS协议与iFLYOS服务端通信实现语音交互和技能调用的。MTK8516套件基于iotjs开发平台实现了与硬件、网络与文件系统等进行交互的能力,并使用JS语言完成了EVS协议逻辑。
  iFLYOS是科大讯飞基于人机智能交互技术为广大开发者和第三方厂商开放的全新语音技术服务操作系统。通过先进的人工智能技术让用户的日常生活更方便,获取信息的方式更快捷。
iFLYOS架构
  嵌入式协议(Embedded iFLYOS Voice Service, 简称EVS)是一个相对IVS更简单的协议,为厂商接入提供方便,降低设备运行要求,本协议采取websocket进行通讯。
  iotjs是三星开源的javascript物联网开发平台。它为javascript应用程序提供了访问硬件、网络、文件系统和异步化的能力,功能类似于nodejs,但无论是代码体积还是内存需求,iotjs都要小很多,是用javascript开发iot(Internet of Things)设备应用程序的首选。

创建设备

  打开并登录科大讯飞iFLYOS系统,选择设备接入添加产品。这里选择linux平台和EVS网络协议。
  注:这里的产品信息为自定义名字。

新增技能

  选择云端配置->设备能力,找到更多能力,将持续交互能力儿童模式打开即可。

发布到设备

  将MTK8516开发套件的配置文件(/data/iflyos/iFLYOS.json)拉取到本地,将里面的cliend_idcliend_secret更改为新建设备的信息。并发送至开发套件。
  执行adb shell sync并断电重开。
  重新进入配网模式,打开手机小飞在线APP,重新配置,并打开里面的持续交互能力儿童模式功能。
  注:第一次发布可能会出现需要授权的问题。选择产品发布->设备ID,找到鉴权失败设备,然后将设备导入授权即可。

自定义问答

  选择云端配置->语义理解找到自定义问答,再点击创建自定义问答,然后新建问答库。其模板格式如下:
自定义问答模板说明
  该技能不需要发布到设备,联网即可。

讯飞开发平台

  第一部分介绍的是MTK8516开发套件的使用,但其更多的是用于娱乐型的交互,如果需要任务型的交互,还需要使用科大讯飞的开发平台。
  打开讯飞开发平台,点击产品服务,可以看到讯飞开发平台提供了很多易操作的服务功能。

创建控制台

  点击控制台,创建新应用。

实时语音转写

  选择语音识别->实时语音转写,找到实时语音转写API,点击并打开文档。
  实时语音转写(Real-time ASR)基于深度全序列卷积神经网络框架,通过WebSocket协议,建立应用与语言转写核心引擎的长连接,开发者可实现将连续的音频流内容,实时识别返回对应的文字流内容。
  官方文档已经解释的很清楚了,这里就不多做介绍了。在接口demo里面可以下载官方代码。

AMR小车的语音控制

程序逻辑

  整体逻辑比较简单,即语音输入控制指令->小车执行指令->显示任务信息,二者都是基于web实现通讯。

语音代码逻辑

  通过阅读官方文档可以看出,实时语音转写主要包含二个阶段:握手阶段和实时通讯阶段。

环境搭建

  本博客采用的python语言编写,需要的库为:websocket==0.2.1websocket-client==0.56.0requests等。

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# -*- encoding:utf-8 -*-
# @copyright: Cxx

import sys, time, threading

# 握手阶段
import hashlib
import base64
import hmac

import websocket
from websocket import create_connection
from urllib.parse import quote

# 实时通讯
import json

import pyaudio
import datetime
from datetime import datetime

#import logging
#logging.basicConfig()

class YunYinModule(object):
def __init__(self, base_url, app_id, api_key):
self.base_url = base_url
self.app_id = app_id
self.api_key = api_key

self.end_tag = "{\"end\": true}"

self.isSendByMicro_flag = True
self.result_data = ""

  创建YunYinModule类,语音实时转写需要的设备参数为base_url,app_idapi_keyresult_data存放的是处理的数据。

握手阶段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def createSigna(self):
# 对baseString进行MD5编码
self.ts = str(int(time.time()))
baseString = (self.app_id + self.ts).encode('utf-8')

md5 = hashlib.md5() # MD5
md5.update(baseString)
baseString = md5.hexdigest()
baseString = bytes(baseString, encoding='utf-8')

# 进行HmacSHA1加密
apiKey = self.api_key.encode('utf-8')
signa = hmac.new(apiKey, baseString, hashlib.sha1).digest()
signa = base64.b64encode(signa)
signa = str(signa, 'utf-8')

return signa

def handShake(self):
"""
握手阶段
"""
signa = self.createSigna()
self.woShou = create_connection(self.base_url + "?appid=" + self.app_id +
"&ts=" + self.ts + "&signa=" + quote(signa))

#result = str(self.woShou.recv())
#print(result)

  所谓握手就是网络通讯连接阶段。主要是生成一个signa。这里官方文档解释的很清楚,可自行学习。

实时通讯阶段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def receiveResult(self):
"""
数据接收阶段
"""
try:
while (self.woShou.connect and self.isSendByMicro_flag):
#while (self.isSendByMicro_flag):
result = str(self.woShou.recv())

result_dict = json.loads(result)

result_final_data = ""

if len(result) == 0:
print("receive result end.")
break

else:
# 解析结果
if result_dict["action"] == "started":
print("handShake Successfully")

if result_dict["action"] == "result":
result_json = json.loads(result_dict["data"])

# we为0表示是中间结果
isEndFlag = bool(result_json["cn"]["st"]["rt"][0]["ws"][0]["we"])

if isEndFlag is True:
result_raw_data = result_json["cn"]["st"]["rt"]
#result_final_data = self.dealWithResultData(result_raw_data)
self.dealWithResultData(result_raw_data)

return self.result_data

except websocket.WebSocketConnectionClosedException:
print("Error. Receive Result End.")

def dealWithResultData(self, data):
"""
处理数据阶段
"""
self.result_data = "" # 清空字符串
for i in data:
for x in i["ws"]:
w = x["cw"][0]["w"]
wp = x["cw"][0]["wp"]
if wp != "s":
self.result_data += w

#print("Final Result is", datetime.now(), "------>", self.result_data)

if self.result_data.find("完毕") != -1:
self.isSendByMicro_flag = False

  实时通讯阶段也就是一个数据处理的阶段,WebSocket协议返回的是一个json格式的数据,也就是代码中的result变量,通过解析该变量得到可处理的result_dict数据。
  result_dict字典主要包括actiondata等部分,其中当actionstarted时,表示数据开始接收,当actionresult时,表示已经接收到数据,并存放在data中。result_json表示的是实际接收到的语音数据。
语音转写data字段说明
  通过分析该表格可知,当we不为0时,表示的是完整的句子(这里完整的意思是,在接收数据时,并不是说完一句完整的话才会显示结果,而是不间断的实时的输出结果,比如你说我要去南京,实际显示的是我要去我要去南京等多个结果,而我们这里只需要最终的结果即可)。
  因此在这里添加了一个判断,确保是最后一个完整的句子,然后将原来打散的字符拼接在一起即可。(实际数据输出时,是按单词一个个打散输出的,可print调试观察)
  最后加了一个如果输入完毕,则停止交互。

麦克风输入

1
2
3
4
5
6
7
8
9
10
11
12
def sendByMicro(self):
rate = 16000 # 采样率
format = pyaudio.paInt16 # 采样位数
channels = 1 # 通道数 单声道
chunks = 1280 # 一个缓冲区存放的音频数
p = pyaudio.PyAudio()
stream = p.open(rate=rate, format=format, channels=channels, frames_per_buffer=chunks, input=True)
# Input:输入流 本例是录制音频流

while self.isSendByMicro_flag:
data = stream.read(chunks)
self.woShou.send(data)

  按照官方文档说明,建议音频流每40ms发送1280字节,采样率为16K。这里需要while循环持续接收语音输入。

AMR小车代码逻辑

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#encoding:utf-8

#CopyRight@Cxx

import json
import requests
import time


class MedicalRobotOrder(object):
def __init__(self, ServerIP, MapID):
self.ServerIP = ServerIP
self.MapID = MapID
self.isTASK_RECEIVED = False
self.isTASK_FINISHED = False

self.initParam()

def initParam(self):
self.urlAddOrder = "https://" + self.ServerIP + ":8081/order/addOrder"
self.urlOnlineAmr = "https://" + self.ServerIP + ":8081/amr/onlineAmr"

  这里主要涉及到swagger-ui的2个接口,即addOrderonlineAmr,需要的参数为ServerIPMapID

addOrder接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def addOrder(self, amrID="zerg011", targetPoint="P2", times=1):
"""
下达任务函数,到达目标点多少次,单目标点
参数:
targetPoint:目标点
times:次数,默认为1
返回:
True:下达任务成功
False:下达任务失败
"""
headers = {
"Content-Type": "application/json; charset=UTF-8",
"Referer": self.urlAddOrder,
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.87 Safari/537.36",
}

pyload = {
"mapId": self.MapID,
"amrId": amrID,
"tasks": [
#{
# "target":"P1",
# "actions": [1]
#},
{
"target":targetPoint,
"actions": [1]
},
]
}

for a in range(0,times):
try:
response = requests.post(self.urlAddOrder, data=json.dumps(pyload), headers=headers, timeout=3).text

if eval(response)["message"] == "success":
return True
else:
return False

except requests.exceptions.Timeout:
print("连接超时,请检查网络")

  该接口是通过web进行通讯,通过requests.pots进行接口调用。这里的heards不需要更改,target参数表示的是地图中的点名称。action表示的是任务类型。其中1指的是休息2指是充电4指的是上料5指的是下料6指的是拣货

onlineAmr接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def checkTaskState(self, amrID):
"""
查看机器人TaskState状态
"""

while(True):
robotState = self.checkRobotState(amrID)
#print("robotState: ", robotState)
#time.sleep(1)
if (robotState == "TASK_RECEIVED") or (self.isTASK_RECEIVED is True):
self.isTASK_RECEIVED = True

messageList = requests.get(self.urlOnlineAmr).json()
amrList = messageList['amrList']
taskState = amrList[0]['taskState']

if taskState == "TASK_FINISHED":
print("taskState:", taskState)
break

def checkRobotState(self, amrID):
"""
查看机器人RobotState状态
"""

messageList = requests.get(self.urlOnlineAmr).json()
amrList = messageList['amrList']
for i in range(len(amrList)):
if amrList[i]['robotId'] == amrID:
robotState = amrList[i]['robotState']

return robotState

  该接口查看的是当前调度信息中所有小车的信息。其参数如图所示:
onlineAmr接口

总代码逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from mymoving import MedicalRobotOrder
from myYuYin import YunYinModule

import yaml
import threading


class MyMedicalRobot(MedicalRobotOrder, YunYinModule):
def __init__(self, ServerIP, MapID, app_id, api_key, base_url):
MedicalRobotOrder.__init__(self, ServerIP, MapID)
YunYinModule.__init__(self, base_url, app_id, api_key)

self.placeToPoint = {"检验科":"P2"}
self.isAddOrder = False
yuYinData = ""

def test(self):
self.handShake()

thread_send = threading.Thread(target=self.sendByMicro)
thread_send.start()

while True:
yuYinData = self.receiveResult()

#print("接收到的语音信息为:", yuYinData)

if yuYinData.find("完毕") != -1:
print("接收完毕,停止发送!")
break

else:
print("接收到的语音信息为:", yuYinData)
if yuYinData.find("检验科") != -1:
print("现在去检验科。")

targetPoint = self.placeToPoint["检验科"]
myAMRID = "zerg009"

self.isAddOrder = self.addOrder(amrID=myAMRID, targetPoint=targetPoint)
if self.isAddOrder:
print("Add Order Successfully", "TargetPoint is: ", targetPoint)
checkTaskState(myAmrID)
else:
print("Add Order Failed")
self.isSendByMicro_flag = False
break

  这里的参数文件为:

1
2
3
4
5
6
ServerIP : "192.168.3.18"
MapID : "213xll730"

app_id : "5f1a39b3"
api_key : "d66517a308ba753ebe879506cb95f0c7"
base_url : "wss://rtasr.xfyun.cn/v1/ws"

  主函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if __name__ == "__main__":
with open("param.yml") as stream:
robotParam = yaml.load(stream, Loader=yaml.FullLoader)
# python3.6 可能需要去掉 Loader=yaml.FullLoader

ServerIP = robotParam['ServerIP']
MapID = robotParam['MapID']
app_id = robotParam['app_id']
api_key = robotParam['api_key']
base_url = robotParam['base_url']

myMedicalRobot = MyMedicalRobot(ServerIP, MapID, app_id, api_key, base_url)

myMedicalRobot.test()

  首先通过解析参数文件获得参数,然后创建MyMedicalRobot类对象。
  由于用户需要不断的输入语音信息,因此需要单独给语音输入开启一个线程,保证能实时获取语音信息。然后通过receiveResult()得到用户输入的语音信息yuYinData,并通过解析语音信息的关键字转换为地图中的信息,最后将该信息发送给小车去执行。

谢谢老板!
-------------本文结束感谢您的阅读给个五星好评吧~~-------------