
From Zero to Hero: Building Your First Voice Bot with GPT-4o Real-Time API using Python



Voice technology is changing the way we interact with machines, making conversations with AI feel more natural than ever. With the public beta release of the GPT-4o-based Realtime API, developers now have the tools to create low-latency, multi-modal voice experiences in their apps, opening up endless possibilities for innovation.

Gone are the days when building a voice bot required stringing together multiple models for transcription, inference, and text-to-speech. With the Realtime API, developers can now achieve flexible and natural voice-to-voice conversations by streamlining the entire process with a single API call. This will be a game-changer for industries such as customer support, training, and real-time language translation, where fast and seamless interactions are critical.

In this blog, we will walk you through building your first real-time voice bot from scratch using the GPT-4o realtime model. We'll cover key features of the Realtime API, how to set up a WebSocket connection for voice streaming, and how to leverage API features to handle interruptions and make function calls. By the end, you'll be ready to create a voice bot that responds to users with near-human accuracy and emotion. Whether you're a beginner or an experienced developer, this blueprint will help you create immersive voice interactions that are responsive and engaging. Ready to dive in? Let's get started!

Key Features

  • Low-latency streaming: Supports real-time audio input and output for natural, smooth conversations.
  • Multimodal support: Handles both text and audio input and output, allowing for a variety of interaction modes.
  • Preset voices: Supports six predefined voices to ensure quality and consistency of responses.
  • Function calling: Enables voice assistants to perform tasks or dynamically retrieve contextual information.
  • Safety and privacy: Incorporates multiple layers of safety protections, including automated monitoring and privacy policy compliance.

How GPT-4o Real-Time API Works

Traditionally, building a voice assistant required stringing together multiple models: an automatic speech recognition (ASR) model like Whisper for audio transcription, a text-based model for processing responses, and a text-to-speech (TTS) model for generating audio output. This multi-step process often resulted in delays and loss of emotional nuance.

The GPT-4o Realtime API revolutionizes this by consolidating these capabilities into a single API call. By establishing a persistent WebSocket connection, developers can stream audio input and output directly, significantly reducing latency and improving the naturalness of conversations. Additionally, the API’s ability to call functions allows voice bots to perform tasks such as placing an order or retrieving customer information instantly.
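
Concretely, the conversation is driven by JSON events exchanged over that WebSocket. The sketch below lists the core client-side events (payloads abridged and illustrative; the event names also appear in the client implementation later in this post):

    # Configure the session once after connecting.
    session_update = {
        "type": "session.update",
        "session": {"voice": "shimmer", "turn_detection": {"type": "server_vad"}},
    }
    # Stream microphone audio as base64-encoded pcm16 chunks.
    audio_append = {
        "type": "input_audio_buffer.append",
        "audio": "<base64-encoded pcm16 chunk>",
    }
    # Ask the model to respond (implicit when server-side VAD detects end of speech).
    response_create = {"type": "response.create"}
    # The server streams back events such as "response.audio.delta" (audio chunks)
    # and "response.audio_transcript.delta" (incremental transcript text).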

Building a real-time voice bot

Let’s walk through the step-by-step process of building your own real-time voice bot using the GPT-4o Realtime API.

Prerequisites

Before you start, make sure you have the following:

  • Azure subscription: Create one for free.
  • Azure OpenAI resources: Set up in a supported region (US East 2 or Sweden Central).
  • Development environment: Knowledge of Python and basic asynchronous programming.
  • Client library: Tools like LiveKit, Agora, or Twilio can enhance the functionality of your bot.

API settings

  1. GPT-4o realtime model deployment:
    • Go to Azure AI Studio.
    • Access the model catalog and search for gpt-4o-realtime-preview.
    • Deploy the model by selecting an Azure OpenAI resource and configuring the deployment settings.
  2. Audio input and output configuration:
    • The API supports various audio formats, primarily pcm16.
    • Configure your client to handle audio streaming so it stays compatible with the API's requirements (see the sketch below).
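
As one example, here is a minimal sketch of encoding a pcm16 chunk for the input_audio_buffer.append event (mono int16 samples are implied by pcm16; verify the sample rate your deployment expects):

    import base64
    import numpy as np

    def pcm16_chunk_to_base64(samples: np.ndarray) -> str:
        """Encode mono int16 samples as the base64 payload for input_audio_buffer.append."""
        assert samples.dtype == np.int16, "pcm16 means 16-bit little-endian samples"
        return base64.b64encode(samples.tobytes()).decode("ascii")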

    This project demonstrates how to build a sophisticated, real-time, conversational AI system using Azure OpenAI. By leveraging WebSocket connections and an event-driven architecture, the system provides responsive, context-aware customer support in any language. The approach can be adapted to a variety of languages and use cases, making it a versatile solution for businesses looking to improve their customer service capabilities. The project consists of three main components:

    • Real-time API: Handles WebSocket connections to Azure OpenAI’s real-time API.
    • Tools: Define various customer support functions, such as checking order status, processing returns, etc.
    • Application: Manages interaction flows and integrates real-time clients with the UI layer.

    Configuration

    Create an .env file and update the following environment variables:

    AZURE_OPENAI_API_KEY=XXXX
    # replace with your Azure OpenAI API Key
    
    AZURE_OPENAI_ENDPOINT=https://xxxx.openai.azure.com/
    # replace with your Azure OpenAI Endpoint
    
    AZURE_OPENAI_DEPLOYMENT=gpt-4o-realtime-preview
    # Create a deployment for the gpt-4o-realtime-preview model and put its name here. You can name the deployment whatever you like.
    
    AZURE_OPENAI_CHAT_DEPLOYMENT_VERSION=2024-10-01-preview
    # You don't need to change this unless you want to try another API version.
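
    With the .env file in place, the application can load these settings at startup. A minimal sketch using python-dotenv (listed in requirements.txt below):

    import os
    from dotenv import load_dotenv

    load_dotenv()  # reads the .env file from the working directory
    endpoint = os.environ["AZURE_OPENAI_ENDPOINT"]
    deployment = os.environ["AZURE_OPENAI_DEPLOYMENT"]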

    requirements.txt

    chainlit==1.3.0rc1
    openai
    beautifulsoup4
    lxml
    python-dotenv
    websockets
    aiohttp
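
    Install the dependencies with pip install -r requirements.txt before running the application.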

    Realtime client implementation

    The heart of the voice bot is the realtime client, which manages the WebSocket connection and handles all communication with the GPT-4o Realtime API. The RealtimeAPI class is responsible for managing the WebSocket connection: receiving messages, dispatching events, and keeping the connection alive.

    Main components:

    • RealtimeAPI class:
      • Establishes and maintains a WebSocket connection.
      • Handles sending and receiving messages.
      • Manages event dispatch for various conversation events.

    # Imports assumed for this excerpt (RealtimeEventHandler is the event-handler
    # base class from the reference project; see init.py):
    import asyncio
    import json
    import logging
    import os
    from datetime import datetime

    import websockets

    logger = logging.getLogger(__name__)

    class RealtimeAPI(RealtimeEventHandler):
        def __init__(self):
            super().__init__()
            self.default_url = "wss://api.openai.com/v1/realtime"
            self.url = os.environ["AZURE_OPENAI_ENDPOINT"]
            self.api_key = os.environ["AZURE_OPENAI_API_KEY"]
            self.api_version = "2024-10-01-preview"
            self.azure_deployment = os.environ["AZURE_OPENAI_DEPLOYMENT"]
            self.ws = None
    
        def is_connected(self):
            return self.ws is not None
    
        def log(self, *args):
            # logging treats extra positional args as %-format values, so join
            # them into the message instead of passing them through.
            logger.debug(f"[Websocket/{datetime.utcnow().isoformat()}] " + " ".join(map(str, args)))
    
        async def connect(self, model="gpt-4o-realtime-preview"):
            if self.is_connected():
                raise Exception("Already connected")
            self.ws = await websockets.connect(f"{self.url}/openai/realtime?api-version={self.api_version}&deployment={model}&api-key={self.api_key}", extra_headers={
                'Authorization': f'Bearer {self.api_key}',
                'OpenAI-Beta': 'realtime=v1'
            })
            self.log(f"Connected to {self.url}")
            asyncio.create_task(self._receive_messages())
    
        async def _receive_messages(self):
            async for message in self.ws:
                event = json.loads(message)
                if event['type'] == "error":
                    logger.error("ERROR", message)
                self.log("received:", event)
                self.dispatch(f"server.{event['type']}", event)
                self.dispatch("server.*", event)
    
        async def send(self, event_name, data=None):
            if not self.is_connected():
                raise Exception("RealtimeAPI is not connected")
            data = data or {}
            if not isinstance(data, dict):
                raise Exception("data must be a dictionary")
            event = {
                "event_id": self._generate_id("evt_"),
                "type": event_name,
                **data
            }
            self.dispatch(f"client.{event_name}", event)
            self.dispatch("client.*", event)
            self.log("sent:", event)
            await self.ws.send(json.dumps(event))
    
        def _generate_id(self, prefix):
            return f"{prefix}{int(datetime.utcnow().timestamp() * 1000)}"
    
        async def disconnect(self):
            if self.ws:
                await self.ws.close()
                self.ws = None
                self.log(f"Disconnected from {self.url}")

    reference: init.py
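
    As a quick usage sketch of the class above (illustrative only; it assumes the environment variables from the .env file and a running event loop):

    async def demo():
        api = RealtimeAPI()
        # Subscribe to a server event by name; handlers receive the decoded event dict.
        api.on("server.response.audio_transcript.delta",
               lambda event: print(event["delta"], end=""))
        await api.connect()
        await api.send("response.create")

    # asyncio.run(demo())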

    • RealtimeConversation class:
      • Manages conversation state.
      • Handles different types of events, including item creation, transcription completion, and audio streaming.
      • Queues and formats audio and text data for seamless interaction.

    class RealtimeConversation:
        default_frequency = config.features.audio.sample_rate
        
        EventProcessors = {
            'conversation.item.created': lambda self, event: self._process_item_created(event),
            'conversation.item.truncated': lambda self, event: self._process_item_truncated(event),
            'conversation.item.deleted': lambda self, event: self._process_item_deleted(event),
            'conversation.item.input_audio_transcription.completed': lambda self, event: self._process_input_audio_transcription_completed(event),
            'input_audio_buffer.speech_started': lambda self, event: self._process_speech_started(event),
            'input_audio_buffer.speech_stopped': lambda self, event, input_audio_buffer: self._process_speech_stopped(event, input_audio_buffer),
            'response.created': lambda self, event: self._process_response_created(event),
            'response.output_item.added': lambda self, event: self._process_output_item_added(event),
            'response.output_item.done': lambda self, event: self._process_output_item_done(event),
            'response.content_part.added': lambda self, event: self._process_content_part_added(event),
            'response.audio_transcript.delta': lambda self, event: self._process_audio_transcript_delta(event),
            'response.audio.delta': lambda self, event: self._process_audio_delta(event),
            'response.text.delta': lambda self, event: self._process_text_delta(event),
            'response.function_call_arguments.delta': lambda self, event: self._process_function_call_arguments_delta(event),
        }
        
        def __init__(self):
            self.clear()
    
        def clear(self):
            self.item_lookup = {}
            self.items = []
            self.response_lookup = {}
            self.responses = []
            self.queued_speech_items = {}
            self.queued_transcript_items = {}
            self.queued_input_audio = None
    
        def queue_input_audio(self, input_audio):
            self.queued_input_audio = input_audio
    
        def process_event(self, event, *args):
            event_processor = self.EventProcessors.get(event['type'])
            if not event_processor:
                raise Exception(f"Missing conversation event processor for {event['type']}")
            return event_processor(self, event, *args)
    
        def get_item(self, id):
            return self.item_lookup.get(id)
    
        def get_items(self):
            return self.items[:]
    
        def _process_item_created(self, event):
            item = event['item']
            new_item = item.copy()
            if new_item['id'] not in self.item_lookup:
                self.item_lookup[new_item['id']] = new_item
                self.items.append(new_item)
            new_item['formatted'] = {
                'audio': [],
                'text': '',
                'transcript': ''
            }
            if new_item['id'] in self.queued_speech_items:
                new_item['formatted']['audio'] = self.queued_speech_items[new_item['id']]['audio']
                del self.queued_speech_items[new_item['id']]
            if 'content' in new_item:
                text_content = [c for c in new_item['content'] if c['type'] in ['text', 'input_text']]
                for content in text_content:
                    new_item['formatted']['text'] += content['text']
            if new_item['id'] in self.queued_transcript_items:
                new_item['formatted']['transcript'] = self.queued_transcript_items[new_item['id']]['transcript']
                del self.queued_transcript_items[new_item['id']]
            if new_item['type'] == 'message':
                if new_item['role'] == 'user':
                    new_item['status'] = 'completed'
                    if self.queued_input_audio:
                        new_item['formatted']['audio'] = self.queued_input_audio
                        self.queued_input_audio = None
                else:
                    new_item['status'] = 'in_progress'
            elif new_item['type'] == 'function_call':
                new_item['formatted']['tool'] = {
                    'type': 'function',
                    'name': new_item['name'],
                    'call_id': new_item['call_id'],
                    'arguments': ''
                }
                new_item['status'] = 'in_progress'
            elif new_item['type'] == 'function_call_output':
                new_item['status'] = 'completed'
                new_item['formatted']['output'] = new_item['output']
            return new_item, None
    
        def _process_item_truncated(self, event):
            item_id = event['item_id']
            audio_end_ms = event['audio_end_ms']
            item = self.item_lookup.get(item_id)
            if not item:
                raise Exception(f'item.truncated: Item "{item_id}" not found')
            end_index = (audio_end_ms * self.default_frequency) // 1000
            item['formatted']['transcript'] = ''
            item['formatted']['audio'] = item['formatted']['audio'][:end_index]
            return item, None
    
        def _process_item_deleted(self, event):
            item_id = event['item_id']
            item = self.item_lookup.get(item_id)
            if not item:
                raise Exception(f'item.deleted: Item "{item_id}" not found')
            del self.item_lookup[item['id']]
            self.items.remove(item)
            return item, None
    
        def _process_input_audio_transcription_completed(self, event):
            item_id = event['item_id']
            content_index = event['content_index']
            transcript = event['transcript']
            formatted_transcript = transcript or ' '
            item = self.item_lookup.get(item_id)
            if not item:
                self.queued_transcript_items[item_id] = {'transcript': formatted_transcript}
                return None, None
            item['content'][content_index]['transcript'] = transcript
            item['formatted']['transcript'] = formatted_transcript
            return item, {'transcript': transcript}
    
        def _process_speech_started(self, event):
            item_id = event['item_id']
            audio_start_ms = event['audio_start_ms']
            self.queued_speech_items[item_id] = {'audio_start_ms': audio_start_ms}
            return None, None
    
        def _process_speech_stopped(self, event, input_audio_buffer):
            item_id = event['item_id']
            audio_end_ms = event['audio_end_ms']
            speech = self.queued_speech_items[item_id]
            speech['audio_end_ms'] = audio_end_ms
            if input_audio_buffer:
                start_index = (speech['audio_start_ms'] * self.default_frequency) // 1000
                end_index = (speech['audio_end_ms'] * self.default_frequency) // 1000
                speech['audio'] = input_audio_buffer[start_index:end_index]
            return None, None
    
        def _process_response_created(self, event):
            response = event['response']
            if response['id'] not in self.response_lookup:
                self.response_lookup[response['id']] = response
                self.responses.append(response)
            return None, None
    
        def _process_output_item_added(self, event):
            response_id = event['response_id']
            item = event['item']
            response = self.response_lookup.get(response_id)
            if not response:
                raise Exception(f'response.output_item.added: Response "{response_id}" not found')
            response['output'].append(item['id'])
            return None, None
    
        def _process_output_item_done(self, event):
            item = event['item']
            if not item:
                raise Exception('response.output_item.done: Missing "item"')
            found_item = self.item_lookup.get(item['id'])
            if not found_item:
                raise Exception(f'response.output_item.done: Item "{item["id"]}" not found')
            found_item['status'] = item['status']
            return found_item, None
    
        def _process_content_part_added(self, event):
            item_id = event['item_id']
            part = event['part']
            item = self.item_lookup.get(item_id)
            if not item:
                raise Exception(f'response.content_part.added: Item "{item_id}" not found')
            item['content'].append(part)
            return item, None
    
        def _process_audio_transcript_delta(self, event):
            item_id = event['item_id']
            content_index = event['content_index']
            delta = event['delta']
            item = self.item_lookup.get(item_id)
            if not item:
                raise Exception(f'response.audio_transcript.delta: Item "{item_id}" not found')
            item['content'][content_index]['transcript'] += delta
            item['formatted']['transcript'] += delta
            return item, {'transcript': delta}
    
        def _process_audio_delta(self, event):
            item_id = event['item_id']
            content_index = event['content_index']
            delta = event['delta']
            item = self.item_lookup.get(item_id)
            if not item:
                logger.debug(f'response.audio.delta: Item "{item_id}" not found')
                return None, None
            array_buffer = base64_to_array_buffer(delta)
            append_values = array_buffer.tobytes()
            # TODO: make it work
            # item['formatted']['audio'] = merge_int16_arrays(item['formatted']['audio'], append_values)
            return item, {'audio': append_values}
    
        def _process_text_delta(self, event):
            item_id = event['item_id']
            content_index = event['content_index']
            delta = event['delta']
            item = self.item_lookup.get(item_id)
            if not item:
                raise Exception(f'response.text.delta: Item "{item_id}" not found')
            item['content'][content_index]['text'] += delta
            item['formatted']['text'] += delta
            return item, {'text': delta}
    
        def _process_function_call_arguments_delta(self, event):
            item_id = event['item_id']
            delta = event['delta']
            item = self.item_lookup.get(item_id)
            if not item:
                raise Exception(f'response.function_call_arguments.delta: Item "{item_id}" not found')
            item['arguments'] += delta
            item['formatted']['tool']['arguments'] += delta
            return item, {'arguments': delta}
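
    In isolation, the class can be exercised by replaying server events through process_event; a small illustrative sketch:

    conversation = RealtimeConversation()
    # Feed a server event exactly as it would arrive over the WebSocket.
    item, delta = conversation.process_event({
        "type": "conversation.item.created",
        "item": {"id": "item_001", "type": "message", "role": "user", "content": []},
    })
    print(item["status"])                 # user messages are marked "completed" immediately
    print(len(conversation.get_items()))  # 1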

    • RealtimeClient class:
      • Initialization: Sets the system prompt and session configuration, and initializes RealtimeAPI and RealtimeConversation to manage the WebSocket connection and conversation events.
      • Connection management: Handles connecting to and disconnecting from the server, waiting for session creation, and updating session settings.
      • Event processing: Receives server and client events, processes them, and passes them on to the appropriate handlers.
      • Conversation management: Manages the creation, update, and deletion of conversation items, including handling input audio and speech events.
      • Tool and response management: Supports adding/removing tools, event-based tool calls, sending user messages, generating responses, and managing audio content.

    class RealtimeClient(RealtimeEventHandler):
        def __init__(self, system_prompt: str):
            super().__init__()
            self.system_prompt = system_prompt
            self.default_session_config = {
                "modalities": ["text", "audio"],
                "instructions": self.system_prompt,
                "voice": "shimmer",
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "input_audio_transcription": { "model": 'whisper-1' },
                "turn_detection": { "type": 'server_vad' },
                "tools": [],
                "tool_choice": "auto",
                "temperature": 0.8,
                "max_response_output_tokens": 4096,
            }
            self.session_config = {}
            self.transcription_models = [{"model": "whisper-1"}]
            self.default_server_vad_config = {
                "type": "server_vad",
                "threshold": 0.5,
                "prefix_padding_ms": 300,
                "silence_duration_ms": 200,
            }
            self.realtime = RealtimeAPI()
            self.conversation = RealtimeConversation()
            self._reset_config()
            self._add_api_event_handlers()
            
        def _reset_config(self):
            self.session_created = False
            self.tools = {}
            self.session_config = self.default_session_config.copy()
            self.input_audio_buffer = bytearray()
            return True
    
        def _add_api_event_handlers(self):
            self.realtime.on("client.*", self._log_event)
            self.realtime.on("server.*", self._log_event)
            self.realtime.on("server.session.created", self._on_session_created)
            self.realtime.on("server.response.created", self._process_event)
            self.realtime.on("server.response.output_item.added", self._process_event)
            self.realtime.on("server.response.content_part.added", self._process_event)
            self.realtime.on("server.input_audio_buffer.speech_started", self._on_speech_started)
            self.realtime.on("server.input_audio_buffer.speech_stopped", self._on_speech_stopped)
            self.realtime.on("server.conversation.item.created", self._on_item_created)
            self.realtime.on("server.conversation.item.truncated", self._process_event)
            self.realtime.on("server.conversation.item.deleted", self._process_event)
            self.realtime.on("server.conversation.item.input_audio_transcription.completed", self._process_event)
            self.realtime.on("server.response.audio_transcript.delta", self._process_event)
            self.realtime.on("server.response.audio.delta", self._process_event)
            self.realtime.on("server.response.text.delta", self._process_event)
            self.realtime.on("server.response.function_call_arguments.delta", self._process_event)
            self.realtime.on("server.response.output_item.done", self._on_output_item_done)
    
        def _log_event(self, event):
            realtime_event = {
                "time": datetime.utcnow().isoformat(),
                "source": "client" if event["type"].startswith("client.") else "server",
                "event": event,
            }
            self.dispatch("realtime.event", realtime_event)
    
        def _on_session_created(self, event):
            self.session_created = True
    
        def _process_event(self, event, *args):
            item, delta = self.conversation.process_event(event, *args)
            if item:
                self.dispatch("conversation.updated", {"item": item, "delta": delta})
            return item, delta
    
        def _on_speech_started(self, event):
            self._process_event(event)
            self.dispatch("conversation.interrupted", event)
    
        def _on_speech_stopped(self, event):
            self._process_event(event, self.input_audio_buffer)
    
        def _on_item_created(self, event):
            item, delta = self._process_event(event)
            self.dispatch("conversation.item.appended", {"item": item})
            if item and item["status"] == "completed":
                self.dispatch("conversation.item.completed", {"item": item})
    
        async def _on_output_item_done(self, event):
            item, delta = self._process_event(event)
            if item and item["status"] == "completed":
                self.dispatch("conversation.item.completed", {"item": item})
            if item and item.get("formatted", {}).get("tool"):
                await self._call_tool(item["formatted"]["tool"])
    
        async def _call_tool(self, tool):
            try:
                print(tool["arguments"])
                json_arguments = json.loads(tool["arguments"])
                tool_config = self.tools.get(tool["name"])
                if not tool_config:
                    raise Exception(f'Tool "{tool["name"]}" has not been added')
                result = await tool_config["handler"](**json_arguments)
                await self.realtime.send("conversation.item.create", {
                    "item": {
                        "type": "function_call_output",
                        "call_id": tool["call_id"],
                        "output": json.dumps(result),
                    }
                })
            except Exception as e:
                logger.error(traceback.format_exc())
                await self.realtime.send("conversation.item.create", {
                    "item": {
                        "type": "function_call_output",
                        "call_id": tool["call_id"],
                        "output": json.dumps({"error": str(e)}),
                    }
                })
            await self.create_response()
    
        def is_connected(self):
            return self.realtime.is_connected()
    
        def reset(self):
            self.disconnect()
            self.realtime.clear_event_handlers()
            self._reset_config()
            self._add_api_event_handlers()
            return True
    
        async def connect(self):
            if self.is_connected():
                raise Exception("Already connected, use .disconnect() first")
            await self.realtime.connect()
            await self.update_session()
            return True
    
        async def wait_for_session_created(self):
            if not self.is_connected():
                raise Exception("Not connected, use .connect() first")
            while not self.session_created:
                await asyncio.sleep(0.001)
            return True
    
        async def disconnect(self):
            self.session_created = False
            self.conversation.clear()
            if self.realtime.is_connected():
                await self.realtime.disconnect()
    
        def get_turn_detection_type(self):
            return self.session_config.get("turn_detection", {}).get("type")
    
        async def add_tool(self, definition, handler):
            if not definition.get("name"):
                raise Exception("Missing tool name in definition")
            name = definition["name"]
            if name in self.tools:
                raise Exception(f'Tool "{name}" already added. Please use .removeTool("{name}") before trying to add again.')
            if not callable(handler):
                raise Exception(f'Tool "{name}" handler must be a function')
            self.tools[name] = {"definition": definition, "handler": handler}
            await self.update_session()
            return self.tools[name]
    
        def remove_tool(self, name):
            if name not in self.tools:
                raise Exception(f'Tool "{name}" does not exist, can not be removed.')
            del self.tools[name]
            return True
    
        async def delete_item(self, id):
            await self.realtime.send("conversation.item.delete", {"item_id": id})
            return True
    
        async def update_session(self, **kwargs):
            self.session_config.update(kwargs)
            use_tools = [
                {**tool_definition, "type": "function"}
                for tool_definition in self.session_config.get("tools", [])
            ] + [
                {**self.tools[key]["definition"], "type": "function"}
                for key in self.tools
            ]
            session = {**self.session_config, "tools": use_tools}
            if self.realtime.is_connected():
                await self.realtime.send("session.update", {"session": session})
            return True
        
        async def create_conversation_item(self, item):
            await self.realtime.send("conversation.item.create", {
                "item": item
            })
    
        async def send_user_message_content(self, content=[]):
            if content:
                for c in content:
                    if c["type"] == "input_audio":
                        if isinstance(c["audio"], (bytes, bytearray)):
                            c["audio"] = array_buffer_to_base64(c["audio"])
                await self.realtime.send("conversation.item.create", {
                    "item": {
                        "type": "message",
                        "role": "user",
                        "content": content,
                    }
                })
            await self.create_response()
            return True
    
        async def append_input_audio(self, array_buffer):
            if len(array_buffer) > 0:
                await self.realtime.send("input_audio_buffer.append", {
                    "audio": array_buffer_to_base64(np.array(array_buffer)),
                })
                self.input_audio_buffer.extend(array_buffer)
            return True
    
        async def create_response(self):
            if self.get_turn_detection_type() is None and len(self.input_audio_buffer) > 0:
                await self.realtime.send("input_audio_buffer.commit")
                self.conversation.queue_input_audio(self.input_audio_buffer)
                self.input_audio_buffer = bytearray()
            await self.realtime.send("response.create")
            return True
    
        async def cancel_response(self, id=None, sample_count=0):
            if not id:
                await self.realtime.send("response.cancel")
                return {"item": None}
            else:
                item = self.conversation.get_item(id)
                if not item:
                    raise Exception(f'Could not find item "{id}"')
                if item["type"] != "message":
                    raise Exception('Can only cancelResponse messages with type "message"')
                if item["role"] != "assistant":
                    raise Exception('Can only cancelResponse messages with role "assistant"')
                await self.realtime.send("response.cancel")
                audio_index = next((i for i, c in enumerate(item["content"]) if c["type"] == "audio"), -1)
                if audio_index == -1:
                    raise Exception("Could not find audio on item to cancel")
                await self.realtime.send("conversation.item.truncate", {
                    "item_id": id,
                    "content_index": audio_index,
                    "audio_end_ms": int((sample_count / self.conversation.default_frequency) * 1000),
                })
                return {"item": item}
    
        async def wait_for_next_item(self):
            event = await self.wait_for_next("conversation.item.appended")
            return {"item": event["item"]}
    
        async def wait_for_next_completed_item(self):
            event = await self.wait_for_next("conversation.item.completed")
            return {"item": event["item"]}

    Add tools and handlers

    You can extend the functionality of your voice bot by integrating various tools and handlers. This allows the bot to perform specific actions based on user input.

    1. Tool definition:
      • In tool.py, define the bot's functionality, such as checking order status, processing returns, and updating account information.
      • Each tool includes a name, description, and required parameters.
    2. Handler implementation:
      • Create an asynchronous handler function for each tool to execute the desired operation.
      • These handlers interact with backend systems or databases to fulfill user requests.
    3. Tool integration with the realtime client:
      • Register each tool and its handler with RealtimeClient in app.py (a registration sketch follows the handler example below).
      • Make sure your bot can call these tools dynamically during the conversation.

    Main components:

    • Tool definition:
      • A structured description of each tool, including required parameters and features.

    Example:

    # Function Definitions
    check_order_status_def = {
        "name": "check_order_status",
        "description": "Check the status of a customer's order",
        "parameters": {
          "type": "object",
          "properties": {
            "customer_id": {
              "type": "string",
              "description": "The unique identifier for the customer"
            },
            "order_id": {
              "type": "string",
              "description": "The unique identifier for the order"
            }
          },
          "required": ["customer_id", "order_id"]
        }
    }

    • Handler function:
      • An asynchronous function that executes the logic for each tool.
      • Interacts with external systems or databases, or performs specific tasks on the user's behalf.

    Example:

    async def check_order_status_handler(customer_id, order_id):
        status = "In Transit"
        
        # Your Business Logic
        estimated_delivery, status, order_date =  fetch_order_details(order_id, customer_id)
        # Read the HTML template
        with open('order_status_template.html', 'r') as file:
            html_content = file.read()
    
        # Replace placeholders with actual data
        html_content = html_content.format(
            order_id=order_id,
            customer_id=customer_id,
            order_date=order_date.strftime("%B %d, %Y"),
            estimated_delivery=estimated_delivery.strftime("%B %d, %Y"),
            status=status
        )
    
        # Return the Chainlit message with HTML content
        await cl.Message(content=f"Here is the detail of your order \n {html_content}").send()
        return f"Order {order_id} status for customer {customer_id}: {status}"
      

    reference: tool.py

    Integration with applications

    Now that you have the Realtime Client and tools in place, it’s time to integrate everything into your application.

    1. OpenAI realtime initialization:
      • Establish a connection to the GPT-4o Realtime API using a system prompt and session configuration in app.py.
      • Manage user sessions and track interactions seamlessly.
    2. Handling user interaction:
      • Implement event handlers for starting a chat, receiving messages, processing audio, and ending the session.
      • Ensure that user input, whether text or voice, is processed and responded to in real time.
    3. Conversation flow management:
      • Leverage the RealtimeConversation class to handle conversation state, manage audio streams, and maintain context.
      • Implement logic to handle interruptions, cancellations, and dynamic responses based on user actions.

    Main components:

    • Initialization:
      • Sets up the OpenAI realtime client and configures the tools and system prompt.

    system_prompt = """Provide helpful and empathetic support responses to customer inquiries for ShopMe in Hindi language, addressing their requests, concerns, or feedback professionally.
    
    Maintain a friendly and service-oriented tone throughout the interaction to ensure a positive customer experience.
    
    # Steps
    
    1. **Identify the Issue:** Carefully read the customer's inquiry to understand the problem or question they are presenting.
    2. **Gather Relevant Information:** Check for any additional data needed, such as order numbers or account details, while ensuring the privacy and security of the customer's information.
    3. **Formulate a Response:** Develop a solution or informative response based on the understanding of the issue. The response should be clear, concise, and address all parts of the customer's concern.
    4. **Offer Further Assistance:** Invite the customer to reach out again if they need more help or have additional questions.
    5. **Close Politely:** End the conversation with a polite closing statement that reinforces the service commitment of ShopMe.
    
    # Output Format
    
    Provide a clear and concise paragraph addressing the customer's inquiry, including:
    - Acknowledgment of their concern
    - Suggested solution or response
    - Offer for further assistance
    - Polite closing
    
    # Notes
    - Greet user with Welcome to ShopMe For the first time only
    - always speak in Hindi
    - Ensure all customer data is handled according to relevant privacy and data protection laws and ShopMe's privacy policy.
    - In cases of high sensitivity or complexity, escalate the issue to a human customer support agent.
    - Keep responses within a reasonable length to ensure they are easy to read and understand."""

    • Event handler:
      • Manages chat start, message reception, audio streaming, and session end events.

    First, let’s initialize the real-time client we discussed earlier.

    async def setup_openai_realtime(system_prompt: str):
        """Instantiate and configure the OpenAI Realtime Client"""
        openai_realtime = RealtimeClient(system_prompt = system_prompt)
        cl.user_session.set("track_id", str(uuid4()))
        async def handle_conversation_updated(event):
            item = event.get("item")
            delta = event.get("delta")
            """Currently used to stream audio back to the client."""
            if delta:
                # Only one of the following will be populated for any given event
                if 'audio' in delta:
                    audio = delta['audio']  # Int16Array, audio added
                    await cl.context.emitter.send_audio_chunk(cl.OutputAudioChunk(mimeType="pcm16", data=audio, track=cl.user_session.get("track_id")))
                if 'transcript' in delta:
                    transcript = delta['transcript']  # string, transcript added
                    pass
                if 'arguments' in delta:
                    arguments = delta['arguments']  # string, function arguments added
                    pass
                
        async def handle_item_completed(item):
            """Used to populate the chat context with transcription once an item is completed."""
            # print(item) # TODO
            pass
        
        async def handle_conversation_interrupt(event):
            """Used to cancel the client previous audio playback."""
            cl.user_session.set("track_id", str(uuid4()))
            await cl.context.emitter.send_audio_interrupt()
            
        async def handle_error(event):
            logger.error(event)
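
        # Hedged completion: the excerpt above defines the handlers but does not
        # show their registration. These lines wire them to the events that
        # RealtimeClient dispatches (names match the dispatch() calls shown
        # earlier); storing the client in the user session is an assumption
        # based on how the audio-chunk snippet below retrieves it.
        openai_realtime.on("conversation.updated", handle_conversation_updated)
        openai_realtime.on("conversation.item.completed", handle_item_completed)
        openai_realtime.on("conversation.interrupted", handle_conversation_interrupt)
        # handle_error would be wired the same way once errors are re-dispatched.
        cl.user_session.set("openai_realtime", openai_realtime)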

    • Session management:
      • Maintains user sessions, handles conversation interruptions, and ensures a smooth flow of interactions. As the code below shows, whenever the application receives an audio chunk, it forwards that chunk to the realtime client.

    # A hedged sketch of the surrounding chainlit handler (the decorator and
    # session lookup are assumptions based on the reference app.py):
    @cl.on_audio_chunk
    async def on_audio_chunk(chunk: cl.InputAudioChunk):
        openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
        if openai_realtime:
            if openai_realtime.is_connected():
                await openai_realtime.append_input_audio(chunk.data)
            else:
                logger.info("RealtimeClient is not connected")

    reference: app.py

    Testing and Deployment

    Once a voice bot is built, thorough testing is essential to ensure reliability and user satisfaction.

    1. Local testing:
      • Interact with your deployed model using the AI Studio real-time audio playground.
      • Test a variety of features, including speech recognition, response generation, and tool execution.
    2. Integration testing:
      • Make sure your application communicates correctly with the Realtime API.
      • Test your event handlers and tool integrations to ensure correct behavior across scenarios.
    3. Deployment:
      • Deploy the application to a production environment, leveraging cloud services for scalability.
      • Monitor performance and adjust as needed to handle real-world usage.

    Conclusion

    Building real-time voice bots is easier than ever thanks to the GPT-4o Realtime API. By consolidating voice-to-voice capabilities into a single, efficient interface, developers can create engaging, natural conversation experiences without the complexity of managing multiple models. Whether you’re enhancing customer support, developing training tools, or creating conversational applications, the GPT-4o Realtime API provides a powerful foundation to make your voice bot vision a reality.

    Start your development journey today and explore the endless possibilities that real-time voice interaction can offer your users!

    For more information about setup, deployment, and advanced configuration, please refer to the Azure OpenAI GPT-4o Realtime API documentation.

    Thank you!

    Manoranjan Rajguru

    https://www.linkedin.com/in/manoranjan-rajguru/




