Advanced Usage
This guide covers advanced usage patterns and features of Legion Query Runners beyond the basic configuration and queries.
Schema Manipulation
Legion Query Runners provides comprehensive schema access capabilities:
# Open a SQLite-backed runner and walk the reported schema.
from legion_query_runner.query_runner import QueryRunner

runner = QueryRunner('sqlite', {"dbpath": "database.sqlite"})

# Get full schema
schema = runner.get_schema()
print(f"Found {len(schema)} tables")
for table in schema:
    print(f"Table: {table['name']}")
    for column in table['columns']:
        # A column entry is either a dict with name/type metadata or a bare name.
        if isinstance(column, dict):
            print(f" - {column['name']} ({column['type']})")
        else:
            print(f" - {column}")
Type Handling
The query runners automatically map database-specific types to standardized types:
# The result structure contains type information for each column
# NOTE(review): other examples in this guide pass a second positional
# argument (None) to run_query — confirm the intended signature here.
results = runner.run_query("SELECT * FROM users LIMIT 1")
for column in results['columns']:
    print(f"Column: {column['name']}, Type: {column['type']}")
Query Parameterization
Always use parameterized queries to prevent SQL injection:
# Correct approach - use parameters
query = "SELECT * FROM users WHERE id = %s"
results = runner.run_query(query, None, parameters=[5])

# BAD approach - don't do this!
user_id = 5
query = f"SELECT * FROM users WHERE id = {user_id}"  # vulnerable to SQL injection
Note: Different databases have different parameter styles:
- SQLite, PostgreSQL: %s placeholders
- MySQL: %s placeholders
- Oracle: :name named parameters
- SQL Server: ? placeholders
- BigQuery: named parameters prefixed with @
Query Timeouts and Cancellation
For long-running queries, you can implement timeout handling:
import threading
import time
def execute_with_timeout(runner, query, timeout_seconds=10):
    """Run *query* on *runner*, aborting if it exceeds *timeout_seconds*.

    Args:
        runner: query runner exposing ``run_query(query, user)`` and
            ``cancel_query()``.
        query: SQL text to execute.
        timeout_seconds: how long to wait before cancelling.

    Returns:
        The ``(data, error)`` pair produced by ``runner.run_query``.

    Raises:
        TimeoutError: if the query does not finish in time (cancellation
            is requested on the database side first).
        Exception: re-raises whatever the worker thread raised.
    """
    result = [None, None]   # (data, error) filled in by the worker thread
    exception = [None]      # exception captured from the worker, if any

    def run_query():
        try:
            result[0], result[1] = runner.run_query(query, None)
        except Exception as e:
            exception[0] = e

    # daemon=True: a query that never returns must not keep the
    # interpreter alive after we have already raised TimeoutError.
    query_thread = threading.Thread(target=run_query, daemon=True)
    query_thread.start()
    query_thread.join(timeout_seconds)

    if query_thread.is_alive():
        # This will initiate query cancellation on the database side
        runner.cancel_query()
        raise TimeoutError(f"Query timed out after {timeout_seconds} seconds")

    if exception[0]:
        raise exception[0]
    return result[0], result[1]
Connection Pooling
For applications with high query volume, you might want to maintain a connection pool:
class ConnectionPool:
    """Fixed-size pool of lazily created QueryRunner instances.

    Each slot is guarded by its own lock: ``get_runner`` hands out the
    first free slot and ``release_runner`` returns it to the pool.
    Runners are constructed on first checkout, not up front.
    """

    def __init__(self, db_type, config, pool_size=5):
        self.db_type = db_type
        self.config = config
        self.pool = [None] * pool_size
        self.locks = [threading.Lock() for _ in range(pool_size)]

    def get_runner(self):
        """Return ``(index, runner)`` for a free slot.

        The caller must hand *index* back to ``release_runner`` when done.

        Raises:
            RuntimeError: when every slot is currently checked out.
        """
        for i, lock in enumerate(self.locks):
            if lock.acquire(blocking=False):
                try:
                    if self.pool[i] is None:
                        self.pool[i] = QueryRunner(self.db_type, self.config)
                    return i, self.pool[i]
                # Narrowed from a bare `except:` (PEP 8); BaseException keeps
                # the original catch-everything-and-re-raise semantics while
                # still releasing the slot if construction fails.
                except BaseException:
                    lock.release()
                    raise
        raise RuntimeError("No available connections in the pool")

    def release_runner(self, index):
        """Return slot *index* to the pool so another caller may use it."""
        self.locks[index].release()
Cross-Database Queries
For scenarios where you need to query multiple databases:
def aggregate_across_dbs(query, configs, runner_factory=None):
    """Run *query* against several databases and collect the results.

    Args:
        query: SQL text executed verbatim on every database.
        configs: mapping of ``db_type -> connection config dict``.
        runner_factory: optional callable ``(db_type, config) -> runner``.
            Defaults to ``QueryRunner``; overriding it lets callers inject
            pooled runners or test doubles without changing this function.

    Returns:
        List of ``(db_type, data)`` pairs for the databases that succeeded.
        Failures are reported to stdout and skipped rather than raised, so
        one unreachable database does not abort the whole aggregation.
    """
    make_runner = runner_factory if runner_factory is not None else QueryRunner
    results = []
    for db_type, config in configs.items():
        runner = make_runner(db_type, config)
        data, error = runner.run_query(query, None)
        if error:
            print(f"Error querying {db_type}: {error}")
        else:
            results.append((db_type, data))
    return results
# Example usage
# One shared query fanned out to a local SQLite file and a remote PostgreSQL
# server; per-database failures are printed and skipped.
configs = {
    'sqlite': {"dbpath": "local.sqlite"},
    'postgresql': {"host": "pg.example.com", "port": 5432, "user": "user",
                   "password": "pass", "dbname": "db"}
}
results = aggregate_across_dbs("SELECT COUNT(*) AS count FROM users", configs)
Query Annotations
Some query runners support query annotations, which can be useful for query monitoring and performance tracking:
# Enable query annotations in the configuration
config = {
    "host": "db.example.com",
    "port": 5432,
    "user": "user",
    "password": "password",
    "dbname": "database",
    "useQueryAnnotation": True
}
runner = QueryRunner('pg', config)

# The query will be annotated with metadata
# NOTE(review): key names (QueryId/UserId/Source/Scheduled) are presumably
# what the annotation format expects — verify against the runner docs.
metadata = {
    "QueryId": "query-123",
    "UserId": "user-456",
    "Source": "ReportGenerator",
    "Scheduled": True
}
runner.run_query("SELECT * FROM reports", None, metadata=metadata)
SSH Tunneling
For databases behind firewalls, you can use SSH tunneling:
import sshtunnel
def query_through_ssh_tunnel(db_config, ssh_config, query):
    """Execute *query* against a database reachable only through an SSH hop.

    Args:
        db_config: database connection dict; must include 'type', 'host'
            and 'port'.
        ssh_config: dict with 'host', 'port', 'user' and 'password' for
            the SSH bastion.
        query: SQL text to execute.

    Returns:
        Whatever ``runner.run_query`` returns for the tunneled connection.
    """
    with sshtunnel.SSHTunnelForwarder(
        (ssh_config['host'], ssh_config['port']),
        ssh_username=ssh_config['user'],
        ssh_password=ssh_config['password'],
        remote_bind_address=(db_config['host'], db_config['port']),
        # Bind to loopback with an OS-assigned ephemeral port. The previous
        # fixed ('0.0.0.0', 10000) exposed the tunnel on every interface and
        # broke whenever port 10000 was already in use; the actual port is
        # read back via tunnel.local_bind_port below either way.
        local_bind_address=('127.0.0.1', 0)
    ) as tunnel:
        # Modify the database config to use the tunnel
        tunneled_config = db_config.copy()
        tunneled_config['host'] = '127.0.0.1'
        tunneled_config['port'] = tunnel.local_bind_port
        runner = QueryRunner(db_config['type'], tunneled_config)
        return runner.run_query(query, None)
Error Handling and Retries
Implement sophisticated error handling with retries for transient errors:
import time
from functools import wraps
def retry_on_transient_errors(max_attempts=3, delay_seconds=1):
    """Decorator factory: retry the wrapped call on transient failures.

    An exception is treated as transient when its message contains one of
    the marker substrings below (timeouts, connection drops, deadlocks,
    and similar). Non-transient exceptions propagate immediately; after
    *max_attempts* transient failures the last one is re-raised. The
    function sleeps *delay_seconds* between attempts.
    """
    transient_markers = ('timeout', 'connection', 'temporary',
                         'overload', 'deadlock')

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    message = str(exc).lower()
                    # Anything that doesn't look transient is a real bug:
                    # surface it right away instead of retrying.
                    if not any(marker in message for marker in transient_markers):
                        raise
                    last_error = exc
                    if attempt < max_attempts:
                        time.sleep(delay_seconds)
            raise last_error
        return wrapper
    return decorator
@retry_on_transient_errors()
def execute_query(runner, query):
    """Run *query* via *runner* with automatic retries on transient errors.

    Returns the result data on success; raises ``Exception`` carrying the
    runner-reported error message otherwise (which the decorator may then
    retry if the message looks transient).
    """
    data, reported_error = runner.run_query(query, None)
    if not reported_error:
        return data
    raise Exception(reported_error)