# =========== Copyright 2023 @ CAMEL-AI.org. All Rights Reserved. =========== # Licensed under the Apache License, Version 2.0 (the “License”); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an “AS IS” BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # =========== Copyright 2023 @ CAMEL-AI.org. All Rights Reserved. =========== from abc import ABC, abstractmethod from typing import Any, Dict import openai import tiktoken from camel.typing import ModelType from chatdev.statistics import prompt_cost from chatdev.utils import log_visualize try: from openai.types.chat import ChatCompletion openai_new_api = True # new openai api version except ImportError: openai_new_api = False # old openai api version import os OPENAI_API_KEY = os.environ['OPENAI_API_KEY'] if 'BASE_URL' in os.environ: BASE_URL = os.environ['BASE_URL'] else: BASE_URL = None class ModelBackend(ABC): r"""Base class for different model backends. May be OpenAI API, a local LLM, a stub for unit tests, etc.""" @abstractmethod def run(self, *args, **kwargs): r"""Runs the query to the backend model. Raises: RuntimeError: if the return value from OpenAI API is not a dict that is expected. Returns: Dict[str, Any]: All backends must return a dict in OpenAI format. """ pass class OpenAIModel(ModelBackend): r"""OpenAI API in a unified ModelBackend interface.""" def __init__(self, model_type: ModelType, model_config_dict: Dict) -> None: super().__init__() self.model_type = model_type self.model_config_dict = model_config_dict def run(self, *args, **kwargs): string = "\n".join([message["content"] for message in kwargs["messages"]]) encoding = tiktoken.encoding_for_model(self.model_type.value) num_prompt_tokens = len(encoding.encode(string)) gap_between_send_receive = 15 * len(kwargs["messages"]) num_prompt_tokens += gap_between_send_receive if openai_new_api: # Experimental, add base_url if BASE_URL: client = openai.OpenAI( api_key=OPENAI_API_KEY, base_url=BASE_URL, ) else: client = openai.OpenAI( api_key=OPENAI_API_KEY ) num_max_token_map = { "gpt-3.5-turbo": 4096, "gpt-3.5-turbo-16k": 16384, "gpt-3.5-turbo-0613": 4096, "gpt-3.5-turbo-16k-0613": 16384, "gpt-4": 8192, "gpt-4-0613": 8192, "gpt-4-32k": 32768, "gpt-4-1106-preview": 4096, "gpt-4-1106-vision-preview": 4096, } num_max_token = num_max_token_map[self.model_type.value] num_max_completion_tokens = num_max_token - num_prompt_tokens self.model_config_dict['max_tokens'] = num_max_completion_tokens response = client.chat.completions.create(*args, **kwargs, model=self.model_type.value, **self.model_config_dict) cost = prompt_cost( self.model_type.value, num_prompt_tokens=response.usage.prompt_tokens, num_completion_tokens=response.usage.completion_tokens ) log_visualize( "**[OpenAI_Usage_Info Receive]**\nprompt_tokens: {}\ncompletion_tokens: {}\ntotal_tokens: {}\ncost: ${:.6f}\n".format( response.usage.prompt_tokens, response.usage.completion_tokens, response.usage.total_tokens, cost)) if not isinstance(response, ChatCompletion): raise RuntimeError("Unexpected return from OpenAI API") return response else: num_max_token_map = { "gpt-3.5-turbo": 4096, "gpt-3.5-turbo-16k": 16384, "gpt-3.5-turbo-0613": 4096, "gpt-3.5-turbo-16k-0613": 16384, "gpt-4": 8192, "gpt-4-0613": 8192, "gpt-4-32k": 32768, } num_max_token = num_max_token_map[self.model_type.value] num_max_completion_tokens = num_max_token - num_prompt_tokens self.model_config_dict['max_tokens'] = num_max_completion_tokens response = openai.ChatCompletion.create(*args, **kwargs, model=self.model_type.value, **self.model_config_dict) cost = prompt_cost( self.model_type.value, num_prompt_tokens=response["usage"]["prompt_tokens"], num_completion_tokens=response["usage"]["completion_tokens"] ) log_visualize( "**[OpenAI_Usage_Info Receive]**\nprompt_tokens: {}\ncompletion_tokens: {}\ntotal_tokens: {}\ncost: ${:.6f}\n".format( response["usage"]["prompt_tokens"], response["usage"]["completion_tokens"], response["usage"]["total_tokens"], cost)) if not isinstance(response, Dict): raise RuntimeError("Unexpected return from OpenAI API") return response class StubModel(ModelBackend): r"""A dummy model used for unit tests.""" def __init__(self, *args, **kwargs) -> None: super().__init__() def run(self, *args, **kwargs) -> Dict[str, Any]: ARBITRARY_STRING = "Lorem Ipsum" return dict( id="stub_model_id", usage=dict(), choices=[ dict(finish_reason="stop", message=dict(content=ARBITRARY_STRING, role="assistant")) ], ) class ModelFactory: r"""Factory of backend models. Raises: ValueError: in case the provided model type is unknown. """ @staticmethod def create(model_type: ModelType, model_config_dict: Dict) -> ModelBackend: default_model_type = ModelType.GPT_3_5_TURBO if model_type in { ModelType.GPT_3_5_TURBO, ModelType.GPT_3_5_TURBO_NEW, ModelType.GPT_4, ModelType.GPT_4_32k, ModelType.GPT_4_TURBO, ModelType.GPT_4_TURBO_V, None }: model_class = OpenAIModel elif model_type == ModelType.STUB: model_class = StubModel else: raise ValueError("Unknown model") if model_type is None: model_type = default_model_type # log_visualize("Model Type: {}".format(model_type)) inst = model_class(model_type, model_config_dict) return inst