From 90e2e98f420b7b213ff16c1376a2905f10addb5f Mon Sep 17 00:00:00 2001 From: Schark Date: Tue, 5 Dec 2023 14:33:42 -0800 Subject: Moving API key location to 'key', file restructure --- .gitignore | 2 +- chat.py | 80 --------------------------- help.py | 178 ----------------------------------------------------------- src/chat.py | 80 +++++++++++++++++++++++++++ src/help.py | 178 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/style.py | 57 +++++++++++++++++++ style.py | 57 ------------------- 7 files changed, 316 insertions(+), 316 deletions(-) delete mode 100644 chat.py delete mode 100644 help.py create mode 100644 src/chat.py create mode 100644 src/help.py create mode 100644 src/style.py delete mode 100644 style.py diff --git a/.gitignore b/.gitignore index 59fe332..feba61d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ .venv/ __pycache__/ *.json -token +key diff --git a/chat.py b/chat.py deleted file mode 100644 index 1221d8e..0000000 --- a/chat.py +++ /dev/null @@ -1,80 +0,0 @@ -import queue -import sys -import os -import threading -import time - -import curses -from openai import OpenAI - -from help import HelpCommands, start_chat -from style import StyleLog - -# Read in token from "token" file -# TODO: env variable in future? -with open("token", "r") as file: - token = file.readlines() -client = OpenAI(api_key=token[0].strip()) - -def text_call(api_call_queue, messages, model): - response = client.chat.completions.create( - model=model, - messages=messages - ) - api_call_queue.put(response.choices[0].message.content) - -def image_call(api_call_queue, messages, model): - response = client.chat.completions.create( - model=model, - messages=messages - ) - api_call_queue.put(response.choices[0].message.content) - -def main(): - - model = "" - if len(sys.argv) > 1: - model = sys.argv[1] - else: - model = "gpt-3.5-turbo" - - helper = HelpCommands(model) - styler = StyleLog() - messages = start_chat(model, styler) - - while True: - - user_input = styler.prompt("user", "") - - status, messages, model = helper.command(user_input, messages, model, styler) - if status == 1: - break - elif status == 2: - continue - - global api_call_done - api_call_done = threading.Event() - api_call_queue = queue.Queue() - - if model == "dall-e-2" or model == "dall-e-3": - response_thread = threading.Thread(target=image_call, args=(api_call_queue, messages, model,)) - else: - response_thread = threading.Thread(target=text_call, args=(api_call_queue, messages, model,)) - response_thread.start() - - ellipsis_thread = threading.Thread(target=styler.show_ellipsis, args=(api_call_done,)) - ellipsis_thread.start() - - response_thread.join() - api_call_done.set() - ellipsis_thread.join() - - ai_response = api_call_queue.get() - messages.append({"role": "assistant", "content": ai_response}) - styler.prompt("assistant", f"{ai_response}\n") - - # TODO: Add some form of token check, as to not overflow - - -if __name__ == "__main__": - main() diff --git a/help.py b/help.py deleted file mode 100644 index 9170241..0000000 --- a/help.py +++ /dev/null @@ -1,178 +0,0 @@ -import json -import re -import subprocess -import sys -from pathlib import Path - -import tiktoken - -# TODO: I don't love the way this file is structured from an extensibility perspective. -# Find a way to maybe like the 'options' dict with the command list? - -def start_chat(model: str, styler): - sys.stdout.write('\033[2J\033[H') - styler.prompt("none", "") - styler.prompt("system", "Welcome to cli-gpt. You may type your questions, or seek additional functionality via '/help'.") - styler.prompt("system", f"Currently using model '{model}'.") - return [{"role": "system", "content": "You are a helpful assistant."}] - -def get_token_count(messages: list, model: str): - total_tokens: int = 0 - encoding = tiktoken.encoding_for_model(model) - for message in messages: - if message.get("role") == "system": - continue - text = message.get("content", "") - message_tokens = len(encoding.encode(text)) - total_tokens += message_tokens - return total_tokens - -class HelpCommands: - - options: dict = { - "/exit": "Closes the chat.", - "/context": "Passthrough a URL to curl the context of into the chat history.", #TODO - "/help": "Display this list of available commands.", - "/load": "Load in a previous chat's JSON file.", - "/save": "Saves messages to specified JSON file and closes chat.", - "/clear": "Clears all messages and tokens from the chatlog, restarting the chat.", - "/model": "Change the model being used.", - "/info": "Print model information and cli-gpt version.", - "/write": "Write out any code from the previous message to a specified file.", - "/copy": "Copy code snippets from the previous message into the copy buffer.", - } - - text_models = [ - "gpt-3.5-turbo", - "gpt-4", - "gpt-4-32k", - "gpt-4-0613", - "gpt-4-32k-0613", - "gpt-4-32k-0613", - "gpt-4-1106-preview", - "gpt-4-vision-preview", - ] - - image_models = [ - "dall-e-2", - "dall-e-3", - ] - - model_type = None - - def __init__(self, model: str): - if model in self.text_models: - self.model_type = "text" - elif model in self.image_models: - self.model_type = "image" - else: - raise TypeError(f"System: Model '{model}' is not available. Please start again and re-specify model version, or leave blank.") - - - def command(self, user_input: str, messages: list, model: str, styler) -> (int, list, str): - if user_input.lower() in list(self.options.keys()): - user_input_lower = user_input.lower() - - if "/exit" in user_input_lower: - return 1, [None], "" - - if "/context" in user_input_lower: - styler.prompt("system", "Please provide the URL you would like to curl.") - url = input("URL: ") - curl_output = subprocess.check_output(f"curl {url}", shell=True).decode('utf-8').strip() - return [None, f"I would like to provide the following context from a curl command to '{url} to this chat: {curl_output}"] - - if "/help" in user_input_lower: - styler.prompt("system", "Below is a list of available commands.") - for key in list(self.options.keys()): - styler.prompt("none", f" - {key}: {self.options[key]}") - return 2, messages, model - - if "/load" in user_input_lower: - styler.prompt("system", "Please specify the filepath you would like to load in from, or '/cancel'.") - path = input("Path: ") - if path != "/cancel": - with open(Path(path), "r") as file: - messages = json.load(file) - styler.prompt("system", f"Successfully read in from {path}. Continuing chat.") - return 2, messages, model - - if "/save" in user_input_lower: - status, destination = self._save(messages) - if status == 1: - return 2, messages, model - styler.prompt("system", f"Successfully saved to {destination}. Closing chat.") - return 1, [None], "" - - if "/clear" in user_input_lower: - styler.prompt("system", "Clearing messages and restarting log.\n\n") - messages = start_chat(model, styler) - return 2, messages, model - - if "/info" in user_input_lower: - styler.prompt("system", f"This chatlog has used {get_token_count(messages, model)} token for model version '{model}'.") - styler.prompt("system", "Currently using cli-gpt version 0.0.1.") - return 2, messages, model - - if "/model" in user_input_lower: - styler.prompt("system", "Below is a list of available models. View up-to-date model information in the OpenAI API documentation.") - styler.prompt("none", "\n - Text Models") - for list_model in self.text_models: - styler.prompt("none", f" - {list_model}") - styler.prompt("none", "\n - Image Models") - for list_model in self.image_models: - styler.prompt("none", f" - {list_model}") - styler.prompt("system", "Change model version below, use '/list' to reprint available models, or '/cancel' to return to chat.") - new_model = input("\nModel: ") - while new_model not in self.text_models and new_model not in self.image_models: - if new_model == "/list": - styler.prompt("system", "Below is a list of available models. View up-to-date model information in the OpenAI API documentation.") - styler.prompt("none", "\n - Text Models") - for list_model in self.text_models: - styler.prompt("none", f" - {list_model}") - styler.prompt("none", "\n - Image Models") - for list_model in self.image_models: - styler.prompt("none", f" - {list_model}") - elif new_model == "/cancel": - return 2, messages, model - else: - print(f"System: '{new_model}' is not an accepted model. Use 'list' to output available models.") - new_model = input("\nModel: ") - - # "image" models and "text" models behave different, handle switching - if (self.model_type == "text" and new_model in self.image_models) or (self.model_type == "image" and new_model in self.text_models): - styler.prompt("system", "Switching between 'text' and 'image' models requires clearing the current message log. Would you like to save before switching models?") - user_save = input("Save? (y,N): ") - if user_save.lower() == "y": - self._save(messages) - return 2, messages, new_model - - if "/write" in user_input_lower: - pattern = r'```(.*?)```' - code_blocks = re.findall(pattern, messages[-1]['content'], re.DOTALL) - print(f"\nSystem: Found {len(code_blocks)} code examples.") - for block in code_blocks: - block = re.sub(r'^.*\n', '', block) - print(f"\n{block}") - print("\nSystem: Please specify the filepath you would like to load in from, or '/cancel'.") - path = input("Path: ") - if path == "/cancel": - return 2, messages, model - with open(Path(path), "w+") as file: - file.write(block) - print(f"System: Successfully saved code snippets. Continuing chat.") - return 2, messages, model - - - messages.append({"role": "user", "content": user_input}) - return 0, messages, model - - def _save(self, messages): - print("\nSystem: Please specify the filepath you would like to save to as a *.json file, or '/cancel'.") - url = input("Path: ") - if url == "/cancel": - return 1, None - with open(Path(url), "w+") as file: - json.dump(messages, file) - return 0, url - diff --git a/src/chat.py b/src/chat.py new file mode 100644 index 0000000..cfa9819 --- /dev/null +++ b/src/chat.py @@ -0,0 +1,80 @@ +import queue +import sys +import os +import threading +import time + +import curses +from openai import OpenAI + +from help import HelpCommands, start_chat +from style import StyleLog + +# Read in token from "token" file +# TODO: env variable in future? +with open("key", "r") as file: + token = file.readlines() +client = OpenAI(api_key=token[0].strip()) + +def text_call(api_call_queue, messages, model): + response = client.chat.completions.create( + model=model, + messages=messages + ) + api_call_queue.put(response.choices[0].message.content) + +def image_call(api_call_queue, messages, model): + response = client.chat.completions.create( + model=model, + messages=messages + ) + api_call_queue.put(response.choices[0].message.content) + +def main(): + + model = "" + if len(sys.argv) > 1: + model = sys.argv[1] + else: + model = "gpt-3.5-turbo" + + helper = HelpCommands(model) + styler = StyleLog() + messages = start_chat(model, styler) + + while True: + + user_input = styler.prompt("user", "") + + status, messages, model = helper.command(user_input, messages, model, styler) + if status == 1: + break + elif status == 2: + continue + + global api_call_done + api_call_done = threading.Event() + api_call_queue = queue.Queue() + + if model == "dall-e-2" or model == "dall-e-3": + response_thread = threading.Thread(target=image_call, args=(api_call_queue, messages, model,)) + else: + response_thread = threading.Thread(target=text_call, args=(api_call_queue, messages, model,)) + response_thread.start() + + ellipsis_thread = threading.Thread(target=styler.show_ellipsis, args=(api_call_done,)) + ellipsis_thread.start() + + response_thread.join() + api_call_done.set() + ellipsis_thread.join() + + ai_response = api_call_queue.get() + messages.append({"role": "assistant", "content": ai_response}) + styler.prompt("assistant", f"{ai_response}\n") + + # TODO: Add some form of token check, as to not overflow + + +if __name__ == "__main__": + main() diff --git a/src/help.py b/src/help.py new file mode 100644 index 0000000..9170241 --- /dev/null +++ b/src/help.py @@ -0,0 +1,178 @@ +import json +import re +import subprocess +import sys +from pathlib import Path + +import tiktoken + +# TODO: I don't love the way this file is structured from an extensibility perspective. +# Find a way to maybe like the 'options' dict with the command list? + +def start_chat(model: str, styler): + sys.stdout.write('\033[2J\033[H') + styler.prompt("none", "") + styler.prompt("system", "Welcome to cli-gpt. You may type your questions, or seek additional functionality via '/help'.") + styler.prompt("system", f"Currently using model '{model}'.") + return [{"role": "system", "content": "You are a helpful assistant."}] + +def get_token_count(messages: list, model: str): + total_tokens: int = 0 + encoding = tiktoken.encoding_for_model(model) + for message in messages: + if message.get("role") == "system": + continue + text = message.get("content", "") + message_tokens = len(encoding.encode(text)) + total_tokens += message_tokens + return total_tokens + +class HelpCommands: + + options: dict = { + "/exit": "Closes the chat.", + "/context": "Passthrough a URL to curl the context of into the chat history.", #TODO + "/help": "Display this list of available commands.", + "/load": "Load in a previous chat's JSON file.", + "/save": "Saves messages to specified JSON file and closes chat.", + "/clear": "Clears all messages and tokens from the chatlog, restarting the chat.", + "/model": "Change the model being used.", + "/info": "Print model information and cli-gpt version.", + "/write": "Write out any code from the previous message to a specified file.", + "/copy": "Copy code snippets from the previous message into the copy buffer.", + } + + text_models = [ + "gpt-3.5-turbo", + "gpt-4", + "gpt-4-32k", + "gpt-4-0613", + "gpt-4-32k-0613", + "gpt-4-32k-0613", + "gpt-4-1106-preview", + "gpt-4-vision-preview", + ] + + image_models = [ + "dall-e-2", + "dall-e-3", + ] + + model_type = None + + def __init__(self, model: str): + if model in self.text_models: + self.model_type = "text" + elif model in self.image_models: + self.model_type = "image" + else: + raise TypeError(f"System: Model '{model}' is not available. Please start again and re-specify model version, or leave blank.") + + + def command(self, user_input: str, messages: list, model: str, styler) -> (int, list, str): + if user_input.lower() in list(self.options.keys()): + user_input_lower = user_input.lower() + + if "/exit" in user_input_lower: + return 1, [None], "" + + if "/context" in user_input_lower: + styler.prompt("system", "Please provide the URL you would like to curl.") + url = input("URL: ") + curl_output = subprocess.check_output(f"curl {url}", shell=True).decode('utf-8').strip() + return [None, f"I would like to provide the following context from a curl command to '{url} to this chat: {curl_output}"] + + if "/help" in user_input_lower: + styler.prompt("system", "Below is a list of available commands.") + for key in list(self.options.keys()): + styler.prompt("none", f" - {key}: {self.options[key]}") + return 2, messages, model + + if "/load" in user_input_lower: + styler.prompt("system", "Please specify the filepath you would like to load in from, or '/cancel'.") + path = input("Path: ") + if path != "/cancel": + with open(Path(path), "r") as file: + messages = json.load(file) + styler.prompt("system", f"Successfully read in from {path}. Continuing chat.") + return 2, messages, model + + if "/save" in user_input_lower: + status, destination = self._save(messages) + if status == 1: + return 2, messages, model + styler.prompt("system", f"Successfully saved to {destination}. Closing chat.") + return 1, [None], "" + + if "/clear" in user_input_lower: + styler.prompt("system", "Clearing messages and restarting log.\n\n") + messages = start_chat(model, styler) + return 2, messages, model + + if "/info" in user_input_lower: + styler.prompt("system", f"This chatlog has used {get_token_count(messages, model)} token for model version '{model}'.") + styler.prompt("system", "Currently using cli-gpt version 0.0.1.") + return 2, messages, model + + if "/model" in user_input_lower: + styler.prompt("system", "Below is a list of available models. View up-to-date model information in the OpenAI API documentation.") + styler.prompt("none", "\n - Text Models") + for list_model in self.text_models: + styler.prompt("none", f" - {list_model}") + styler.prompt("none", "\n - Image Models") + for list_model in self.image_models: + styler.prompt("none", f" - {list_model}") + styler.prompt("system", "Change model version below, use '/list' to reprint available models, or '/cancel' to return to chat.") + new_model = input("\nModel: ") + while new_model not in self.text_models and new_model not in self.image_models: + if new_model == "/list": + styler.prompt("system", "Below is a list of available models. View up-to-date model information in the OpenAI API documentation.") + styler.prompt("none", "\n - Text Models") + for list_model in self.text_models: + styler.prompt("none", f" - {list_model}") + styler.prompt("none", "\n - Image Models") + for list_model in self.image_models: + styler.prompt("none", f" - {list_model}") + elif new_model == "/cancel": + return 2, messages, model + else: + print(f"System: '{new_model}' is not an accepted model. Use 'list' to output available models.") + new_model = input("\nModel: ") + + # "image" models and "text" models behave different, handle switching + if (self.model_type == "text" and new_model in self.image_models) or (self.model_type == "image" and new_model in self.text_models): + styler.prompt("system", "Switching between 'text' and 'image' models requires clearing the current message log. Would you like to save before switching models?") + user_save = input("Save? (y,N): ") + if user_save.lower() == "y": + self._save(messages) + return 2, messages, new_model + + if "/write" in user_input_lower: + pattern = r'```(.*?)```' + code_blocks = re.findall(pattern, messages[-1]['content'], re.DOTALL) + print(f"\nSystem: Found {len(code_blocks)} code examples.") + for block in code_blocks: + block = re.sub(r'^.*\n', '', block) + print(f"\n{block}") + print("\nSystem: Please specify the filepath you would like to load in from, or '/cancel'.") + path = input("Path: ") + if path == "/cancel": + return 2, messages, model + with open(Path(path), "w+") as file: + file.write(block) + print(f"System: Successfully saved code snippets. Continuing chat.") + return 2, messages, model + + + messages.append({"role": "user", "content": user_input}) + return 0, messages, model + + def _save(self, messages): + print("\nSystem: Please specify the filepath you would like to save to as a *.json file, or '/cancel'.") + url = input("Path: ") + if url == "/cancel": + return 1, None + with open(Path(url), "w+") as file: + json.dump(messages, file) + return 0, url + diff --git a/src/style.py b/src/style.py new file mode 100644 index 0000000..ad616d8 --- /dev/null +++ b/src/style.py @@ -0,0 +1,57 @@ +import sys +import time + +from prompt_toolkit import PromptSession, print_formatted_text, prompt +from prompt_toolkit.formatted_text import HTML +from prompt_toolkit.lexers import PygmentsLexer +from prompt_toolkit.styles import Style +from pygments.lexers import PythonLexer +from pygments.styles.native import NativeStyle +from prompt_toolkit.auto_suggest import AutoSuggestFromHistory +from prompt_toolkit.history import FileHistory +from prompt_toolkit.completion import WordCompleter + +class StyleLog: + + styler = None + + style = Style.from_dict({ + 'input': 'bg:#000000 #00ff00', + 'assistant': 'bg:#000000 #7777ff', + 'system': 'bg:#000000 #ff00ff', + }) + + def __init__(self): + self.styler = PromptSession(lexer=PygmentsLexer(PythonLexer), auto_suggest=AutoSuggestFromHistory(), history=FileHistory('history.txt')) + + def prompt(self, role: str, message: str): + if role == 'assistant': + print_formatted_text(HTML(f"Assistant: %s") % (message, ), style = self.style) + elif role == 'user': + user_input = prompt( + [ + ('class:input', "\nInput: "), + ('', '') + ], + style = self.style + ) + return user_input + elif role == 'system': + print_formatted_text(HTML(f'System: {message}'), style = self.style) + elif role == 'none': + print_formatted_text(HTML(f'{message}'), style = self.style) + return + + def show_ellipsis(self, api_call_done): + loop = True + while loop: + for i in range(0, 4): + if api_call_done.is_set(): + loop = False + sys.stdout.write('\r' + ' ' * 4 + '\r') + break + time.sleep(1) + sys.stdout.write('.') + sys.stdout.flush() + sys.stdout.write('\r' + ' ' * 4 + '\r') + sys.stdout.flush() diff --git a/style.py b/style.py deleted file mode 100644 index ad616d8..0000000 --- a/style.py +++ /dev/null @@ -1,57 +0,0 @@ -import sys -import time - -from prompt_toolkit import PromptSession, print_formatted_text, prompt -from prompt_toolkit.formatted_text import HTML -from prompt_toolkit.lexers import PygmentsLexer -from prompt_toolkit.styles import Style -from pygments.lexers import PythonLexer -from pygments.styles.native import NativeStyle -from prompt_toolkit.auto_suggest import AutoSuggestFromHistory -from prompt_toolkit.history import FileHistory -from prompt_toolkit.completion import WordCompleter - -class StyleLog: - - styler = None - - style = Style.from_dict({ - 'input': 'bg:#000000 #00ff00', - 'assistant': 'bg:#000000 #7777ff', - 'system': 'bg:#000000 #ff00ff', - }) - - def __init__(self): - self.styler = PromptSession(lexer=PygmentsLexer(PythonLexer), auto_suggest=AutoSuggestFromHistory(), history=FileHistory('history.txt')) - - def prompt(self, role: str, message: str): - if role == 'assistant': - print_formatted_text(HTML(f"Assistant: %s") % (message, ), style = self.style) - elif role == 'user': - user_input = prompt( - [ - ('class:input', "\nInput: "), - ('', '') - ], - style = self.style - ) - return user_input - elif role == 'system': - print_formatted_text(HTML(f'System: {message}'), style = self.style) - elif role == 'none': - print_formatted_text(HTML(f'{message}'), style = self.style) - return - - def show_ellipsis(self, api_call_done): - loop = True - while loop: - for i in range(0, 4): - if api_call_done.is_set(): - loop = False - sys.stdout.write('\r' + ' ' * 4 + '\r') - break - time.sleep(1) - sys.stdout.write('.') - sys.stdout.flush() - sys.stdout.write('\r' + ' ' * 4 + '\r') - sys.stdout.flush() -- cgit v1.2.3-18-g5258