# Files Management
The Kodosumi Files Management API provides two classes for working with files: AsyncFileSystem and SyncFileSystem. Both let you download files uploaded by the user and upload files created during flow execution. Files and folders uploaded by the user live under the root folder /in (input files); files and folders created during flow execution and delivered back to the user live under the root folder /out (output files).
## Overview
### AsyncFileSystem vs SyncFileSystem
| Feature | AsyncFileSystem | SyncFileSystem |
|---|---|---|
| Usage | Asynchronous functions (`async def`) | Synchronous functions |
| Performance | Non-blocking, better I/O performance | Blocking, simpler to use |
| Ray remote functions | Not compatible | Compatible |
| Context manager | `async with` | `with` |
## AsyncFileSystem and SyncFileSystem Classes
### Basic Structure
Both classes share the same interface but differ in implementation. As a developer of agentic services, you access both classes through the Tracer object:
```python
# agentic service entrypoint
async def run(inputs: dict, tracer: Tracer):
    fs = await tracer.fs()
    files = await fs.ls("in")
    # ... additional operations
    await fs.close()
```

Use SyncFileSystem if you pass the tracer object to a Ray remote function. This constraint exists because Ray remote functions do not support async execution.
```python
import ray

# agentic service entrypoint spawning remote functions
async def run(inputs: dict, tracer: Tracer):
    fs = await tracer.fs()
    files = await fs.ls("in")
    await fs.close()
    futures = [process.remote(f['path'], tracer) for f in files]

@ray.remote
def process(file: str, tracer: Tracer):
    fs = tracer.fs_sync()
    files = fs.ls("in")
    # ... additional operations
    fs.close()
```

## Working with Context Managers
Both AsyncFileSystem and SyncFileSystem support context managers for automatic resource cleanup:
### AsyncFileSystem Context Manager
```python
async def run(inputs: dict, tracer: Tracer):
    async with await tracer.fs() as fs:
        # File system operations here
        files = await fs.ls("in")
    # Automatic cleanup when exiting the context
```

### SyncFileSystem Context Manager
```python
def process(file: str, tracer: Tracer):
    with tracer.fs_sync() as fs:
        # File system operations here
        files = fs.ls("in")
    # Automatic cleanup when exiting the context
```

## Listing Files and Folders
Use the ls() method to list files and subfolders in the input or output directories:
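The listing entries used throughout these examples are dictionaries carrying at least a `path` and a `size` key. A small helper for narrowing a listing to one file type might look like this (a plain-Python sketch; `filter_by_suffix` is an illustrative helper, not part of the Kodosumi API):

```python
def filter_by_suffix(listing: list[dict], suffix: str) -> list[dict]:
    """Keep only entries whose 'path' ends with the given suffix."""
    return [entry for entry in listing if entry["path"].endswith(suffix)]

listing = [
    {"path": "in/report.pdf", "size": 2048},
    {"path": "in/data.csv", "size": 512},
]
pdfs = filter_by_suffix(listing, ".pdf")
print([e["path"] for e in pdfs])  # ['in/report.pdf']
```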
### AsyncFileSystem
```python
async def run(inputs: dict, tracer: Tracer):
    async with await tracer.fs() as fs:
        # List all files in the input directory
        input_files = await fs.ls("in")
        # List files in a specific subfolder
        subfolder_files = await fs.ls("in/subfolder")
        # List files in the output directory
        output_files = await fs.ls("out")
        for file_info in input_files:
            print(f"File: {file_info['path']}, Size: {file_info['size']}")
```

### SyncFileSystem
```python
def process(file: str, tracer: Tracer):
    with tracer.fs_sync() as fs:
        # List all files in the input directory
        input_files = fs.ls("in")
        # List files in a specific subfolder
        subfolder_files = fs.ls("in/subfolder")
        # List files in the output directory
        output_files = fs.ls("out")
        for file_info in input_files:
            print(f"File: {file_info['path']}, Size: {file_info['size']}")
```

## Downloading Files and Folders
The download() method allows you to download files uploaded by the user to a local temporary directory:
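Because download() yields local paths one at a time, you can process files as they arrive. As a simple illustration, here is a plain-Python helper that tallies the on-disk size of the paths a download loop collects (`total_bytes` and the temporary files are illustrative, not part of the Kodosumi API):

```python
import os
import tempfile

def total_bytes(paths: list[str]) -> int:
    """Sum the on-disk size of a list of local file paths."""
    return sum(os.path.getsize(p) for p in paths)

# Demo with temporary files standing in for downloaded paths.
with tempfile.TemporaryDirectory() as tmp:
    downloaded = []
    for name, data in [("a.bin", b"12345"), ("b.bin", b"1234567890")]:
        path = os.path.join(tmp, name)
        with open(path, "wb") as f:
            f.write(data)
        downloaded.append(path)
    print(total_bytes(downloaded))  # 15
```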
### AsyncFileSystem
```python
async def run(inputs: dict, tracer: Tracer):
    async with await tracer.fs() as fs:
        # Download a single file
        async for local_path in fs.download("in/document.pdf"):
            print(f"Downloaded to: {local_path}")
        # Download an entire folder
        async for local_path in fs.download("in/documents"):
            print(f"Downloaded: {local_path}")
        # Download a specific subfolder
        async for local_path in fs.download("in/documents/reports"):
            print(f"Downloaded: {local_path}")
```

### SyncFileSystem
```python
def process(file: str, tracer: Tracer):
    with tracer.fs_sync() as fs:
        # Download a single file
        for local_path in fs.download("in/document.pdf"):
            print(f"Downloaded to: {local_path}")
        # Download an entire folder
        for local_path in fs.download("in/documents"):
            print(f"Downloaded: {local_path}")
        # Download a specific subfolder
        for local_path in fs.download("in/documents/reports"):
            print(f"Downloaded: {local_path}")
```

## Uploading Files and Folders
The upload() method allows you to upload files created during service execution to the output directory:
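Since upload() accepts a folder as well as a single file, a common pattern is to stage all results in one local directory and upload it with a single call. A sketch of the staging step (`stage_outputs` is an illustrative helper, not part of the Kodosumi API):

```python
import tempfile
from pathlib import Path

def stage_outputs(results: dict[str, str], base: str) -> str:
    """Write each result into a local staging folder and return its path."""
    out = Path(base) / "output_folder"
    out.mkdir(parents=True, exist_ok=True)
    for name, text in results.items():
        (out / name).write_text(text)
    return str(out)

with tempfile.TemporaryDirectory() as tmp:
    folder = stage_outputs({"result.txt": "Processing completed"}, tmp)
    print(sorted(p.name for p in Path(folder).iterdir()))  # ['result.txt']
    # In a service you would then call: batch_id = await fs.upload(folder)
```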
### AsyncFileSystem
```python
async def run(inputs: dict, tracer: Tracer):
    async with await tracer.fs() as fs:
        # Create some output files
        with open("/tmp/result.txt", "w") as f:
            f.write("Processing completed")
        # Upload a single file
        batch_id = await fs.upload("/tmp/result.txt")
        print(f"Uploaded with batch ID: {batch_id}")
        # Upload an entire folder
        batch_id = await fs.upload("/tmp/output_folder")
        print(f"Uploaded folder with batch ID: {batch_id}")
```

### SyncFileSystem
```python
def process(file: str, tracer: Tracer):
    with tracer.fs_sync() as fs:
        # Create some output files
        with open("/tmp/result.txt", "w") as f:
            f.write("Processing completed")
        # Upload a single file
        batch_id = fs.upload("/tmp/result.txt")
        print(f"Uploaded with batch ID: {batch_id}")
        # Upload an entire folder
        batch_id = fs.upload("/tmp/output_folder")
        print(f"Uploaded folder with batch ID: {batch_id}")
```

## Opening and Reading Files
Use the open() method to create file streams for reading files. The streams support both read_all() and read() methods:
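Chunked reads pair naturally with incremental processing, for example hashing a large file without loading it into memory. The helper below works on any iterable of byte chunks, so the same logic serves both the async and sync read() loops shown next (`digest_chunks` is an illustrative helper, not part of the Kodosumi API):

```python
import hashlib

def digest_chunks(chunks) -> str:
    """Feed an iterable of byte chunks into SHA-256 incrementally."""
    hasher = hashlib.sha256()
    for chunk in chunks:
        hasher.update(chunk)
    return hasher.hexdigest()

# Chunked and whole-file hashing agree.
assert digest_chunks([b"hello, ", b"world"]) == hashlib.sha256(b"hello, world").hexdigest()
```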
### AsyncFileSystem
```python
async def run(inputs: dict, tracer: Tracer):
    async with await tracer.fs() as fs:
        # Open and read entire file content
        async with fs.open("in/document.txt") as file_stream:
            content = await file_stream.read_all()
            print(f"File content: {content.decode('utf-8')}")
        # Open and read file in chunks
        async with fs.open("in/large_file.bin") as file_stream:
            async for chunk in file_stream.read():
                # Process each chunk
                process_chunk(chunk)
```

### SyncFileSystem
```python
def process(file: str, tracer: Tracer):
    with tracer.fs_sync() as fs:
        # Open and read entire file content
        with fs.open("in/document.txt") as file_stream:
            content = file_stream.read_all()
            print(f"File content: {content.decode('utf-8')}")
        # Open and read file in chunks
        with fs.open("in/large_file.bin") as file_stream:
            for chunk in file_stream.read():
                # Process each chunk
                process_chunk(chunk)
```

## Removing Files and Folders
Use the remove() method to delete files and folders:
### AsyncFileSystem
```python
async def run(inputs: dict, tracer: Tracer):
    async with await tracer.fs() as fs:
        # Remove a single file
        success = await fs.remove("in/temp_file.txt")
        if success:
            print("File removed successfully")
        # Remove a folder (and all its contents)
        success = await fs.remove("in/temp_folder")
        if success:
            print("Folder removed successfully")
```

### SyncFileSystem
```python
def process(file: str, tracer: Tracer):
    with tracer.fs_sync() as fs:
        # Remove a single file
        success = fs.remove("in/temp_file.txt")
        if success:
            print("File removed successfully")
        # Remove a folder (and all its contents)
        success = fs.remove("in/temp_folder")
        if success:
            print("Folder removed successfully")
```

## Complete Example
Here's a complete example showing how to use the file management API in a typical workflow:
```python
from pathlib import Path

async def run(inputs: dict, tracer: Tracer):
    async with await tracer.fs() as fs:
        # List all input files
        input_files = await fs.ls("in")
        print(f"Found {len(input_files)} input files")
        # Process each input file
        for file_info in input_files:
            file_path = file_info['path']
            # Read the file content
            async with fs.open(file_path) as file_stream:
                content = await file_stream.read_all()
            # Process the content
            processed_content = process_content(content)
            # Create output file
            output_filename = f"processed_{Path(file_path).name}"
            with open(f"/tmp/{output_filename}", "wb") as f:
                f.write(processed_content)
            # Upload the processed file
            batch_id = await fs.upload(f"/tmp/{output_filename}")
            print(f"Uploaded {output_filename} with batch ID: {batch_id}")
        # Clean up temporary files
        for file_info in input_files:
            if Path(file_info['path']).name.startswith('temp_'):
                await fs.remove(file_info['path'])
```

## Error Handling
The file management API raises appropriate exceptions for common error conditions:
- FileNotFoundError: raised when trying to access a file or folder that doesn't exist
- RuntimeError: raised when trying to use a closed stream or re-enter a stream context
- HTTP exceptions: raised for network-related errors during file operations
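For transient network failures it can be worth retrying an operation a few times before giving up. A generic retry helper is sketched below; which exception types are actually retryable depends on your deployment, so ConnectionError here is only a stand-in:

```python
import time

def with_retries(fn, attempts: int = 3, delay: float = 0.1,
                 retry_on: tuple = (ConnectionError,)):
    """Call fn(), retrying up to `attempts` times on the given exceptions."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise
            time.sleep(delay)

# Demo: a function that fails twice, then succeeds.
calls = {"count": 0}
def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient")
    return "ok"

print(with_retries(flaky))  # ok
```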
```python
async def run(inputs: dict, tracer: Tracer):
    async with await tracer.fs() as fs:
        try:
            # Try to access a non-existent file
            async with fs.open("in/nonexistent.txt") as file_stream:
                content = await file_stream.read_all()
        except FileNotFoundError:
            print("File not found")
        except Exception as e:
            print(f"Error accessing file: {e}")
```

## Where to go from here?
- Continue with the complete upload/download example

