mirror of
https://github.com/OpenBMB/ChatDev.git
synced 2026-04-25 19:28:09 +00:00
645 lines
26 KiB
Python
645 lines
26 KiB
Python
import os
|
||
import glob
|
||
import time
|
||
import random
|
||
from camel.agents.chat_agent import ChatAgent
|
||
from camel.typing import ModelType, RoleType
|
||
from camel.messages import ChatMessage, SystemMessage
|
||
import tiktoken
|
||
from chatdev.codes import Codes
|
||
from chatdev.competition import competition_filter
|
||
from camel.typing import ModelType
|
||
import yaml
|
||
|
||
|
||
class Pool:
|
||
def __init__(self, client_number: int, unit_num: int, directory: str, model_type) -> None:
|
||
self.client_number = 1
|
||
self.waiting_queue = {}
|
||
self.improved_queue = {}
|
||
self.response = None
|
||
self.response_text = ''
|
||
self.prompt = ''
|
||
self.time = 0
|
||
self.unit_num = unit_num
|
||
self.max_unit_num = unit_num * 2
|
||
self.group = {}
|
||
self.group_num = 0
|
||
self.index = 0
|
||
self.cycle = 0
|
||
self.round = 0
|
||
self.task_prompt = ''
|
||
self.directory = directory
|
||
self.dir_queue = {}
|
||
self.content_team_dict = {}
|
||
self.model_type = model_type
|
||
|
||
def state_pool_add(self, phase_name, phase_prompt, wait_time, task_prompt, codes, store_dir, temperature):
|
||
"""
|
||
Add codes to the state pool into 'self.waiting_queue' for a wait phase.
|
||
|
||
This method adds codes generated by teams to the 'self.waiting_queue', and when all teams have contributed or
|
||
them specified wait time has passed, it triggers the state pool pop operation to process the codes and
|
||
generate improvements.
|
||
|
||
Parameters:
|
||
- phase_name (str): Name of the current wait phase.
|
||
- phase_prompt (str): Prompt for the current wait phase.
|
||
- wait_time (int): The maximum time to wait for other teams to contribute codes.
|
||
- task_prompt (str): Prompt for the task.
|
||
- codes (Codes): An instance of the Codes class containing the initial codes.
|
||
|
||
Returns: - Codes: An instance of the Codes class containing the generated codes after state pool pop in a
|
||
dict parameter 'codebooks'.
|
||
|
||
Raises:
|
||
- Various Exceptions: This function may rais e exceptions during the execution if certain conditions are not met.
|
||
"""
|
||
self.time = wait_time
|
||
self.task_prompt = task_prompt
|
||
# parts = phase_name.split('_', 1)
|
||
file_path = phase_name
|
||
files = []
|
||
for root, dirs, Files in os.walk(file_path):
|
||
for file in Files:
|
||
if file.startswith("solution_"):
|
||
files.append(os.path.join(root, file))
|
||
empty_file_count = 0
|
||
rate_path = './Rate'
|
||
|
||
# Iterate over files in the state pool directory
|
||
for file in files:
|
||
team_name = time.strftime("%Y%m%d%H%M%S", time.localtime())
|
||
time.sleep(1)
|
||
|
||
# Read content from the file
|
||
with open(file, "r") as pool_file:
|
||
content = pool_file.read()
|
||
codes = Codes(content)
|
||
if codes.codebooks != {}:
|
||
for key in codes.codebooks.keys():
|
||
content += '**' + str(key) + '**' + '\n\n' + codes.codebooks[key] + '\n\n'
|
||
|
||
# Check if the file is empty
|
||
if content == 'pass':
|
||
empty_file_count += 1
|
||
continue
|
||
|
||
# Add code to the waiting queue
|
||
self.waiting_queue[team_name] = content
|
||
|
||
|
||
# self.client_number = self.client_number - empty_file_count
|
||
|
||
# Check if all teams have contributed or the specified wait time has passed
|
||
if len(self.waiting_queue) == self.client_number - empty_file_count:
|
||
self.client_number -= empty_file_count
|
||
# Trigger state pool pop operation
|
||
new_codes = self.state_pool_pop(phase_name, phase_prompt, store_dir, temperature)
|
||
|
||
# Check if new codes were generated
|
||
if new_codes.codebooks and len(new_codes.codebooks.keys()) != 0:
|
||
return new_codes
|
||
|
||
# If not all teams have contributed, reset waiting_queue and return None
|
||
self.waiting_queue = {}
|
||
return None
|
||
|
||
def state_pool_pop(self, phase_name, phase_prompt, store_dir, temperature):
|
||
"""
|
||
Execute the state pool pop operation.
|
||
|
||
This method simulates the process of teams collaborating, competing, and improving code during a wait phase
|
||
of a programming competition. It orchestrates the interaction between teams, the language model API, and code
|
||
improvement strategies.
|
||
|
||
Parameters:
|
||
- phase_name (str): Name of the current wait phase.
|
||
- phase_prompt (str): Prompt for the current wait phase.
|
||
|
||
Returns:
|
||
- Codes: An instance of the Codes class containing the generated codes in a dict parameter 'codebooks'.
|
||
|
||
Raises:
|
||
- Various Exceptions: This function may raise exceptions during the execution if certain conditions are not met.
|
||
"""
|
||
# Whether to turn on select module
|
||
if len(self.waiting_queue) > 4:
|
||
self.prune_test()
|
||
select_flag = 1
|
||
if self.unit_num >= 4:
|
||
self.unit_num = int(self.unit_num / 2)
|
||
self.select_strategy(select_flag)
|
||
new_codes = Codes('')
|
||
content = ''
|
||
|
||
improved_index = 0
|
||
api_call = 0
|
||
|
||
# Main Loop
|
||
while True:
|
||
|
||
# Handle the case where there is only one code
|
||
if not self.group:
|
||
content += ('\nI am the only team that needs to do the wait phase after CodeComplete, jumping out of '
|
||
'cc.\n')
|
||
break
|
||
|
||
content += f'\n\nRound {self.round} begin\n\n'
|
||
self.round += 1
|
||
|
||
# self.competition_filter()
|
||
|
||
# Last round processing
|
||
if len(self.waiting_queue) <= self.unit_num:
|
||
content, new_codes = self.final_process(phase_name, phase_prompt, api_call, content, temperature)
|
||
if not new_codes.codebooks:
|
||
return new_codes
|
||
break
|
||
|
||
self.waiting_queue = {}
|
||
|
||
# Iterate over teams and perform competitive cooperation
|
||
for team_key, code_list in self.group.items():
|
||
new_content = self.competitive_cooperation(team_key, code_list, phase_prompt, phase_name, temperature)
|
||
api_call += 1
|
||
# print(f"Call API for {api_call} times.\n")
|
||
|
||
# Process the response and update code and content
|
||
try:
|
||
new_codes = Codes(self.response_text)
|
||
except Exception as e:
|
||
print(f"Error occurred while processing response: {e}")
|
||
new_codes = Codes('')
|
||
content += new_content
|
||
self.cycle += 1
|
||
new_codes, content, api_call = self.error_check(new_codes, content, api_call, team_key, code_list,
|
||
phase_prompt, phase_name, new_content, temperature)
|
||
|
||
if not new_codes.codebooks:
|
||
return new_codes
|
||
|
||
self.response_text = ''
|
||
for file_name, program in new_codes.codebooks.items():
|
||
self.response_text += file_name + '\n' + program + '\n\n'
|
||
self.improved_queue[team_key] = self.response_text
|
||
improved_index += 1
|
||
del new_codes
|
||
|
||
# Update waiting_queue with improved codes
|
||
self.waiting_queue.update(self.improved_queue)
|
||
if len(self.waiting_queue) >= 4:
|
||
self.prune_test()
|
||
|
||
# Check if there are more codes to be processed
|
||
if len(self.improved_queue) > self.unit_num:
|
||
self.improved_queue = {}
|
||
self.group = {}
|
||
self.select_strategy(select_flag) # Iterate, treat improved code as unimproved code
|
||
|
||
# Save improved code to a file
|
||
|
||
team_name = time.strftime("%Y%m%d%H%M%S", time.localtime())
|
||
|
||
parts = phase_name.split('_', 1)
|
||
improved_path = os.path.join('tmp','improved_codes',parts[0])
|
||
if not os.path.exists(improved_path):
|
||
try:
|
||
os.makedirs(improved_path)
|
||
except OSError as e:
|
||
print(f"failed to create '{improved_path}':{e}")
|
||
else:
|
||
pass
|
||
|
||
prefix = os.path.basename(improved_path)
|
||
# improved_file = f'{prefix}_{team_name}.txt'
|
||
improved_file = f'{team_name}.txt'
|
||
final_result = self.response_text
|
||
with open(os.path.join(improved_path, improved_file), "w") as final_result_file:
|
||
final_result_file.write(final_result)
|
||
with open(store_dir, "w") as final_result_File:
|
||
final_result_File.write(final_result)
|
||
|
||
# Monitor the completion of improvement by teams
|
||
start_time = time.time()
|
||
max_duration = self.time
|
||
time_content = ''
|
||
while True:
|
||
all_files = os.listdir(improved_path)
|
||
filtered_files = [file for file in all_files if file.endswith('.txt')]
|
||
|
||
# Check if all teams have completed improvement
|
||
# print('len(filtered_files = {})'.format(len(filtered_files)))
|
||
# print('client_number = {}'.format(self.client_number))
|
||
if len(filtered_files) == self.client_number:
|
||
content += time_content
|
||
file_path = phase_name
|
||
|
||
# Delete temporary files
|
||
if os.path.exists(file_path):
|
||
files_to_delete = glob.glob(os.path.join(file_path, '*'))
|
||
for del_file in files_to_delete:
|
||
try:
|
||
if os.path.isfile(del_file):
|
||
os.remove(del_file)
|
||
except Exception as file_delete_error:
|
||
print(f'Error occurred while deleting file {file_path}: {file_delete_error}')
|
||
else:
|
||
print(f'{file_path} does not exist.')
|
||
|
||
self.index = 0
|
||
break
|
||
|
||
current_time = time.time()
|
||
elapsed_time = current_time - start_time
|
||
# print(f"I am waiting at {str(phase_name)} for {elapsed_time:.2f} seconds, continue.", end='\r')
|
||
time_content = f"Wait at {str(phase_name)} for {elapsed_time:.2f} seconds, continue.\n"
|
||
|
||
# Check if the waiting time exceeds the maximum duration
|
||
if elapsed_time >= max_duration:
|
||
# print("{} phase has waited more than a minute, continue.".format('Improving'))
|
||
break
|
||
time.sleep(1)
|
||
return new_codes
|
||
|
||
|
||
|
||
def prune_test(self):
|
||
rate_dict = {}
|
||
rate_path = './Rate'
|
||
random_sleep_time = random.randint(1, 8)
|
||
time.sleep(random_sleep_time)
|
||
items_list = list(self.waiting_queue.items())
|
||
|
||
random.shuffle(items_list)
|
||
|
||
shuffled_waiting_queue = dict(items_list)
|
||
for team_name, program in shuffled_waiting_queue.items():
|
||
file = team_name + '.txt'
|
||
if os.path.exists(os.path.join(rate_path, file)):
|
||
with open(os.path.join(rate_path, file), 'r') as f:
|
||
rate_number = float(f.read())
|
||
else:
|
||
# if not "\"" in program and not "```" in program and not "'''" in program: #################################
|
||
# program = program.replace("main.py", "")
|
||
# program= "main.py\n```\n'''\n" + program + "\n'''\n```"
|
||
codes = Codes(program)
|
||
folder_path = './RateTest/python_scripts_folder{}'.format(time.time())
|
||
if not os.path.exists(folder_path):
|
||
os.makedirs(folder_path)
|
||
for filename, code in codes.codebooks.items():
|
||
file_path = os.path.join(folder_path, filename)
|
||
with open(file_path, 'w', encoding='utf-8') as f:
|
||
f.write(code)
|
||
del codes
|
||
|
||
rate_number = competition_filter(folder_path)
|
||
with open(os.path.join(rate_path, file), 'w') as f:
|
||
f.write(str(rate_number))
|
||
random_sleep_time = random.randint(1, 8)
|
||
time.sleep(random_sleep_time)
|
||
|
||
rate_dict[team_name] = rate_number
|
||
|
||
sorted_dict_descending = sorted(rate_dict.items(), key=lambda item: item[1], reverse=True)
|
||
|
||
rate_dict = dict(sorted_dict_descending)
|
||
|
||
half_length = len(rate_dict) // 2
|
||
|
||
winner_team = [item[0] for item in sorted_dict_descending[:half_length]]
|
||
for key in self.waiting_queue.copy():
|
||
if key not in winner_team:
|
||
del self.waiting_queue[key]
|
||
|
||
|
||
def error_check(self, new_codes, content, api_call, team_key, code_list, phase_prompt, phase_name, new_content , temperature):
|
||
"""
|
||
Check and handle errors in the generated codes.
|
||
|
||
This method checks for errors in the generated codes and attempts to redo the code generation if the format is
|
||
incorrect. It retries up to three times before giving up.
|
||
|
||
Parameters:
|
||
- new_codes (Codes): An instance of the Codes class containing the generated codes.
|
||
- content (str): The accumulated content from the code generation process.
|
||
- api_call (int): The number of API calls made during the process.
|
||
- team_key (str): The key representing the team in the collaboration.
|
||
- code_list (list): The list of codes associated with the team.
|
||
- phase_prompt (str): Prompt for the current phase.
|
||
- phase_name (str): Name of the current phase.
|
||
- new_content (str): The latest content generated during the collaboration.
|
||
|
||
Returns:
|
||
- Codes: An instance of the Codes class containing the corrected or new generated codes.
|
||
- str: The updated content after error handling.
|
||
- int: The updated number of API calls.
|
||
|
||
Raises:
|
||
- None: This function does not raise any exceptions.
|
||
"""
|
||
index = 0
|
||
|
||
# Retry code generation up to three times if the format is incorrect
|
||
while new_codes.codebooks == {}:
|
||
index += 1
|
||
|
||
# Check if retry limit is reached
|
||
if index > 3:
|
||
new_codes = Codes('')
|
||
return new_codes, content, api_call
|
||
|
||
del new_codes
|
||
# print(new_content)
|
||
# print('Generated a wrong format answer, redo.')
|
||
content += '\nGenerated a wrong format answer, redo.\n'
|
||
|
||
# Redo code generation
|
||
new_content = self.competitive_cooperation(team_key, code_list, phase_prompt, phase_name, temperature)
|
||
api_call += 1
|
||
# print(f"Call API for {api_call} times.\n")
|
||
try:
|
||
new_codes = Codes(self.response_text)
|
||
except Exception as e:
|
||
print(f"Error occurred while processing response: {e}")
|
||
new_codes = Codes('')
|
||
content += new_content
|
||
self.cycle += 1
|
||
|
||
return new_codes, content, api_call
|
||
|
||
def final_process(self, phase_name, phase_prompt, api_call, content, temperature):
|
||
"""
|
||
Perform the final processing for wait phase with accumulated codes from 'self.waiting_queue'.
|
||
|
||
This method combines the codes from all remaining teams in the waiting queue, performs competitive
|
||
cooperation, and generates the final output for the wait phase.
|
||
|
||
Parameters:
|
||
- phase_name (str): Name of the current phase.
|
||
- phase_prompt (str): Prompt for the current phase.
|
||
- api_call (int): The number of API calls have made during the process.
|
||
- content (str): The accumulated content from the code generation process.
|
||
|
||
Returns:
|
||
- str: The updated content after final processing.
|
||
- Codes: An instance of the Codes class containing the final generated codes.
|
||
|
||
Raises:
|
||
- None: This function does not raise any exceptions.
|
||
"""
|
||
code_list = []
|
||
team_key = ''
|
||
|
||
# Extract codes and team keys from the waiting queue
|
||
team_key = ''.join(self.waiting_queue.keys())
|
||
code_list = list(self.waiting_queue.values())
|
||
|
||
team_key = f'({team_key})'
|
||
|
||
# Perform competitive cooperation for the final round
|
||
new_content = self.competitive_cooperation(team_key, code_list, phase_prompt, phase_name, temperature)
|
||
api_call += 1
|
||
# print(f"Call API for {api_call} times.\n")
|
||
|
||
# Process the response and handle errors
|
||
try:
|
||
new_codes = Codes(self.response_text)
|
||
except Exception as e:
|
||
# print(f"Error occurred while processing response: {e}")
|
||
new_codes = Codes('')
|
||
new_codes, content, api_call = self.error_check(new_codes, content, api_call, team_key, code_list, phase_prompt,
|
||
phase_name, new_content,temperature)
|
||
|
||
# Update content with the final round results
|
||
content += f"{new_content}\n\n{phase_name} all finished"
|
||
self.response_text = ''
|
||
|
||
for file_name, program in new_codes.codebooks.items():
|
||
self.response_text += f"{file_name}\n{program}\n\n"
|
||
|
||
content += f"\n{phase_name} all finished."
|
||
return content, new_codes
|
||
|
||
def select_strategy_greedy(self):
|
||
"""
|
||
Select coding teams for competitive cooperation using a greedy strategy.
|
||
|
||
This method selects coding teams based on the length of their codes, prioritizing longer codes. It aims to
|
||
maximize the utilization of available tokens for competitive cooperation.
|
||
|
||
Raises:
|
||
- ValueError: Raised if the model is deemed incompetent for the job based on available tokens.
|
||
|
||
Returns:
|
||
- None: This function does not return a value. Instead, it updates the improved_queue and group attributes.
|
||
"""
|
||
word_count = {}
|
||
max_code_len = 0
|
||
content = ''
|
||
encoding = tiktoken.encoding_for_model(self.model_type.value)
|
||
|
||
# Populate remaining_elements with codes from the waiting queue
|
||
remaining_elements = list(self.waiting_queue.values())
|
||
|
||
for code in remaining_elements:
|
||
encoding = tiktoken.encoding_for_model(self.model_type.value)
|
||
num_prompt_tokens = len(encoding.encode(code))
|
||
word_count[code] = num_prompt_tokens
|
||
max_code_len = max(max_code_len, num_prompt_tokens)
|
||
|
||
num_max_token_map = {
|
||
"gpt-3.5-turbo": 4096,
|
||
"gpt-3.5-turbo-16k": 16384,
|
||
"gpt-3.5-turbo-0613": 4096,
|
||
"gpt-3.5-turbo-16k-0613": 16384,
|
||
"gpt-4": 8192,
|
||
"gpt-4-0613": 8192,
|
||
"gpt-4-32k": 32768,
|
||
}
|
||
num_max_token = num_max_token_map[self.model_type.value]
|
||
usable_len = num_max_token - 3 * max_code_len
|
||
|
||
if usable_len < max_code_len * 2:
|
||
raise ValueError("This model is incompetent for this job.")
|
||
|
||
sorted_list = sorted(word_count.items(), key=lambda x: x[1], reverse=True)
|
||
remaining_elements = [item[0] for item in sorted_list]
|
||
|
||
index = 0
|
||
selected = []
|
||
unimproved_code_len = 0
|
||
|
||
if len(remaining_elements) == 1:
|
||
team_key = f'({self.waiting_queue[remaining_elements[0]]})'
|
||
self.improved_queue[team_key] = remaining_elements[0]
|
||
else:
|
||
team_key = '('
|
||
|
||
for code in remaining_elements:
|
||
unimproved_code_len += word_count[code]
|
||
if unimproved_code_len < usable_len and index < self.max_unit_num:
|
||
selected.append(code)
|
||
team_key += self.waiting_queue[code]
|
||
index += 1
|
||
continue
|
||
team_key += ')'
|
||
self.group[team_key] = selected.copy()
|
||
index = 1
|
||
self.group_num += 1
|
||
if code == remaining_elements[-1]:
|
||
team_key = '(' + self.waiting_queue[remaining_elements[0]] + ')'
|
||
self.improved_queue[team_key] = remaining_elements[0]
|
||
break
|
||
team_key = '('
|
||
selected = [code]
|
||
team_key += self.waiting_queue['code']
|
||
unimproved_code_len = word_count[code]
|
||
|
||
team_key = f'({team_key})'
|
||
self.group[team_key] = selected.copy()
|
||
# self.group[self.group_num] = selected.copy()
|
||
|
||
def select_strategy(self, select_flag):
|
||
"""
|
||
Select strategy for team collaboration based on none greedy aggregation, focusing on team and unit numbers.
|
||
|
||
Raises:
|
||
- ValueError: If the unit number is less than or equal to 1.
|
||
|
||
Returns:
|
||
- None: The function modifies class attributes 'self.group' to store the selected strategy details.
|
||
"""
|
||
if self.unit_num <= 1:
|
||
raise ValueError("Invalid Unit number. Unit must be a positive integer.")
|
||
|
||
# Create a list of waiting codes from the waiting_queue
|
||
remaining_elements = list(self.waiting_queue.values())
|
||
if select_flag:
|
||
index = -1
|
||
selected_keys = {}
|
||
while remaining_elements:
|
||
index += 1
|
||
if len(remaining_elements) == 1:
|
||
# Handle the case with only one code remaining
|
||
for wait_key, wait_code in self.waiting_queue.items():
|
||
if wait_code == remaining_elements[0]:
|
||
team_key = wait_key
|
||
team_key = f'({team_key})'
|
||
self.improved_queue[team_key] = remaining_elements[0]
|
||
break
|
||
|
||
if len(remaining_elements) <= self.unit_num:
|
||
# Handle the case when remaining elements are less than or equal to unit_num
|
||
selected_tem = remaining_elements.copy()
|
||
team_key = ''
|
||
for key, value in self.waiting_queue.items():
|
||
if (value in remaining_elements) and (value in selected_tem):
|
||
team_key += str(key)
|
||
selected_tem.remove(value)
|
||
team_key = f'({team_key})'
|
||
self.group[team_key] = remaining_elements.copy()
|
||
self.group_num += 1
|
||
break
|
||
|
||
# Randomly select elements for the team
|
||
selected = random.sample(remaining_elements, self.unit_num)
|
||
selected_tem = selected.copy()
|
||
team_key = ''
|
||
for key, value in self.waiting_queue.items():
|
||
if (value in selected) and (value in selected_tem):
|
||
team_key += str(key)
|
||
selected_tem.remove(value)
|
||
|
||
# Store selected team and code details
|
||
selected_keys[index] = team_key
|
||
code_list = selected.copy()
|
||
team_key = f'({team_key})'
|
||
self.group[team_key] = selected.copy()
|
||
self.group_num += 1
|
||
|
||
# Remove selected elements from the remaining elements
|
||
tem_list = remaining_elements.copy()
|
||
for elem in remaining_elements:
|
||
if elem in selected and selected:
|
||
tem_list.remove(elem)
|
||
selected.remove(elem)
|
||
remaining_elements = tem_list.copy()
|
||
else:
|
||
team_key = '('
|
||
for i in range(len(remaining_elements)):
|
||
team_key += chr(ord('A') + i)
|
||
team_key += ')'
|
||
self.group = {team_key: remaining_elements}
|
||
|
||
def competitive_cooperation(self, team_key, code_list, phase_prompt, phase_name, temperature):
|
||
"""
|
||
Generate content for competitive cooperation phase.
|
||
|
||
Parameters:
|
||
- team_key (str): The key representing the teams involved.
|
||
- code_list (list): List of codes provided for competition and cooperation.
|
||
- phase_prompt (str): Prompt for the current phase.
|
||
- phase_name (str): Name of the current phase.
|
||
|
||
Returns:
|
||
- str: Generated content for the phase.
|
||
"""
|
||
new_content = f'Teams {team_key} are having competition and cooperation.\n\n'
|
||
self.prompt_assemble(code_list, phase_prompt)
|
||
|
||
new_content += f"Into LLM_api at {phase_name}.\n\n"
|
||
|
||
self.llm_api(temperature)
|
||
|
||
new_content += f"Out of LLM_api at {phase_name}.\n\n"
|
||
|
||
# new_content += (
|
||
# f"prompt_tokens: {self.response['usage']['prompt_tokens']}\n"
|
||
# f"completion_tokens: {self.response['usage']['completion_tokens']}\n"
|
||
# f"total_tokens: {self.response['usage']['total_tokens']}\n\n"
|
||
# f"Prompt:\n{self.prompt}\n\n"
|
||
# )
|
||
|
||
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
time_content = f"[{time_now} INFO] Programmer: **Programmer<->Programmer on: {phase_name}, turn {self.cycle}**"
|
||
new_content += f"\n\n{time_content}\n\nAnswer:\n{self.response_text}\n\ncycle{self.cycle} end.\n\n"
|
||
|
||
return new_content
|
||
|
||
def llm_api(self, temperature):
|
||
agent = ChatAgent(
|
||
system_message=SystemMessage(content="", role_name="assistant", role_type=RoleType.ASSISTANT),
|
||
model=self.model_type,
|
||
temperature=temperature,
|
||
)
|
||
message = ChatMessage(content=self.prompt,
|
||
role_name="User",
|
||
role_type=RoleType.USER,
|
||
meta_dict=dict(),
|
||
role="user")
|
||
self.response = agent.step(message)
|
||
self.response_text = self.response.msgs[0].content.replace("main.py", "\nmain.py").replace("```","\n```").replace("'''\n", "\n'''\n").replace("'''","'''\n").replace("\n'''","\n'''\n")
|
||
|
||
|
||
def prompt_assemble(self, code_list, phase_prompt):
|
||
"""
|
||
Assemble the prompt for a competitive_cooperation.
|
||
|
||
This method constructs the prompt for a collaborative coding phase, incorporating the given code_list and
|
||
phase-specific prompt.
|
||
|
||
Parameters:
|
||
- code_list (list): List of codes submitted by different teams.
|
||
- phase_prompt (str): Prompt for the current phase.
|
||
|
||
Returns:
|
||
- None: This function does not return a value. Instead, it updates the prompt attribute 'self.prompt'.
|
||
"""
|
||
self.prompt = f"{phase_prompt}\n\nHere are {len(code_list)} programs with the same task requirement: {self.task_prompt}"
|
||
|
||
# Assemble prompt for each team's program for cc
|
||
for index, code in enumerate(code_list):
|
||
temp_prompt = f"\n\nTeam {index}'s Program:\n{code}\n"
|
||
self.prompt += temp_prompt
|