作成したインタラクションシステム(ver1.1)
開発環境
前回の記事から変更になった箇所は赤文字で記しています(大学のパソコンすごいつよつよで嬉しい…)
- OS:Window 11 Home
- CPU:AMD Ryzen 7 7700X 8-Core Processor
- GPU1:NVIDIA GeForce RTX 3060
- GPU2:AMD Radeon(TM) Graphics
- Unity:ver 2022.3.9f1
- python:3.10.00
本記事で作成できるプログラム
□音声:VoiceVox
無料で使える中品質なテキスト読み上げ・歌声合成ソフトウェア。商用・非商用問わず無料で、誰でも簡単にお使いいただけます。イ…
0.はじめに
本記事ではChatGPT-APIを用いて対話機能を実装しますが,API自体の登録方法などは割愛します
下記サイトを参考に以下のプロセスを完了してください
- API_KEYの取得
- 支払情報(クレジットカード情報)の設定
- API_KEYを環境変数に設定
■API_KEYの取得方法
ChatGPT(OpenAI)のAPIキーの取得方法を非エンジニア向けに画像付きでわかりやすく解説します。…
■クレジットカードの設定方法
OpenAI APIの利用にあたって必要なOpenAI API利用料の支払い情報(クレジットカード情報等)の登録方法を画…
■API_KEYを環境変数に設定
PythonでAPIキーやパスワードを環境変数に保存し、安全に取り扱う方法について解説しています。環境変数に保存する理由…
1.ChatGPTの返答を音声で出力する
☆完成形
PythonからChatGPT-APIを呼び出す
まずは簡単なプログラムを作成して,仕組みを理解してみましょう
以下のプログラムは,公式のReferenceが提供しているサンプルを一部改変したものです
□chatgpt_test.py
from openai import OpenAI
import os
def responce_gpt1(text):
print("\n####################\nストリーミングあり\n####################\n")
#OpenAIのあれこれがはいったオブジェクトを呼び出す
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
#レスポンスを逐次生成していく
res = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": text}],
stream=True,
)
#レスポンスをコンソールにプリント
for chunk in res:
if chunk.choices[0].delta.content is not None:
print(chunk.choices[0].delta.content, end="")
def responce_gpt2(text):
print("\n\n####################\nストリーミングなし\n####################\n")
#OpenAIのあれこれが入ったオブジェクトを呼び出す
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
#レスポンスを一括で生成する
res = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": text}],
)
#レスポンスをコンソールにプリント
print(res.choices[0].message.content)
if __name__ =="__main__":
responce_gpt1("テスト用の短文を生成してください")
responce_gpt2("テスト用の短文を生成してください")
このプログラムを実行した結果は次の通りです
見ていただければわかるように,responce_gpt1(先に実行される方)は返信が逐次的に行われ
responce_gpt2(後に実行される方)は返信が一括で行われているのがわかるかと思います
これらの違いは「stream=True」が指定されているかそうでないかの違いです
今回は[stream]の設定しかしていませんが,他にも様々な引数がありカスタマイズが可能です
その辺りの詳細については公式のReferenceをご確認ください
また、本記事では割愛しますが
GPTに特定のキャラを演じさせたり,過去の履歴も踏まえたうえで返答をしてほしい場合は
message=[]内に次のような要素を加えることにより可能です
- 「こういう風な返信をしてね」という命令
{“role”: “system”, “content”: xxxxxxxxxxxxxxxxx…)} - 過去のユーザからの質問とそれに対しての返答
{“role”:”user”,”content”:xxxxxxxx……} / {“role”:”assistant”,”content”:xxxxx……}
それらに興味がある方は以下の記事を参考にしてください
3月1日、ChatGPTで使用されている言語モデルであるgpt-3.5-turboのAPIが公開されました。また、その…
はじめに 以下について記載する。 会話履歴を踏まえた応答を得る方法 Chat Completions AP…
返答を分割して音声生成を行う
今回は「返信を生成しながら並列的に音声を生成していく処理」を
「ある程度返信が返ってきたらそれを一文にまとめてあげて音声生成に投げる」という処理で実現します
先ほどの”逐次的処理”と”一括処理”の丁度中間的な処理になりますね
まずは「ある程度返信が返ってきたらそれを一文にまとめる」という部分を暫定的に作成します
□test1.py
from openai import OpenAI
import os
import re
def responce_gpt(text):
# OpenAIのオブジェクトを呼び出す
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
# レスポンスを逐次生成していく
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": text}],
stream=True,
)
buffer = []
step = 1
for chunk in response:
if chunk.choices[0].delta.content:
buffer.append(str(chunk.choices[0].delta.content))
if buffer and ("\n" in buffer[-1] or "。" in buffer[-1]):
#「\n」や「。」の前に文字があった場合,res_textの一番最後に追加する
last_word = ""
match = re.search(r'([^ ]+)[\n。]', buffer[-1])
if match:
last_word = match.group(1)
#「\n」や「。」の後ろに文字が場合,次Stepのres_textの冒頭に来るようにする(=Bufferの先頭に追加する)
forward_word = ""
match = re.search(r'[\n。]+([^ ]*)', buffer[-1])
if match:
forward_word = match.group(1)
res_text = "".join(filter(None, buffer[:-1])) + last_word
buffer = [forward_word]
print(f"Step:{step}:{res_text}")
step += 1
if __name__ == "__main__":
responce_gpt("テスト用の短文を生成してください")
「stream=True」に設定すると,レスポンスが1~2文字でばばーっと送られてくるので
ひとまずそのレスポンスを配列に放り込んでいきます
そして、配列の一番後ろの要素に区切りを示す文字(今回は「\n」か「。」)が含まれている場合は
配列内の文字を結合して一文にして[res_text]という変数に放り込んでいます
ただ,たまに「。く」や「た/n」みたいなレスポンスが返ってくるときもあるので
その場合は「\n」や「。」の前後にくっついている文字を保存し配列に追加したり,
配列を初期化した後に一番初めの要素として入れたりしています
次に,この処理に前回の記事で紹介したような音声生成の関数を追加します(一部書き方を変更しています)
□test2.py
from openai import OpenAI
import re
import os
from pathlib import Path
from voicevox_core import VoicevoxCore, METAS
import pyaudio
from io import BytesIO
import wave
import sys
core = None
###################################
##返信を作る関数
###################################
def responce_gpt(text, spekaer_id):
# OpenAIのオブジェクトを呼び出す
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
# レスポンスを逐次生成していく
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": text}],
stream=True,
)
buffer = []
step = 1
for chunk in response:
if chunk.choices[0].delta.content:
buffer.append(str(chunk.choices[0].delta.content))
if buffer and ("\n" in buffer[-1] or "。" in buffer[-1]):
last_word = ""
match = re.search(r'([^ ]+)[\n。]', buffer[-1])
if match:
last_word = match.group(1)
forward_word = ""
match = re.search(r'[\n。]+([^ ]*)', buffer[-1])
if match:
forward_word = match.group(1)
res_text = "".join(filter(None, buffer[:-1])) + last_word
buffer = [forward_word]
print(f"Step:{step}:{res_text}")
# 生成されたテキストを音声化
audio_file = create_audio(res_text, speaker_id)
_test_play(audio_file)
step += 1
###################################
##音声を作るための関数
###################################
def create_core(speaker_id):
global core
try:
if core is None:
print(">>> Create Core file…")
core = VoicevoxCore(open_jtalk_dict_dir=Path("open_jtalk_dic_utf_8-1.11"))
if not core.is_model_loaded(speaker_id):
print(f">>> Load ID:{speaker_id} model…")
core.load_model(speaker_id)
except Exception as e:
print(f"Error in create_core: {e}")
def create_audio(text, speaker_id):
global core
try:
print(">>>Create voice...")
wave_bytes = core.tts(text, speaker_id) # 音声合成を行う
wav_file_name = f'voice/make_sound[ID_{speaker_id}].wav'
with open(wav_file_name, "wb") as f:
f.write(wave_bytes) # ファイルに書き出す
except Exception as e:
print(f">>>Error:{e}")
wav_file_name = "voice/error.wav"
return wav_file_name
def _test_play(filename):
with open(filename, 'rb') as f:
audio_data = f.read()
wave_io = BytesIO(audio_data)
wave_obj = wave.open(wave_io, 'rb')
p = pyaudio.PyAudio()
stream = p.open(format=p.get_format_from_width(wave_obj.getsampwidth()),
channels=wave_obj.getnchannels(),
rate=wave_obj.getframerate(),
output=True)
data = wave_obj.readframes(1024)
while data:
stream.write(data)
data = wave_obj.readframes(1024)
stream.stop_stream()
stream.close()
p.terminate()
################################
##実際実行される処理
################################
if __name__ == "__main__":
# VoicevoxのスピーカーID(適宜変更)
speaker_id = 61
# Core・モデルの読み込み
create_core(speaker_id)
# テキストからGPTのレスポンスを取得して、音声生成・再生
responce_gpt("テスト用の短文を生成してください", speaker_id)
これで,ChatG`PTからのレスポンスを適宜音声として再生することができるようになります
AudioQueryを変更する(+α)
デフォルトだと生成される音声の速度(話速)が少し遅いように感じたのでそこを変更します
話速などの音声生成に用いられる設定は「AudiioQuery」というオブジェクトに纏められています
公式のReferenceは以下ですが…ここに書かれている方法そのままだとダメです……
AudioQueryは次のようなプログラムで取得することができます
from pathlib import Path
from voicevox_core import VoicevoxCore, METAS
from pprint import pprint #結果を出力する際,普通のprintだとわかりづらいのでpprintを用います
text = "サンプルです"
style_id = 0
core = VoicevoxCore(open_jtalk_dict_dir=Path("open_jtalk_dic_utf_8-1.11"))
core.load_model(style_id)
query = core.audio_query(text, style_id)
pprint(query)
コンソールに出力される結果は次の通りです
AudioQuery(accent_phrases=[AccentPhrase(moras=[Mora(text='サ',
consonant='s',
consonant_length=0.10374545,
vowel='a',
vowel_length=0.12947202,
pitch=5.8439856),
Mora(text='ン',
consonant=None,
consonant_length=None,
vowel='N',
vowel_length=0.077509575,
pitch=5.947384),
Mora(text='プ',
consonant='p',
consonant_length=0.06626258,
vowel='u',
vowel_length=0.051226877,
pitch=5.8541822),
Mora(text='ル',
consonant='r',
consonant_length=0.037426792,
vowel='u',
vowel_length=0.057444185,
pitch=5.7396536),
Mora(text='デ',
consonant='d',
consonant_length=0.05320085,
vowel='e',
vowel_length=0.08943393,
pitch=5.491956),
Mora(text='ス',
consonant='s',
consonant_length=0.07419353,
vowel='U',
vowel_length=0.11446491,
pitch=0.0)],
accent=1,
pause_mora=None,
is_interrogative=False)],
speed_scale=1.0,
pitch_scale=0.0,
intonation_scale=1.0,
volume_scale=1.0,
pre_phoneme_length=0.1,
post_phoneme_length=0.1,
output_sampling_rate=24000,
output_stereo=False,
kana="サ'ンプルデ_ス")
このうち[speed_scale]の値を変更すれば,話速を変更することができそうです
(もちろん,他のパラメタも変更すれば抑揚や声量なども調整することができます)
Audioqueryの要素は[Audioqueryが入っている変数].[要素名]で指定できます
今回は[query]という変数に保存していて,取得したい要素は[speed_scale]なので
「query.speed_scale」で値を取得し,値を書き換え,変更したAudioqueryで再度音声を作ります
以下のプログラムを実行すると,少しだけ話速が速い音声が生成されると思います
□test3.py
from pathlib import Path
from voicevox_core import VoicevoxCore, METAS
from pprint import pprint
text = "サンプルです"
speaker_id = 0
core = VoicevoxCore(open_jtalk_dict_dir=Path("open_jtalk_dic_utf_8-1.11"))
core.load_model(speaker_id)
query = core.audio_query(text, speaker_id)
#AudioqueryのSpeed_scaleの値を変更する
query.speed_scale += 0.1
#変更したAudioqueryから音声を再度生成する
wav = core.synthesis(query,speaker_id)
#音声の保存
with open("test.wav","wb") as f:
f.write(wav)
2-1.Unity側のGUIと処理を修正する
AudioQueryをいじることで話速などを調整できるようになったので
せっかくなら,InputFiledに入力された「テキスト」情報だけを送るのではなく
「どういう風に音声を作るか」という情報もGUIで操作できるといいですよね…?
☆完成形
UIの準備
VoiceVox.exeのインタフェースを参考に,以下の2点を操作できるGUIを追加します
- 誰にどのスタイルで喋らせるか(=Speaker_id)
- どのように喋らせるか(=Audioqueryの「speed」・「pitch」・「intornation」・「volume」)
また,生成されたテキストを確認するためのGUI(Scroll View)も追加し,下記のように並べました
赤で囲っている箇所が「誰にどのようなスタイルで喋らせるか」を設定するUI
青で囲っている箇所が「どのように喋らせるか」を設定するUI
そして,緑で囲っている箇所が「何をしゃべらせるか」を設定するUIです
配置したUIは「Horizontal Layout Group」や「Content Size Fitter」をアタッチした
[空のGameObjct]の子要素にすることで綺麗に並ぶようにしています
DropDownの設定
まずは,赤枠で囲った部分の処理から実装します
ここでしたいのは,「中国うさぎ」のような[キャラクタ名]を左のdropdownから選び
「ノーマル」といった[そのキャラクタが持つスタイル]を右のdropdonwで選ぶことで
そのキャラクタとスタイルに該当するSpekaer_idが指定される.という処理です
そういった情報は「voicevox_core/model」にある「metas.json」に記載されていますので
そのjsonファイルを読み込み,その結果をdropdownに適した形で表示するスクリプトを作成しました
□SpeakerLoader.cs
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using TMPro; // TextMeshProを使用
using System.Linq; // HashSetやLINQの使用
public class SpeakerLoader : MonoBehaviour
{
//meta.jsonの形式を定義
[System.Serializable]
public class Style
{
public string name;
public int id;
}
[System.Serializable]
public class Character
{
public string name;
public Style[] styles;
}
[System.Serializable]
public class CharacterList
{
public Character[] characters;
}
public TMP_Dropdown dropdown1;
public TMP_Dropdown dropdown2;
public TextAsset textAsset;
public TMP_FontAsset fontAsset;
private CharacterList characterList;
public int SelectedStyleId { get; private set; } = -1; // 初期値を設定
public int defaultSpeakerId = 0; // デフォルトのSpeakerID
void Start()
{
if (textAsset != null)
{
LoadJsonData(textAsset);
}
if (fontAsset != null)
{
ApplyFontToDropdowns(fontAsset);
}
if (characterList?.characters.Length > 0)
{
PopulateCharacterDropdown();
SetDefaultSpeaker(defaultSpeakerId);
//LogSpeakerIds(); // スピーカーIDの一覧を表示
}
dropdown1.onValueChanged.AddListener(OnCharacterSelected);
dropdown2.onValueChanged.AddListener(OnStyleSelected);
}
private void LoadJsonData(TextAsset newTextAsset)
{
string jsonText = "{\"characters\": " + newTextAsset.text + "}";
characterList = JsonUtility.FromJson<CharacterList>(jsonText);
if (characterList?.characters.Length > 0)
{
PopulateCharacterDropdown();
SetDefaultSpeaker(defaultSpeakerId);
}
}
private void ApplyFontToDropdowns(TMP_FontAsset newFontAsset)
{
dropdown1.captionText.font = newFontAsset;
dropdown1.itemText.font = newFontAsset;
dropdown2.captionText.font = newFontAsset;
dropdown2.itemText.font = newFontAsset;
}
private void PopulateCharacterDropdown()
{
var characterNames = characterList.characters.Select(character => character.name).ToList();
dropdown1.ClearOptions();
dropdown1.AddOptions(characterNames);
}
private void OnCharacterSelected(int index)
{
if (characterList != null && index < characterList.characters.Length)
{
PopulateStyleDropdown(characterList.characters[index].styles);
OnStyleSelected(0);
}
}
private void PopulateStyleDropdown(Style[] styles)
{
var uniqueStyleNames = new HashSet<string>(styles.Select(style => style.name));
dropdown2.ClearOptions();
dropdown2.AddOptions(uniqueStyleNames.ToList());
}
private void OnStyleSelected(int index)
{
if (characterList != null && dropdown2.options.Count > index)
{
var selectedCharacter = characterList.characters[dropdown1.value];
var selectedStyle = selectedCharacter.styles[index];
SelectedStyleId = selectedStyle.id; // スタイルIDを保存
}
}
private void SetDefaultSpeaker(int speakerId)
{
if (speakerId >= 0 && speakerId < characterList.characters.Length)
{
dropdown1.value = speakerId; // デフォルトのスピーカーを選択
dropdown1.RefreshShownValue(); // 表示を更新
OnCharacterSelected(speakerId); // キャラクターを選択
}
}
private void LogSpeakerIds()
{
if (characterList != null)
{
var speakerIds = characterList.characters
.Select((character, index) => $"ID: {index}, Name: {character.name}")
.ToList();
Debug.Log("Speaker IDs:\n" + string.Join("\n", speakerIds));
}
}
//外部スクリプトから読み込む関数
public int GetStyleId()
{
return SelectedStyleId;
}
}
次に,このスクリプトを適当なGameObject(今回は「input/Script」)にアタッチします
[Dropbox1]にはキャラクタ一覧を出したいDropboxを
[Dropbox2]にスタイルの一覧を出したいDropboxを
[Text Assets]にはキャラクタやスタイルについて定義されている「meta.json」を指定してください
また,TMPのデフォルトに設定されているTextAssetは日本語に対応していないため
日本語に対応したTextAssetを作成し,[Font Assets]に指定します
TextAssetについては,有志の方が作成してくれたものを用いるか
以下のサイトなどを参考に日本語に対応したものを作成してください
Unity - Text Mesh Pro Japanese Assets. Contribute to nilpkth…
最後の[Default Speaker ID]は,シーンを実行した際に選択されてほしいキャラクタのIDを入力します
これは「metas.json」に書かれているidではなく,Unity側でmetas.jsonを読み込んだ際に
キャラクタ情報(Name)に対して上から順に振った番号です(おそらく以下のようになってるはずです)
Speaker IDs:
ID: 0, Name: 四国めたん
ID: 1, Name: ずんだもん
ID: 2, Name: 春日部つむぎ
ID: 3, Name: 雨晴はう
ID: 4, Name: 波音リツ
ID: 5, Name: 玄野武宏
ID: 6, Name: 白上虎太郎
ID: 7, Name: 青山龍星
ID: 8, Name: 冥鳴ひまり
ID: 9, Name: 九州そら
ID: 10, Name: もち子さん
ID: 11, Name: 剣崎雌雄
ID: 12, Name: WhiteCUL
ID: 13, Name: 後鬼
ID: 14, Name: No.7
ID: 15, Name: ちび式じい
ID: 16, Name: 櫻歌ミコ
ID: 17, Name: 小夜/SAYO
ID: 18, Name: ナースロボ_タイプT
ID: 19, Name: †聖騎士 紅桜†
ID: 20, Name: 雀松朱司
ID: 21, Name: 麒ヶ島宗麟
ID: 22, Name: 春歌ナナ
ID: 23, Name: 猫使アル
ID: 24, Name: 猫使ビィ
ID: 25, Name: 中国うさぎ
ID: 26, Name: 栗田まろん
ID: 27, Name: あいえるたん
ID: 28, Name: 満別花丸
ID: 29, Name: 琴詠ニア
これらの設定が完了すると下記のようにキャラクタとスタイルを選ぶことができます
InputFiledsの設定
次に青枠の部分の処理を実装します
ここは単純に,4つのInputFiledに入っている数値をそれぞれ取得する処理を書いています
ただ,Rangeより大きい/小さい数値が指定されたり,数値以外の値(文字など)が入っていると
予期せぬ動作をしますので,値を確認する処理も追加しています
□InputController.cs
using UnityEngine;
using TMPro;
public class InputFieldLoader : MonoBehaviour
{
public TMP_InputField speedInputField;
public TMP_InputField pitchInputField;
public TMP_InputField intonationInputField;
public TMP_InputField volumeInputField;
public TMP_FontAsset fontAsset;
private float initialSpeed = 1.0f;
private float initialPitch = 0.0f;
private float initialIntonation = 1.0f;
private float initialVolume = 1.0f;
private void Start()
{
// 各InputFieldに初期値をセット
speedInputField.text = initialSpeed.ToString();
pitchInputField.text = initialPitch.ToString();
intonationInputField.text = initialIntonation.ToString();
volumeInputField.text = initialVolume.ToString();
// フォントを各InputFieldに適用
if (fontAsset != null)
{
speedInputField.textComponent.font = fontAsset;
pitchInputField.textComponent.font = fontAsset;
intonationInputField.textComponent.font = fontAsset;
volumeInputField.textComponent.font = fontAsset;
}
}
// デバッグ用
public void OnButtonClick()
{
float speed = ValidateSpeed(speedInputField.text);
float pitch = ValidatePitch(pitchInputField.text);
float intonation = ValidateIntonation(intonationInputField.text);
float volume = ValidateVolume(volumeInputField.text);
Debug.Log($"Speed: {speed}, Pitch: {pitch}, Intonation: {intonation}, Volume: {volume}");
}
// 値を確認するための処理
private float ValidateSpeed(string input)
{
float value;
if (float.TryParse(input, out value) && value >= 0.5f && value <= 2.0f)
return value;
speedInputField.text = initialSpeed.ToString();
return initialSpeed;
}
private float ValidatePitch(string input)
{
float value;
if (float.TryParse(input, out value) && value >= -0.15f && value <= 0.15f)
return value;
pitchInputField.text = initialPitch.ToString();
return initialPitch;
}
private float ValidateIntonation(string input)
{
float value;
if (float.TryParse(input, out value) && value >= 0.0f && value <= 2.0f)
return value;
intonationInputField.text = initialIntonation.ToString();
return initialIntonation;
}
private float ValidateVolume(string input)
{
float value;
if (float.TryParse(input, out value) && value >= 0.0f && value <= 2.0f)
return value;
volumeInputField.text = initialVolume.ToString();
return initialVolume;
}
// 外部スクリプトから呼び出す関数
public float GetSpeed()
{
return ValidateSpeed(speedInputField.text);
}
public float GetPitch()
{
return ValidatePitch(pitchInputField.text);
}
public float GetIntonation()
{
return ValidateIntonation(intonationInputField.text);
}
public float GetVolume()
{
return ValidateVolume(volumeInputField.text);
}
このスクリプトを,先ほど「SpeakerLoader.cs」をアタッチしたオブジェクトにアタッチします
そして,それぞれ[speed]・[pitch]・[intonation]・[volume]に対応するInputFiledを指定してください
なお,こちらのスクリプトもFontAssetを指定できるようにはしていますが必須ではありません
リクエスト処理の修正
Python側とデータの送受信をするスクリプトに以下の機能を追加します
- 音声の生成をpython側にリクエストする際に,GUIの値を参照してそれも送り付ける
- python側から取得した音声を再生すると同時に,その音声のテキストをScroll Viewに提示する
そのためにまずはScroll Viewに表示するためのスクリプトを作成します
この処理は「[task_id]:[本文]」というフォーマットで文字列が入ってくることを前提としています
□ConsoleExport.cs
using UnityEngine;
using UnityEngine.UI;
using TMPro;
using System.Collections.Generic;
public class ConsoleExport : MonoBehaviour
{
[SerializeField] private ScrollRect scrollView; // Scroll Rectの参照
[SerializeField] private GameObject textWithImagePrefab; // TextとImageを含むPrefabの参照
[SerializeField] private RectTransform contentPanel; // Scroll ViewのContent RectTransform
[SerializeField] private int maxInstances = 10; // 最大インスタンス数の制限
private List<GameObject> textInstances = new List<GameObject>(); // テキストインスタンスの管理リスト
public void Export(string exportText)
{
// TaskIDと本文を分割
string[] parts = exportText.Split(new[] { ':' }, 2); // ':'で分割し、最大2つの部分を得る
if (parts.Length == 2)
{
string taskId = parts[0].Trim(); // [TaskID]部分を取得
string content = parts[1].Trim(); // [本文]部分を取得
//Debug.Log($"TaskID: {taskId}, Content: {content}");
// TaskIDが'test'以外の場合にコンソールに表示
if (taskId != "test")
{
// TextとImageを含むPrefabをインスタンス化してContent Panelに追加
GameObject newTextObject = Instantiate(textWithImagePrefab, contentPanel);
// 新しいオブジェクトのRectTransformを取得
RectTransform newTextRectTransform = newTextObject.GetComponent<RectTransform>();
// テキストの内容を設定
TMP_Text textComponent = newTextObject.GetComponentInChildren<TMP_Text>();
if (textComponent != null)
{
if (content.Contains("Error"))
{
// エラーコードが含まれている場合、テキストを赤くし、太字にする
textComponent.text = $"<color=red><b>{content}</b></color>";
}
else
{
textComponent.text = content;
}
}
else
{
Debug.LogError("TMP_Text component is missing in the Text Prefab!");
}
// 新しいオブジェクトを親オブジェクト内で最下部に移動
newTextRectTransform.SetAsLastSibling();
// テキストインスタンスをリストに追加
textInstances.Add(newTextObject);
// インスタンス数が最大数を超えた場合、古いインスタンスを削除
if (textInstances.Count > maxInstances)
{
// 最も古いインスタンスを削除
GameObject oldTextObject = textInstances[0];
textInstances.RemoveAt(0);
Destroy(oldTextObject);
}
// Layoutを更新して、Scroll Rectの表示位置を最下部にします
LayoutRebuilder.ForceRebuildLayoutImmediate(contentPanel);
// スクロール位置を更新
Canvas.ForceUpdateCanvases(); // Canvasの更新を強制する
scrollView.verticalNormalizedPosition = 0; // Content Panelの最下部にスクロールします
}
}
else
{
Debug.LogWarning("入力形式が正しくありません。");
}
}
}
このスクリプトも適当なGameObject(今回は「output/Scripts」)にアタッチし
出力を表示したい「Scroll View」とその子要素にある「Content」をそれぞれ指定してください
また一番下の[Max Instances]はScroll Viewに最大何行まで出力を行うかを決める値です
加えて,今回は出力を「LINE」のような丸い枠で囲われている状態で出したかったので
以下の記事を参考に,TextとImageを組み合わせた「Prefab」をまずは作成しそれを設定しています
【Unity】吹き出しUI. Contribute to Okamochi000/AutoSpeechBalloon d…
Unity Simple UGUI SpeechBubble. Contribute to KimChunsick/Un…
次に,今まで作成した処理と前回の記事で取り上げた「TextToSpeech.cs」をベースに
Python側と送受信を行うスクリプトを修正します
□SpeechResponce.cs
using System.Collections;
using UnityEngine;
using UnityEngine.Networking;
using UnityEngine.UI;
using TMPro;
public class SpeechResponce : MonoBehaviour
{
[System.Serializable]
public class ReplayRequest
{
public string wav_path;
public string sentence;
}
public Button button;
public TMP_InputField inputField;
public AudioSource audioSource;
public GameObject GUIScriptObject;
private const string baseUrl = "http://localhost:5000";
private int _speaker_id;
private string _task_id;
private ConsoleExport _consoleExport;
private SpeakerLoader _speakerLoader;
private InputFieldLoader _inputfieldLoader;
private void Start()
{
_consoleExport = GetComponent<ConsoleExport>();
_speakerLoader = GUIScriptObject.GetComponent<SpeakerLoader>();
_inputfieldLoader = GUIScriptObject.GetComponent<InputFieldLoader>();
button.onClick.AddListener(OnSend);
}
private void OnSend()
{
string text = inputField.text;
_speaker_id = _speakerLoader.GetStyleId(); // スタイルIDを取得
StartCoroutine(SendRequest(text, _speaker_id));
inputField.text = "";
}
public IEnumerator SendRequest(string inputText, int id_number)
{
// モデルのロードとかが終わるまで待機
yield return Initialization(id_number);
string url = $"{baseUrl}/create_responce_voice";
WWWForm form = new WWWForm();
form.AddField("text", inputText);
form.AddField("id_number", id_number);
using (UnityWebRequest www = UnityWebRequest.Post(url, form))
{
yield return www.SendWebRequest();
if (www.result != UnityWebRequest.Result.Success)
{
Debug.LogError(www.error);
yield break;
}
_task_id = www.downloadHandler.text;
Debug.Log("サーバーからの応答: " + _task_id);
StartCoroutine(CheckForVoiceFiles());
}
}
public IEnumerator Initialization(int id_number)
{
string url = $"{baseUrl}/initialize";
float speed = _inputfieldLoader.GetSpeed(); // スピードを取得
float pitch = _inputfieldLoader.GetPitch(); // ピッチを取得
float intonation = _inputfieldLoader.GetIntonation(); // 抑揚を取得
float volume = _inputfieldLoader.GetVolume(); // 音量を取得
WWWForm form = new WWWForm();
form.AddField("id_number", id_number);
form.AddField("speed_scale", speed.ToString());
form.AddField("pitch_scale", pitch.ToString());
form.AddField("intonation_scale", intonation.ToString());
form.AddField("volume_scale", volume.ToString());
using (UnityWebRequest www = UnityWebRequest.Post(url, form))
{
yield return www.SendWebRequest();
if (www.result != UnityWebRequest.Result.Success)
{
Debug.LogError(www.error);
yield break;
}
Debug.Log("Get: " + www.downloadHandler.text);
}
}
public IEnumerator CheckForVoiceFiles()
{
string checkVoiceUrl = $"{baseUrl}/get_sentence";
int currentStep = 1;
int maxRetries = 50;
while (currentStep <= maxRetries)
{
WWWForm form = new WWWForm();
form.AddField("step", currentStep);
form.AddField("task_id", _task_id);
using (UnityWebRequest voiceRequest = UnityWebRequest.Post(checkVoiceUrl, form))
{
yield return voiceRequest.SendWebRequest();
if (voiceRequest.result != UnityWebRequest.Result.Success)
{
Debug.LogError(voiceRequest.error);
yield break;
}
switch (voiceRequest.responseCode)
{
case 200:
string responseText = voiceRequest.downloadHandler.text;
ReplayRequest replayRequest = JsonUtility.FromJson<ReplayRequest>(responseText);
Debug.Log($"取得した音声内容 (Step {currentStep}): " + replayRequest.sentence);
yield return CreateVoice(replayRequest.wav_path, replayRequest.sentence);
yield return new WaitForSeconds(1);
currentStep++;
break;
case 202:
Debug.Log(voiceRequest.downloadHandler.text);
yield return new WaitForSeconds(3);
break;
case 204:
Debug.Log(voiceRequest.downloadHandler.text);
yield break;
case 404:
case 500:
Debug.Log($"エラー: {voiceRequest.downloadHandler.text}");
yield break;
default:
Debug.LogError("不明なエラーが発生しました。");
yield break;
}
}
}
Debug.Log("最大リトライ回数に達しました。");
}
public IEnumerator CreateVoice(string wav_path, string sentence)
{
string audioUrl = $"{baseUrl}/get_audio?file_path={wav_path}";
using (UnityWebRequest audioRequest = UnityWebRequestMultimedia.GetAudioClip(audioUrl, AudioType.WAV))
{
yield return audioRequest.SendWebRequest();
if (audioRequest.result != UnityWebRequest.Result.Success)
{
Debug.LogError($"Audio request failed: {audioRequest.error}");
yield break;
}
AudioClip audioClip = DownloadHandlerAudioClip.GetContent(audioRequest);
audioSource.clip = audioClip;
audioSource.Play();
_consoleExport.Export(sentence);
while (audioSource.isPlaying)
{
yield return null;
}
}
}
}
このスクリプトは「ConsoleExport.cs」があるオブジェクトにアタッチしてください
そして,Pythonに値を送信するための[Button]や文字列を記述する[InputFiled]
最終的に音声を再生したいのでそれに必要な[AuidoSource]をそれぞれ設定してください
また,「SpeakerLoader.cs」や「InputFiledLoader.cs」内の関数を処理内で用いるため
それらのスクリプトがアタッチされているオブジェクトも設定してください
これでUnity側の設定は完了です
Pythonのサーバが立ち上がっている状態でシーンを実行すると「完成形」のように
生成音声がGUIのパラメタによって変化し,出力結果も画面上に表示されると思います
2-2.Pyton側の処理を修正する
今までの処理は1つのレーンでUnity側への応答と音声生成を行っていました
ただこの方法だと,音声生成の処理中はレスポンスができないような事態に陥ります
ですので,Pythonが提供している「Mlutiprocessing」を用いて
次のように応答処理と音声処理を完全に分離できるように処理を変更します
☆完成形
音声処理が速すぎてわかりづらいですが…音声生成中にレスポンスを受け取って
「get_sentence」や「get_audiofile」が動いているのがわかるかと思います
音声生成処理の修正
前回の内容に今回述べた“ChatGPTによる返答生成”や“AudioQueryのパラメタ変更”等を加え
その処理をMultiprocessingに対応するような形に修正したプログラムは次の通りです
□test_voice.py
import os
import shutil
import re
import multiprocessing
from pathlib import Path
from voicevox_core import VoicevoxCore
from openai import OpenAI
# グローバル変数
lock = multiprocessing.Lock()
audio_settings = {}
core = None
def audio_process(queue, shared_jobs):
while True:
try:
task = queue.get()
if task is None:
break
command = task[0]
if command == "create_core":
speaker_id = int(task[1])
create_core(speaker_id, shared_jobs)
elif command == "create_setting":
speaker_id = int(task[1])
speed = float(task[2])
pitch = float(task[3])
intonation = float(task[4])
volume = float(task[5])
create_setting(speaker_id, speed, pitch, intonation, volume, shared_jobs)
elif command == "create_responce":
text = task[1]
speaker_id = int(task[2])
task_id = task[3]
create_responce(text, speaker_id, task_id, shared_jobs)
except KeyboardInterrupt:
pass
except Exception as e:
print(f">>> Error in audio_process: {e}")
def create_core(speaker_id, shared_jobs):
global core
task_id = "core_set"
try:
_update_job_status(shared_jobs, task_id, "running")
if core is None:
print(">>> Create Core file…")
core = VoicevoxCore(open_jtalk_dict_dir=Path("open_jtalk_dic_utf_8-1.11"))
if not core.is_model_loaded(speaker_id):
print(f">>> Load ID:{speaker_id} model…")
core.load_model(speaker_id)
_update_job_status(shared_jobs, task_id , "completed")
except Exception as e:
_update_job_status(shared_jobs, task_id , "error", error=str(e))
print(f"Error in create_core: {e}")
def create_setting(speaker_id, speed, pitch, intonation, volume, shared_jobs):
global audio_settings
task_id = "query_set"
try:
_update_job_status(shared_jobs, task_id, "running")
audio_settings[speaker_id] = {
"speed_scale": float(speed),
"pitch_scale": float(pitch),
"intonation_scale": float(intonation),
"volume_scale": float(volume),
}
_update_job_status(shared_jobs, task_id , "completed")
except Exception as e:
_update_job_status(shared_jobs, task_id , "error", error=str(e))
print(f">>> Error in create_setting: {e}")
def create_responce(text, speaker_id, task_id, shared_jobs):
global core
if core is None:
create_core(speaker_id)
try:
_update_job_status(shared_jobs, task_id, 'running', 0, None)
print(">>> 音声生成開始")
dir_path = 'voice/temp'
if os.path.exists(dir_path):
shutil.rmtree(dir_path)
os.makedirs(dir_path)
if task_id == "test" and text=="":
text="これはテスト用の音声です"
_update_job_status(shared_jobs, task_id, "running")
_create_audio_chunk(core, text, int(speaker_id), 1, task_id)
_update_job_status(shared_jobs, task_id, "completed")
else:
response = _replyGPTMessage(text)
if not response:
raise Exception(">>> OpenAI APIからの応答が無効です。")
buffer = []
step = 1
for chunk in response:
if chunk.choices[0].delta.content:
buffer.append(str(chunk.choices[0].delta.content))
if buffer and ("\n" in buffer[-1] or "。" in buffer[-1]):
last_word = ""
match = re.search(r'([^ ]+)[\n。]', buffer[-1])
if match:
last_word = match.group(1)
forward_word = ""
match = re.search(r'[\n。]+([^ ]*)', buffer[-1])
if match:
forward_word = match.group(1)
res_text = "".join(filter(None, buffer[:-1])) + last_word
buffer = [forward_word]
_create_audio_chunk(core, res_text, int(speaker_id), step, task_id)
_update_job_status(shared_jobs, task_id, 'running', step) # 現在のステップを更新
step += 1
if os.listdir(dir_path):
_update_job_status(shared_jobs, task_id, 'completed') # 完了状態を更新
print(f">>> 音声生成完了: タスク {task_id} が完了しました。")
else:
_update_job_status(shared_jobs, task_id, 'error', error="音声ファイルが生成されませんでした")
except Exception as e:
_update_job_status(shared_jobs, task_id, 'error', error=str(e))
print(f">>> Error in create_responce: {str(e)}")
def _replyGPTMessage(message):
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
try:
response = client.chat.completions.create(
messages=[{"role": "user", "content": message}],
model="gpt-4o-mini",
stream=True,
timeout=10
)
return response
except Exception as e:
print(f">>> Error while calling OpenAI API: {str(e)}")
raise
def _create_audio_chunk(core, res_text, speaker_id, step, task_id):
#音声用のテキストの作成 + タグ付けしたテキストの生成(処理が複雑になった場合は関数にまとめる)
write_text = f"{task_id}: {res_text.strip()}"
auido_text = _remove_special_character(res_text)
audio_setting = audio_settings.get(speaker_id, {})
audio_query = core.audio_query(auido_text, speaker_id)
audio_query = _update_audioquery(audio_query, audio_setting)
wave_bytes = core.synthesis(audio_query, speaker_id)
file_directory = os.path.join('voice/temp', f'[TaskID_{task_id}] Step_{step}')
with open(file_directory + ".wav", "wb") as f:
f.write(wave_bytes)
with open(file_directory + ".txt", "w") as f:
f.write(write_text)
if task_id == "test":
print(f">>> テスト用の音声ファイルがされました")
else:
print(f">>> Step {step} の音声ファイルが生成されました。")
def _remove_special_character(text):
# 冒頭の'-'を削除し、特殊文字を削除
cleaned_text = re.sub(r'^[*-]*\s*|^[#*]+\s*', '', text) # 冒頭の'-'や'###'、'**'を削除
cleaned_text = re.sub(r'[#*]', '', cleaned_text) # '#'と'*'を削除
cleaned_text = re.sub(r'\(.*?\)', '', cleaned_text) # 括弧内の内容を削除
return cleaned_text
def _update_audioquery(AudioQuery, setting: dict):
AudioQuery.speed_scale = float(setting.get("speed_scale", 1.0))
AudioQuery.pitch_scale = float(setting.get("pitch_scale", 0))
AudioQuery.intonation_scale = float(setting.get("intonation_scale", 1.0))
AudioQuery.volume_scale = float(setting.get("volume_scale", 1.0))
return AudioQuery
def _update_job_status(shared_jobs, task_id, state, step=None, error=None):
"""shared_jobsの状態を更新する関数"""
with lock: # ロックを使用
job_status = {'state': state}
if step is not None:
job_status['step'] = step
if error is not None:
job_status['error'] = error
# shared_jobsをロック内で更新
shared_jobs[task_id] = job_status
#print(shared_jobs) #デバック用
if __name__ == "__main__":
queue = multiprocessing.Queue()
manager = multiprocessing.Manager()
shared_jobs = manager.dict() # 共有辞書を作成
process = multiprocessing.Process(target=audio_process, args=(queue, shared_jobs))
process.start()
# コアを作成
speaker_id = 61
queue.put(("create_core", speaker_id))
# 音声設定を作成
queue.put(("create_setting", speaker_id, 1.2, 0, 1, 1))
# テスト
queue.put(("create_responce", "",speaker_id, "test"))
queue.put(("create_responce", "一文だけ文章を生成してください",speaker_id, f"{speaker_id}-1"))
# 終了シグナル
queue.put(None)
# プロセスが終了するのを待つ
process.join()
「if _name_=”_main_”」以下の内容は
音声生成を別のプログラムからMultiprocessで実行する処理を模倣しています
プログラムが実行されるとメモリを確保し「audio_process」を回し続け(=待機させ)ます
そして,特定のqueueが来たらそれに応じた処理をする…といった感じです
特定の処理として定義しているのは次の4つです
- create_core:VoiceVoxのCoreファイルやmodelデータのインストールを行う
- create_setting:話速などのAudioqueryの設定を行う
- create_audio:ChatGPTによる返答を音声として生成する
- None:audio_processを終了させる
また,「処理が完了したか」や「音声生成がどれくらい進んでいるのか」というのを確認するために
処理の合間合間に「_update_jobs_status」を呼び出し,状況を「shared_jobs」に保存しています
サーバ側処理の修正
前回の記事では「Flask」というフレームワークを用いてましたが
開始/終了処理や非同期処理をしたかったため「FastAPI」に変更しました
□test_server.py
import os
import multiprocessing
from io import BytesIO
from contextlib import asynccontextmanager
import asyncio
from fastapi import FastAPI, HTTPException, Form
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from test_voice import audio_process
# グローバル変数
queue = None
manager = None
shared_jobs = None
process = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global process
global queue
global shared_jobs
queue = multiprocessing.Queue()
manager = multiprocessing.Manager()
shared_jobs = manager.dict() # 共有辞書を作成
await _start_audio_process()
yield # FastAPIのライフサイクルのために待機
await _stop_audio_process()
async def _start_audio_process():
global process
if process is None or not process.is_alive():
process = multiprocessing.Process(target=audio_process, args=(queue, shared_jobs))
process.start()
async def _stop_audio_process():
global process
if process is not None:
queue.put(None) # プロセスに終了信号を送る
process.join() # プロセスが終了するのを待つ
process = None
app = FastAPI(lifespan=lifespan)
# CORSを許可
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.post("/initialize")
async def initialize(
id_number: int = Form(...),
speed_scale: float = Form(...),
pitch_scale: float = Form(...),
intonation_scale: float = Form(...),
volume_scale: float = Form(...)
):
try:
# coreファイルの作成と設定の作成をキューに追加
queue.put(("create_core", id_number))
queue.put(("create_setting", id_number, speed_scale, pitch_scale, intonation_scale, volume_scale))
# core_set と query_set の両方が完了するまで待機
time_elapsed, time_limit = 0, 20 # 最大20秒間待機
while time_elapsed < time_limit:
core_status = shared_jobs.get("core_set", {})
query_status = shared_jobs.get("query_set", {})
# 両方の処理が完了したらレスポンスを返す
if core_status.get('state') == 'completed' and query_status.get('state') == 'completed':
return "初期化が完了しました"
# どちらかの処理がエラーなら、エラーメッセージを返す
if core_status.get('state') == 'error' or query_status.get('state') == 'error':
error_message = core_status.get('error') or query_status.get('error') or "不明なエラーが発生しました"
raise HTTPException(status_code=500, detail=f"エラーが発生しました: {error_message}")
time_elapsed += 1
await asyncio.sleep(1) # 1秒ごとにチェック
else:
# 20秒待っても完了しなかった場合、タイムアウトとしてエラーを返す
raise HTTPException(status_code=500, detail="設定の読み込みがタイムアウトしました")
except Exception as e:
raise HTTPException(status_code=500, detail=f"初期化中にエラーが発生しました: {str(e)}")
@app.post("/create_responce_voice")
async def start_voice(
text: str = Form(...),
id_number: int = Form(...)
):
try:
if "test_create:" in text:
task_id = "test"
text = text.split(":")[1]
else:
task_id = f"{id_number}-{len(shared_jobs) + 1}"
queue.put(("create_responce", text, id_number, task_id))
# 音声生成が開始されるまで待機(速すぎると次の処理で404エラー出るので)
time, limit = 0, 20
while time < limit:
job_status = shared_jobs.get(task_id)
if job_status:
state = job_status.get('state')
if state in ['running', 'completed']:
return task_id # どちらの状態でもタスクIDを返す
time += 1
await asyncio.sleep(1)
else:
# タイムアウトに到達した場合
raise HTTPException(status_code=500, detail="音声生成の開始に時間がかかりすぎています")
except KeyError:
raise HTTPException(status_code=404, detail="タスクが見つかりませんでした。")
except Exception as e:
raise HTTPException(status_code=500, detail="音声生成の開始中にエラーが発生しました。")
@app.post("/get_sentence")
async def get_sentence(
step: int = Form(...),
task_id: str = Form(...)
):
task_id = task_id.strip().replace('"', '')
if task_id not in shared_jobs:
raise HTTPException(status_code=404, detail="タスクIDが見つかりません。")
job_status = shared_jobs[task_id]
if job_status['state'] == 'error':
error_message = job_status.get('error', "不明なエラーが発生しました。")
raise HTTPException(status_code=500, detail=f"エラーが発生しました: {error_message}")
txt_path = os.path.join("voice/temp/", f"[TaskID_{task_id}] Step_{step}.txt")
wav_path = os.path.join("voice/temp/", f"[TaskID_{task_id}] Step_{step}.wav")
if os.path.exists(txt_path) and os.path.exists(wav_path):
with open(txt_path, 'r') as f:
sentence = f.read()
data = {"wav_path": wav_path, "sentence": sentence}
return data
if job_status['state'] == 'running':
raise HTTPException(status_code=202, detail="音声生成中です。もう少しお待ちください。")
elif job_status['state'] == 'completed':
raise HTTPException(status_code=204, detail="これ以上の音声ファイルは存在しません")
else:
raise HTTPException(status_code=404, detail="指定されたファイルが見つかりません。")
@app.get("/get_audio")
async def audio(file_path: str):
if not file_path:
raise HTTPException(status_code=400, detail="wav_pathが指定されていません")
if not os.path.exists(file_path):
raise HTTPException(status_code=404, detail=f"指定されたファイルが見つかりません: {file_path}")
with open(file_path, 'rb') as f:
audio_data = f.read()
return StreamingResponse(BytesIO(audio_data), media_type='audio/wav', headers={"Content-Disposition": f"attachment; filename={os.path.basename(file_path)}"})
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")
開始処理(@asynccontextmanager の部分)で音声生成処理を別プロセスで実行しています
そして,クライアントからリクエストが来た場合は,それぞれ対応したqueueを叩きつけています
また,「該当タスクは終わったのか」や「該当タスクはどれくらい進んでいるか」等の情報を
「shared_jobs」を読み込むことで確認し,対応したレスポンスを返答しています
3.課題
AudioQueryの設定はinputfiledではなくSliderを使ってもいいかも
エージェントをタッチした時との音声はStaticなものなので整合性がなくなる
サーバ処理でグローバル変数を使うのはあまりよろしくない
ChatGPTの返答生成部分は,音声生成とは別のスクリプトで管理した方がいい