・demo_chat1.pyは初期状態のべゼリーが自動起動するプログラムのうちのひとつで、音声体を行います。
モジュールの読み込み
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
#!/usr/bin/python # -*- coding: utf-8 -*- # 音声対話デモ # for Bezelie Edgar # for Raspberry Pi # by Jun Toyoda (Team Bezelie) # from Aug15th2017 from datetime import datetime # 現在時刻取得 from random import randint # 乱数の発生 from time import sleep # ウェイト処理 import subprocess # 外部プロセスを実行するモジュール import threading # マルチスレッド処理 import socket # ソケット通信モジュール import select # I/O処理完了待機モジュール import json # jsonファイルを扱うモジュール import csv # CSVファイルを扱うモジュール import sys # python終了sys.exit()のために必要 import re # 正規表現モジュール import bezelie # べゼリー専用サーボ制御モジュール |
・threadingモジュールはアラームの時間処理に使います。
・reは正引き表現を使うためのモジュールです。
初期設定
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
csvFile = "/home/pi/bezelie/chatDialog.csv" # 対話リスト jsonFile = "/home/pi/bezelie/edgar/data_chat.json" # 設定ファイル ttsFile = "/home/pi/bezelie/edgar/exec_openJTalk.sh" # 音声合成 debugFile = "/home/pi/bezelie/debug.txt" # debug用 # 設定ファイルの読み込み f = open (jsonFile,'r') jDict = json.load(f) name = jDict['data0'][0]['name'] # べゼリーの別名。 user = jDict['data0'][0]['user'] # ユーザーのニックネーム。 vol = jDict['data0'][0]['vol'] # スピーカー音量。 subprocess.call('amixer cset numid=1 '+vol+'% -q', shell=True) # スピーカー音量 # mic = jDict['data0'][0]['mic'] # マイク感度。 mic = "60" # マイク感度。 subprocess.call('sudo amixer sset Mic '+mic+' -c 0 -q', shell=True) # マイク感度設定 # 変数の初期化 muteTime = 1 # 音声入力を無視する時間 bufferSize = 256 # 受信するデータの最大バイト。2の倍数が望ましい。 |
・data_chat.jsonを読み込み、ユーザー名、ロボット名、スピーカーボリュームなどを読み込んでいます。
・マイク感度を読み込む処理がコメントアウトされているのは、キット付属のマイクに感度調整機能が無いからです。
関数定義
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# 関数 def timeCheck(): # 活動時間内かどうかのチェック f = open (jsonFile,'r') jDict = json.load(f) awake1Start = jDict['data1'][0]['awake1Start'] awake1End = jDict['data1'][0]['awake1End'] awake2Start = jDict['data1'][0]['awake2Start'] awake2End = jDict['data1'][0]['awake2End'] t = datetime.now() if int(t.hour) > int(awake1Start[0:2]) and int(t.hour) < int(awake1End[0:2]): flag = True elif int(t.hour) == int(awake1Start[0:2]) and int(t.minute) >= int(awake1Start[3:5]): flag = True elif int(t.hour) == int(awake1End[0:2]) and int(t.minute) <= int(awake1End[3:5]): flag = True elif int(t.hour) > int(awake2Start[0:2]) and int(t.hour) < int(awake2End[0:2]): flag = True elif int(t.hour) == int(awake2Start[0:2]) and int(t.minute) >= int(awake2Start[3:5]): flag = True elif int(t.hour) == int(awake2End[0:2]) and int(t.minute) <= int(awake2End[3:5]): flag = True else: flag = False # It is not Active Time return flag def alarm(): f = open (jsonFile,'r') jDict = json.load(f) alarmOn = jDict['data1'][0]['alarmOn'] alarmTime = jDict['data1'][0]['alarmTime'] alarmKind = jDict['data1'][0]['alarmKind'] now = datetime.now() if int(now.hour) == int(alarmTime[0:2]) and int(now.minute) == int(alarmTime[3:5]): if alarmOn == "true": # subprocess.call('sudo amixer -q sset Mic 0 -c 0', shell=True) if alarmKind == 'mild': bez.moveAct('happy') subprocess.call("sh "+ttsFile+" "+"朝ですよ", shell=True) bez.stop() else: bez.moveAct('happy') subprocess.call("sh "+ttsFile+" "+"朝だよ起きて起きて起きてー", shell=True) bez.stop() sleep (muteTime) # subprocess.call('sudo amixer -q sset Mic '+mic+' -c 0', shell=True) t=threading.Timer(20,alarm) # n秒後にまたスレッドを起動する t.setDaemon(True) # メインスレッドが終了したら終了させる t.start() |
・alarm()はユーザーが設定した時間になったら目覚ましのメッセージを発話させる関数です。
返答処理
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
def replyMessage(keyWord): # 対話 data = [] # 対話ファイル(csv)を変数dataに読み込む with open(csvFile, 'rb') as f: # csvFileをオープン for i in csv.reader(f): # ファイルから1行ずつiに読み込む data.append(i) # dataに追加 data1 = [] # dataから質問内容がキーワードに一致している行をdata1として抜き出す for index,i in enumerate(data): # index=連番 if unicode(i[0], 'utf-8')==keyWord: # i[0]はstrなのでutf-8に変換して比較する必要がある j = randint(1,100) # 1から100までの乱数を発生させる data1.append(i+[j]+[index]) # data1=質問内容,返答,乱数,連番のリスト if data1 == []: # data1が空っぽだったらランダムで返す for index,i in enumerate(data): j = randint(1,100) data1.append(i+[j]+[index]) maxNum = 0 # 複数の候補からランダムで選出。data1から欄数値が最大なものを選ぶ for i in data1: if i[2] > maxNum: maxNum = i[2] ansNum = i[3] # 発話 # subprocess.call('sudo amixer -q sset Mic 0 -c 0', shell=True) # 自分の声を認識してしまわないようにマイクを切る # 設定ファイルの読み込み f = open (jsonFile,'r') jDict = json.load(f) vol = jDict['data0'][0]['vol'] # スピーカーボリューム # mic = jDict['data0'][0]['mic'] # マイク感度 if timeCheck(): # 活動時間だったら会話する bez.moveRnd() subprocess.call('amixer cset numid=1 '+vol+'% -q', shell=True) # スピーカー音量 subprocess.call("sh "+ttsFile+" "+data[ansNum][1], shell=True) bez.stop() else: # 活動時間外は会話しない subprocess.call('amixer cset numid=1 60% -q', shell=True) # スピーカー音量 subprocess.call("sh "+ttsFile+" "+"活動時間外です", shell=True) sleep (5) subprocess.call('amixer cset numid=1 '+vol+'% -q', shell=True) # スピーカー音量 # subprocess.call('sudo amixer -q sset Mic '+mic+' -c 0', shell=True) # マイク感度を元に戻す def socket_buffer_clear(): while True: rlist, _, _ = select.select([client], [], [], 1) if len(rlist) > 0: dummy_buffer = client.recv(bufferSize) else: break |
・replayMessage()はkeywordを受け取ったら、それに一致する返答候補をリストアップし、その中からランダムで返答を選ぶ関数です。非活動時間の場合は、ボリュームを落として「活動時間外です」と喋らせています。
デバッグ用機能
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
def debug_message(message): t = datetime.now() message = str(t.minute)+":"+str(t.second)+":"+message print message # writeFile(message) # sys.stdout.write(message) # pass def writeFile(text): # デバッグファイル出力機能 f = open (debugFile,'r') textBefore = "" for row in f: textBefore = textBefore + row f.close() f = open (debugFile,'w') f.write(textBefore + text + "\n") f.close() |
・デバッグのための機能です。プログラム内の怪しいい箇所にdebug_message(“好きな文字列”)を挿入することで、debug.txtにメッセージが出力されるので、どこで異常が発生したかなどをチェックすることができます。
メインループ
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
# サーボの初期化 bez = bezelie.Control() # べゼリー操作インスタンスの生成 bez.moveCenter() # サーボの回転位置をトリム値に合わせる client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # メインループ def main(): t=threading.Timer(10,alarm) t.setDaemon(True) t.start() bez.moveAct('happy') subprocess.call("sh "+ttsFile+" "+u"こんにちは"+user, shell=True) subprocess.call("sh "+ttsFile+" "+u"ぼく"+name, shell=True) bez.stop() # subprocess.call('sh exec_camera.sh', shell=True) # カメラの映像をディスプレイに表示 sleep (0.5) # TCPクライアントを作成しJuliusサーバーに接続する enabled_julius = False for count in range(5): try: client.connect(('localhost', 10500)) enabled_julius = True break except socket.error, e: print 'failed socket connect. retry' sleep (1) if enabled_julius == False: print 'Juliusが見つかりませんでした' sys.exit(1) data = "" socket_buffer_clear() try: while True: if "</RECOGOUT>\n." in data: # RECOGOUTツリーの最終行を見つけたら以下の処理を行う data = re.search(r'WORD\S+', data) # dataからWORD\sで始まる行を抽出 keyWord = data.group().replace("WORD=","").replace("\"","") # dataからキーワード以外を削除 replyMessage(keyWord) socket_buffer_clear() data = "" # 認識終了したのでデータをリセットする else: data = data + client.recv(bufferSize) # Juliusサーバーから受信 except KeyboardInterrupt: # CTRL+Cで終了 debug_message(' 終了しました') client.close() bez.moveCenter() bez.stop() sys.exit(0) if __name__ == "__main__": main() sys.exit(0) |
・200行でjuliusからデータを受け取り、その中に</RECOGOUT>\nが含まれていたら、その直前に音声認識の結果が含まれているので、194行目でWORD\sから始まる行を抜き出し、さらに195行目でキーワード以外を削除して、replyMessage()へ送っています。