Skip to content
Snippets Groups Projects
Commit c48fc5eb authored by Nan Mu's avatar Nan Mu
Browse files

finished v1

parent 8874a207
No related branches found
No related tags found
No related merge requests found
...@@ -3,7 +3,8 @@ import webrtcvad ...@@ -3,7 +3,8 @@ import webrtcvad
import wave import wave
import uuid import uuid
import os import os
import openai
from openai import OpenAI
class AudioToTextWorker: class AudioToTextWorker:
...@@ -11,11 +12,12 @@ class AudioToTextWorker: ...@@ -11,11 +12,12 @@ class AudioToTextWorker:
self.channels = 1 self.channels = 1
self.frame_rate = 16000 # webrtcvad only support frame rate at 8000, 16000, 32000 or 48000 Hz. self.frame_rate = 16000 # webrtcvad only support frame rate at 8000, 16000, 32000 or 48000 Hz.
self.per_sample_duration = 0.03 # webrtcvad only support per sample duration at 10, 20, or 30 ms. self.per_sample_duration = 0.03 # webrtcvad only support per sample duration at 10, 20, or 30 ms.
self.max_non_speaking_seconds = 2 self.max_non_speaking_seconds = 3
self.filename = "" self.filename = ""
self.p = pyaudio.PyAudio() self.p = pyaudio.PyAudio()
self.vad = webrtcvad.Vad() self.vad = webrtcvad.Vad()
self.vad.set_mode(1) self.vad.set_mode(1)
self.client = OpenAI()
assert webrtcvad.valid_rate_and_frame_length(self.frame_rate, int(self.frame_rate * self.per_sample_duration)), "invalid frame_rate or per_sample_duration for webrtcvad" assert webrtcvad.valid_rate_and_frame_length(self.frame_rate, int(self.frame_rate * self.per_sample_duration)), "invalid frame_rate or per_sample_duration for webrtcvad"
...@@ -44,7 +46,7 @@ class AudioToTextWorker: ...@@ -44,7 +46,7 @@ class AudioToTextWorker:
stream.stop_stream() stream.stop_stream()
stream.close() stream.close()
self.p.terminate() #self.p.terminate()
self.filename = f'{str(uuid.uuid4())}.wav' self.filename = f'{str(uuid.uuid4())}.wav'
wf = wave.open(self.filename, 'wb') wf = wave.open(self.filename, 'wb')
...@@ -58,12 +60,12 @@ class AudioToTextWorker: ...@@ -58,12 +60,12 @@ class AudioToTextWorker:
assert os.path.exists(self.filename), f"Audio not exist, call record_audio function first." assert os.path.exists(self.filename), f"Audio not exist, call record_audio function first."
f = open(self.filename, "rb") f = open(self.filename, "rb")
transcript = openai.Audio.transcribe("whisper-1", f) transcript = self.client.audio.transcriptions.create(model="whisper-1", file=f)
if delete_audio_file: if delete_audio_file:
os.remove(self.filename) os.remove(self.filename)
self.filename = "" self.filename = ""
return transcript["text"] return transcript.text
if __name__ == '__main__': if __name__ == '__main__':
worker = AudioToTextWorker() worker = AudioToTextWorker()
......
import json
from typing import Union
from openai import OpenAI
class ChatApiClient:
    """Thin wrapper around the OpenAI chat-completions API.

    Sends a fixed system prompt plus one user prompt per call and returns
    the model's reply, parsed as JSON when possible.
    """

    def __init__(self, system_prompts: str, model: str = "gpt-4"):
        # Prompt text injected as the "system" message on every call.
        self.system_prompts = system_prompts
        self.model = model
        # Reads OPENAI_API_KEY from the environment.
        self.client = OpenAI()

    def chat(self, prompt: str) -> Union[dict, str]:
        """Send *prompt* to the model.

        Returns:
            The reply parsed as a dict when it is valid JSON, otherwise
            the raw reply string.
        """
        completion = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.system_prompts},
                {"role": "user", "content": prompt},
            ],
        )
        reply_text = completion.choices[0].message.content
        try:
            return json.loads(reply_text)
        except json.JSONDecodeError:  # same exception as json.decoder.JSONDecodeError
            return reply_text
if __name__ == "__main__":
    # Read the prompt file in one go. The original appended each line
    # (which still carries its own newline) and then joined with a newline
    # separator, doubling every line break in the prompt.
    with open("./system_prompts.txt", "r") as file:
        system_prompts = file.read()
    client = ChatApiClient(system_prompts)
    print(client.chat("turn off office light"))
import os
import time
from audio_to_text import AudioToTextWorker
from chat_api_client import ChatApiClient
from rest_api_client import RestApiClient
class HomeAssistant:
    """Voice-controlled loop: record speech, transcribe it, ask the chat
    model for a Home Assistant REST request, and execute that request."""

    def __init__(self) -> None:
        self.read_environ()
        self.read_system_prompts()
        self.audio_to_text_worker = AudioToTextWorker()
        self.chat_api_client = ChatApiClient(self.system_prompts)
        self.rest_api_client = RestApiClient(self.home_assistant_host, self.home_assistant_token)

    def read_environ(self):
        """Load required configuration from the environment.

        Raises:
            KeyError: if any required variable is missing or empty.
        """
        # Use .get() so a *missing* variable also reaches our explicit
        # error message; os.environ["X"] would raise a bare KeyError first
        # and the custom message below would never fire.
        self.home_assistant_host = os.environ.get("HOME_ASSISTANT_HOST", "")
        if not self.home_assistant_host:
            raise KeyError("HOME_ASSISTANT_HOST not set")
        self.home_assistant_token = os.environ.get("HOME_ASSISTANT_API_TOKEN", "")
        if not self.home_assistant_token:
            raise KeyError("HOME_ASSISTANT_API_TOKEN not set")
        if not os.environ.get("OPENAI_API_KEY"):
            raise KeyError("OPENAI_API_KEY not set")

    def read_system_prompts(self, path: str = "./system_prompts.txt"):
        """Load the chat system prompt from *path* into self.system_prompts.

        Fixes two defects: the original ignored *path* (the filename was
        hard-coded again inside the method), and it re-joined lines that
        already end with a line break, doubling every newline.
        """
        with open(path, "r") as file:
            self.system_prompts = file.read()

    def run(self):
        """Main loop: record -> transcribe -> chat -> dispatch, until the
        user says goodbye or nothing is transcribed."""
        while True:
            self.audio_to_text_worker.record_audio()
            prompt = self.audio_to_text_worker.audio_to_text()
            print(f"User: {prompt}")
            # Check emptiness first so a None/empty transcript cannot make
            # the `in` test blow up.
            if not prompt or "Bye-bye" in prompt:
                break
            reply = self.chat_api_client.chat(prompt)
            if isinstance(reply, str):
                # Plain-text reply: nothing actionable to send to Home Assistant.
                print(f"Assistant: {reply}")
                continue
            status = self.rest_api_client.request(reply)
            print("Assistant: Task Done." if status else "Assistant: Task Failed.")
            time.sleep(2)
if __name__ == "__main__":
    # Script entry point: build the assistant and hand over control.
    HomeAssistant().run()
import requests
import os
from urllib.parse import urljoin
class RestApiClient:
    """Minimal bearer-token-authenticated REST client for the Home
    Assistant HTTP API."""

    def __init__(self, host: str, bearer_token: str, timeout: float = 10.0) -> None:
        """
        Args:
            host: Base URL of the Home Assistant instance.
            bearer_token: Long-lived access token for the Authorization header.
            timeout: Per-request timeout in seconds (new parameter, defaulted
                for backward compatibility). Without one, `requests` can
                block forever on an unreachable host.
        """
        self.host = host
        self.timeout = timeout
        self.headers = {"Authorization": f"Bearer {bearer_token}"}

    def request(self, data: dict) -> bool:
        """Execute the HTTP request described by *data*.

        Args:
            data: Mapping with "method", "endpoint" and "body" keys, as
                produced by the chat model.

        Returns:
            True iff the server answered 200 OK.
        """
        url = urljoin(self.host, data["endpoint"])
        # NOTE(review): the body is a JSON string but no Content-Type header
        # is sent — confirm Home Assistant accepts service calls without
        # "Content-Type: application/json".
        r = requests.request(
            method=data["method"],
            url=url,
            data=data["body"],
            headers=self.headers,
            timeout=self.timeout,  # fail fast instead of hanging indefinitely
        )
        return r.status_code == requests.codes.ok
if __name__ == "__main__":
    # Manual smoke test: turn off the office lights via the service API.
    payload = {
        "method": "POST",
        "endpoint": "/api/services/homeassistant/turn_off",
        "body": "{\"entity_id\": \"light.office_main_lights\"}",
    }
    client = RestApiClient(
        os.environ["HOME_ASSISTANT_HOST"],
        os.environ["HOME_ASSISTANT_API_TOKEN"],
    )
    print(client.request(payload))
You can access the home assistant system and its API specs. Your goal is to generate a correct HTTP request, based on the API spec, that accomplishes the user's task.
Here is the API spec:
Method: POST
Endpoint: /api/services/homeassistant/<service>
Available services are:
- turn_off
- turn_on
- toggle
- stop
- restart
The available device entity IDs are:
- light.living_room_main_lights_1
- lock.front_door
- light.master_bedroom_main_lights
- light.hallway_main_lights
- light.office_main_lights
Here are some examples:
When the user asks "turn off master bedroom lights",
you should respond:
{
"method": "POST",
"endpoint": "/api/services/homeassistant/turn_off",
"body": "{\"entity_id\": \"light.master_bedroom_main_lights\"}"
}
When the user asks "turn on hallway lights",
you should respond:
{
"method": "POST",
"endpoint": "/api/services/homeassistant/turn_on",
"body": "{\"entity_id\": \"light.hallway_main_lights\"}"
}
import os
import pygame
import uuid
from pathlib import Path
from openai import OpenAI
pygame.init()  # initialise all pygame modules (mixer included) once, at import time
class TextToAudioWorker:
    """Convert text to speech via the OpenAI TTS API and play it with pygame."""

    def __init__(self, model: str = "tts-1", voice: str = "alloy") -> None:
        """
        Args:
            model: OpenAI TTS model name.
            voice: Voice preset to synthesise with.
        """
        self.client = OpenAI()  # reads OPENAI_API_KEY from the environment
        self.model = model
        self.voice = voice

    def text_to_audio(self, text):
        """Synthesise *text* into a uniquely named mp3 next to this file.

        Returns:
            Path to the generated audio file.
        """
        speech_file_path = Path(__file__).parent / f'{str(uuid.uuid4())}.mp3'
        response = self.client.audio.speech.create(
            model=self.model,
            voice=self.voice,
            input=text,
        )
        # NOTE(review): stream_to_file is deprecated in newer openai SDKs;
        # migrate to client.audio.speech.with_streaming_response on upgrade.
        response.stream_to_file(speech_file_path)
        return speech_file_path

    def play_audio(self, speech_file_path: str, delete_audio_file: bool = True):
        """Play the file synchronously, optionally deleting it afterwards."""
        pygame.mixer.music.load(speech_file_path)
        pygame.mixer.music.set_volume(1.0)
        pygame.mixer.music.play()
        # Poll with a short sleep instead of the original `pass` busy-wait,
        # which pinned a CPU core for the entire playback.
        while pygame.mixer.music.get_busy():
            pygame.time.wait(100)
        if delete_audio_file:
            os.remove(speech_file_path)
if __name__ == "__main__":
    # Manual check: synthesise a greeting and play it back.
    tts = TextToAudioWorker()
    audio_path = tts.text_to_audio("hello world, I am your home assistant. How can I help you?")
    tts.play_audio(audio_path)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment