面试题答案
一键面试以下是使用Python实现的示例代码:
import asyncio
import concurrent.futures
async def async_operation(char):
# 模拟可能失败的API调用
if char == 'x': # 这里随意设置一个失败条件
raise Exception('API call failed for character: {}'.format(char))
await asyncio.sleep(0.1) # 模拟异步操作
return char.upper()
async def process_string(string):
tasks = []
with concurrent.futures.ThreadPoolExecutor() as executor:
loop = asyncio.get_running_loop()
for char in list(string):
task = loop.run_in_executor(executor, async_operation, char)
tasks.append(task)
try:
results = await asyncio.gather(*tasks)
return results
except Exception as e:
# 取消所有未完成的任务
for task in tasks:
if not task.done():
task.cancel()
raise e
if __name__ == "__main__":
string = "abcxdef"
try:
result = asyncio.run(process_string(string))
print(result)
except Exception as e:
print(str(e))
代码解释:
async_operation
函数:模拟对单个字符的异步操作,这里随意设置了一个失败条件,如果字符为'x'
则抛出异常。process_string
函数:- 使用
concurrent.futures.ThreadPoolExecutor
创建线程池执行异步操作。 - 将每个字符的异步操作封装为任务并添加到
tasks
列表中。 - 使用
asyncio.gather
并发执行所有任务。 - 如果任何一个任务抛出异常,捕获异常并取消所有未完成的任务,然后重新抛出异常。
- 使用
- 主程序:
- 定义一个测试字符串
"abcxdef"
。 - 运行
process_string
函数,并捕获可能的异常,打印结果或错误信息。
- 定义一个测试字符串
在实际应用中,你可能需要根据具体的API调用和资源清理需求,调整async_operation
函数和错误处理部分的代码。