Unlocking Performance: Threading vs AsyncIO in Theolex — Choosing the Right Concurrency Model
Introduction
THEOLEX is a data-driven application that uses advanced data science methodologies and natural language understanding technologies. It serves as an essential tool for legal professionals, helping them interpret and predict sanctions enforced by global regulatory authorities. The platform accomplishes this by gathering, processing, and analyzing “decisions” — extensive sets of documents containing crucial legal information.
Previously, our team at NORMA shared how we improved Theolex’s data processing pipeline by switching from Apache Airflow to Google Cloud Workflows. This change addressed challenges related to scalability, cost-efficiency, and maintainability. Building on that foundation, we faced new technical challenges related to performance bottlenecks during machine learning inference.
To further optimize costs while ensuring scalability and maintainability, we adopted a serverless deployment model for all our services. Additionally, we deployed inference endpoints separately, which made it easier to achieve parallelism and allowed better resource management for handling machine learning workloads.
In this article, we discuss how we optimized Theolex’s performance by selecting the right concurrency model — Threading vs AsyncIO — to handle both dependent and independent tasks efficiently. We explore the specific case of classification and extraction tasks, their interdependencies, and how asynchronous operations allowed us to push results to storage without blocking further processing.
Table of Contents
- Understanding the Performance Challenges in Theolex
- Concurrency vs Parallelism in Machine Learning Inference
- Implementing Threading for Dependent Tasks
- Leveraging AsyncIO for Non-Blocking Operations
- Hybrid Approach: Combining Threading with AsyncIO
- Conclusion
1. Understanding the Performance Challenges in Theolex
As Theolex scaled, we encountered significant performance bottlenecks during the machine learning inference phase. Specifically, we dealt with:
- Classification Tasks: Machine learning models that categorize legal documents into predefined classes.
- Extraction Tasks: Models that extract specific information from documents, such as entities or key terms.
Dependencies: The extraction tasks depended on the results of the classification tasks. For instance, knowing the document type (from classification) determined which extraction model to use.
Challenges:
- Sequential Execution Delays: Running classification and then extraction tasks sequentially increased processing time.
- Resource Underutilization: While waiting for classification results, the system wasn’t effectively using available computational resources.
- Blocking I/O Operations: At the end of processing, results were pushed to storage synchronously, which blocked further processing.
Our goal was to optimize this workflow to make it as fast as possible, ensuring that:
- Dependent tasks are managed efficiently.
- Independent tasks run concurrently.
- Pushing results to storage does not block subsequent processing.
2. Concurrency vs Parallelism in Machine Learning Inference
Concurrency and parallelism are often used interchangeably, but they represent different concepts:
- Concurrency: Managing multiple tasks by switching between them, giving the appearance of tasks running simultaneously.
- Parallelism: Executing multiple tasks at the same time using multiple processors or cores.
In our context:
- Concurrency is suitable for I/O-bound tasks, such as reading from or writing to storage.
- Parallelism is beneficial for CPU-bound tasks, like running machine learning inferences.
Understanding these differences is crucial for optimizing performance in machine learning applications like Theolex.
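To make the distinction concrete, here is a minimal sketch (with invented task names and timings, not Theolex code) that runs the same three jobs first concurrently on a single event loop and then in parallel across worker processes:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

async def fetch_result(name):
    # I/O-bound: the event loop switches to other tasks while this one waits
    await asyncio.sleep(1)
    return f"{name} fetched"

def heavy_inference(name):
    # CPU-bound: each worker process runs on its own core
    time.sleep(1)
    return f"{name} computed"

async def concurrent_io():
    # Concurrency: three waits overlap on a single thread
    return await asyncio.gather(*(fetch_result(f"doc{i}") for i in range(3)))

def parallel_cpu():
    # Parallelism: three computations run simultaneously in separate processes
    with ProcessPoolExecutor() as pool:
        return list(pool.map(heavy_inference, [f"doc{i}" for i in range(3)]))

if __name__ == "__main__":
    print(asyncio.run(concurrent_io()))  # ~1 second total, not 3
    print(parallel_cpu())                # ~1 second on a multi-core machine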
3. Implementing Threading for Dependent Tasks
To handle the dependent tasks efficiently, we used threading. This allowed us to run classification and extraction tasks in parallel where possible.
Why Threading?
- Parallel Execution: Threads can run on multiple CPU cores; in CPython the GIL limits this for pure-Python CPU work, but operations that release the GIL (native library code, or calls to our separately deployed inference endpoints) do run in parallel.
- Shared Memory: Threads share the same memory space, making it easier to share data between tasks.
Implementation: We used the ThreadPoolExecutor from Python’s concurrent.futures module to manage threads.
from concurrent.futures import ThreadPoolExecutor
import time

def classify_document(content):
    print("Classifying document…")
    # Simulate a CPU-bound classification task
    time.sleep(2)
    if "agreement" in content.lower():
        return "Contract"
    else:
        return "Other"

def extract_information(content, document_type):
    print(f"Extracting information for {document_type}…")
    # Simulate a CPU-bound extraction task
    time.sleep(2)
    if document_type == "Contract":
        return {"parties": ["Alice", "Bob"], "date": "2023-10-16"}
    else:
        return {}

def process_document(content):
    with ThreadPoolExecutor() as executor:
        # Run classification task
        classification_future = executor.submit(classify_document, content)
        document_type = classification_future.result()
        # Run extraction task based on classification result
        extraction_future = executor.submit(extract_information, content, document_type)
        extraction_result = extraction_future.result()
    return {
        "document_type": document_type,
        "extraction_result": extraction_result
    }

content = "This is a contract agreement between Alice and Bob."
result = process_document(content)
print(f"Final result: {result}")
Limitations:
- Global Interpreter Lock (GIL): In CPython, the GIL can limit the effectiveness of threading for purely CPU-bound tasks (a process-based alternative is sketched below).
- Resource Contention: Threads share resources, which can lead to contention and reduced performance.
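If profiling shows that the GIL is the bottleneck for genuinely CPU-bound work, ProcessPoolExecutor from the same concurrent.futures module is a near drop-in alternative that runs each task in its own process. A minimal sketch, reusing classify_document and extract_information from the example above:

from concurrent.futures import ProcessPoolExecutor

def process_document_multiprocessing(content):
    # Same flow as process_document, but each task runs in a separate process,
    # so CPU-bound work is not serialized by the GIL
    with ProcessPoolExecutor() as executor:
        document_type = executor.submit(classify_document, content).result()
        extraction_result = executor.submit(extract_information, content, document_type).result()
    return {"document_type": document_type, "extraction_result": extraction_result}

if __name__ == "__main__":
    print(process_document_multiprocessing("This is a contract agreement between Alice and Bob."))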
4. Leveraging AsyncIO for Non-Blocking Operations
At the end of processing, we needed to push results to storage. Initially, this was done synchronously, which blocked further processing. To optimize, we used AsyncIO for these I/O-bound tasks.
Why AsyncIO?
- Non-Blocking I/O: Allows other tasks to run while waiting for I/O operations to complete.
- Efficient Resource Utilization: Frees up the event loop to handle other tasks during I/O waits.
Implementation: We used asyncio to perform storage operations without blocking the main processing flow.
import asyncio

async def push_results_to_storage(data):
    print("Pushing results to storage…")
    # Simulate an I/O-bound storage operation
    await asyncio.sleep(1)
    print("Results pushed to storage.")

async def main_processing():
    content = "This is a contract agreement between Alice and Bob."
    # Note: process_document is still a blocking, synchronous call here;
    # the hybrid approach in the next section moves it off the event loop
    result = process_document(content)
    # Start pushing results to storage asynchronously
    storage_task = asyncio.create_task(push_results_to_storage(result))
    # Continue with other processing without waiting for the storage operation
    print("Continuing with other processing…")
    # Make sure the storage task finishes before the event loop shuts down
    await storage_task

asyncio.run(main_processing())
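When a whole batch of documents finishes processing, the same coroutine can push all of the results concurrently rather than one by one. A minimal sketch, reusing push_results_to_storage from above and assuming the results were produced by process_document:

import asyncio

async def push_all_results(results):
    # One storage coroutine per result; gather lets all the waits overlap
    await asyncio.gather(*(push_results_to_storage(r) for r in results))

asyncio.run(push_all_results([{"document_type": "Contract"}, {"document_type": "Other"}]))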
5. Hybrid Approach: Combining Threading with AsyncIO
To achieve maximum performance, we combined threading and AsyncIO. This hybrid approach allowed us to:
- Run CPU-bound tasks (classification and extraction) efficiently.
- Perform I/O-bound tasks (pushing results to storage) asynchronously without blocking.
Full Example Code:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def classify_document(content):
    # Simulate CPU-bound classification task
    time.sleep(2)
    if "agreement" in content.lower():
        return "Contract"
    else:
        return "Other"

def extract_information(content, document_type):
    # Simulate CPU-bound extraction task
    time.sleep(2)
    if document_type == "Contract":
        return {"parties": ["Alice", "Bob"], "date": "2023-10-16"}
    else:
        return {}

def process_document(content):
    with ThreadPoolExecutor() as executor:
        # Classification task
        document_type = executor.submit(classify_document, content).result()
        # Extraction task depends on classification result
        extraction_result = executor.submit(extract_information, content, document_type).result()
    return {
        "document_type": document_type,
        "extraction_result": extraction_result
    }

async def push_results_to_storage(data):
    # Simulate non-blocking I/O operation
    await asyncio.sleep(1)
    print("Results pushed to storage.")

async def main():
    content = "This is a contract agreement between Alice and Bob."
    # Run CPU-bound processing in a separate thread to avoid blocking the event loop
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, process_document, content)
    # Start pushing results to storage asynchronously
    storage_task = asyncio.create_task(push_results_to_storage(result))
    # Continue with other processing immediately
    print("Continuing with other processing while storage operation is in progress…")
    # Simulate other processing
    await asyncio.sleep(1)
    print("Other processing completed.")
    # Wait for the storage operation to complete before exiting
    await storage_task

asyncio.run(main())
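As a side note, on Python 3.9 and later, asyncio.to_thread is a slightly more readable equivalent of loop.run_in_executor(None, ...) for offloading the blocking call:

import asyncio

async def main_with_to_thread():
    content = "This is a contract agreement between Alice and Bob."
    # Equivalent to loop.run_in_executor(None, process_document, content)
    result = await asyncio.to_thread(process_document, content)
    await push_results_to_storage(result)

asyncio.run(main_with_to_thread())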
6. Conclusion
By adopting a hybrid concurrency model that combines threading and AsyncIO, we significantly improved the performance of Theolex. This approach allowed us to:
- Efficiently handle dependent tasks (classification and extraction) using threading.
- Perform non-blocking I/O operations (pushing results to storage) using AsyncIO.
- Ensure that the processing pipeline runs as quickly as possible without unnecessary delays.
Additionally, our decision to deploy all services in serverless mode optimized costs while maintaining scalability. By deploying inference endpoints separately, we were able to further enhance parallelism and improve resource allocation for machine learning tasks.
Key Takeaways:
- Understanding Task Nature: Identifying whether tasks are CPU-bound or I/O-bound is crucial.
- Appropriate Concurrency Model: Selecting the right model (threading, AsyncIO, or both) based on task requirements improves performance.
- Asynchronous Operations: Using AsyncIO for I/O-bound tasks prevents blocking and improves efficiency.
- Threading Limitations: Be aware of the GIL in Python and consider multiprocessing if threading does not yield the desired performance.
- Serverless Deployment: Deploying services in serverless mode helps optimize costs and scalability, while separating inference endpoints enhances parallelism.
Context for Consideration
For applications facing similar challenges — where tasks have dependencies and require both CPU-bound and I/O-bound operations — adopting a hybrid concurrency model can be highly effective. This is particularly relevant for:
- Machine Learning Pipelines: Where inference tasks may depend on each other and need to process large volumes of data efficiently.
- Data Processing Systems: That require rapid handling of dependent tasks and non-blocking I/O operations.
- Real-Time Applications: Where latency and throughput are critical, and blocking operations can degrade performance.
By carefully analyzing the nature of your tasks and selecting the appropriate concurrency strategies, you can optimize performance and resource utilization in your applications.
Final Thoughts
Optimizing performance in complex systems like Theolex requires a deep understanding of concurrency models and their proper application. For junior and mid-level software engineers, mastering these concepts is essential for building efficient, scalable, and responsive applications.
References
- Python Documentation: Threading, AsyncIO
- FastAPI Documentation: Concurrency and Burgers
- Previous Articles: Optimizing Theolex’s Data Pipeline with Google Cloud Workflows by Abdelhalim Hriche.
Appendix: Understanding Concurrency and Parallelism
To further clarify the concepts, consider the analogy provided in the FastAPI documentation:
- Concurrency (Asynchronous Burgers): You place an order at a fast-food restaurant and receive a number. While your food is being prepared, you can engage in other activities (like chatting with a friend). This represents handling multiple tasks by efficiently switching between them.
- Parallelism (Parallel Burgers): Multiple chefs prepare different orders simultaneously. This represents executing multiple tasks at the same time using multiple processors.
In Theolex, we applied concurrency using AsyncIO to handle I/O-bound tasks without blocking the system, and used threading to run the classification and extraction work off the event loop, gaining parallelism where the underlying operations release the GIL.