Skip to main content

Advanced Usage

This guide covers advanced usage patterns and features of Legion Query Runners beyond the basic configuration and queries.

Schema Manipulation

Legion Query Runners provides comprehensive schema access capabilities:

from legion_query_runner.query_runner import QueryRunner

runner = QueryRunner('sqlite', {"dbpath": "database.sqlite"})

# Get full schema
schema = runner.get_schema()
print(f"Found {len(schema)} tables")

for table in schema:
    print(f"Table: {table['name']}")
    for column in table['columns']:
        # Some runners describe columns as dicts with name/type; others
        # return plain name strings — handle both shapes.
        if isinstance(column, dict):
            print(f" - {column['name']} ({column['type']})")
        else:
            print(f" - {column}")

Type Handling

The query runners automatically map database-specific types to standardized types:

# The result structure contains type information for each column
# NOTE(review): other examples in this guide unpack
# ``data, error = runner.run_query(...)`` — confirm which return shape
# applies here.
results = runner.run_query("SELECT * FROM users LIMIT 1")
for column in results['columns']:
    print(f"Column: {column['name']}, Type: {column['type']}")

Query Parameterization

Always use parameterized queries to prevent SQL injection:

# Correct approach - use parameters
# The driver substitutes each value safely, so user-supplied data can
# never be interpreted as SQL.
query = "SELECT * FROM users WHERE id = %s"
results = runner.run_query(query, None, parameters=[5])

# BAD approach - don't do this!
# Interpolating values directly into the SQL string opens the door to
# SQL injection.
user_id = 5
query = f"SELECT * FROM users WHERE id = {user_id}"

Note: Different databases have different parameter styles:

  • SQLite: ? placeholders (qmark style); PostgreSQL: %s placeholders
  • MySQL: %s placeholders
  • Oracle: :name named parameters
  • SQL Server: ? placeholders
  • BigQuery: Named parameters with @

Query Timeouts and Cancellation

For long-running queries, you can implement timeout handling:

import threading
import time

def execute_with_timeout(runner, query, timeout_seconds=10):
    """Run *query* on *runner*, raising TimeoutError past the deadline.

    The query executes on a worker thread. On timeout, the runner is asked
    to cancel the query server-side and ``TimeoutError`` is raised. Any
    exception raised by the query thread is re-raised in the caller.

    Returns the ``(data, error)`` pair produced by ``runner.run_query``.
    """
    result = [None, None]
    exception = [None]

    def run_query():
        try:
            result[0], result[1] = runner.run_query(query, None)
        except Exception as e:
            exception[0] = e

    # Daemon thread: a query that never returns must not keep the
    # interpreter alive after the main thread exits.
    query_thread = threading.Thread(target=run_query, daemon=True)
    query_thread.start()
    query_thread.join(timeout_seconds)

    if query_thread.is_alive():
        # This will initiate query cancellation on the database side
        runner.cancel_query()
        raise TimeoutError(f"Query timed out after {timeout_seconds} seconds")

    if exception[0]:
        raise exception[0]

    return result[0], result[1]

Connection Pooling

For applications with high query volume, you might want to maintain a connection pool:

class ConnectionPool:
    """Fixed-size pool of lazily created QueryRunner instances.

    Each slot is guarded by its own lock: ``get_runner`` hands out the
    first free slot and ``release_runner`` returns it.
    """

    def __init__(self, db_type, config, pool_size=5):
        self.db_type = db_type
        self.config = config
        # Runners are created lazily on first checkout of each slot.
        self.pool = [None] * pool_size
        self.locks = [threading.Lock() for _ in range(pool_size)]

    def get_runner(self):
        """Return ``(index, runner)`` for a free slot.

        The caller must hand *index* back to ``release_runner`` when done.
        Raises RuntimeError when every slot is checked out.
        """
        for i, lock in enumerate(self.locks):
            # Non-blocking acquire: skip slots currently checked out.
            if lock.acquire(blocking=False):
                try:
                    if self.pool[i] is None:
                        self.pool[i] = QueryRunner(self.db_type, self.config)
                    return i, self.pool[i]
                except BaseException:
                    # Construction failed (or was interrupted): free the
                    # slot before propagating.
                    lock.release()
                    raise

        raise RuntimeError("No available connections in the pool")

    def release_runner(self, index):
        """Return slot *index* to the pool (must match a prior get_runner)."""
        self.locks[index].release()

Cross-Database Queries

For scenarios where you need to query multiple databases:

def aggregate_across_dbs(query, configs):
    """Run *query* against every database described in *configs*.

    *configs* maps ``db_type -> connection config dict``. Returns a list of
    ``(db_type, data)`` pairs for the databases that succeeded; per-database
    errors are printed and that database is skipped.
    """
    results = []

    for db_type, config in configs.items():
        runner = QueryRunner(db_type, config)
        data, error = runner.run_query(query, None)

        if error:
            print(f"Error querying {db_type}: {error}")
        else:
            results.append((db_type, data))

    return results

# Example usage: one local SQLite file plus a remote PostgreSQL server.
configs = {
    'sqlite': {"dbpath": "local.sqlite"},
    'postgresql': {
        "host": "pg.example.com",
        "port": 5432,
        "user": "user",
        "password": "pass",
        "dbname": "db",
    },
}

results = aggregate_across_dbs("SELECT COUNT(*) AS count FROM users", configs)

Query Annotations

Some query runners support query annotations, which can be useful for query monitoring and performance tracking:

# Enable query annotations in the configuration
config = {
    "host": "db.example.com",
    "port": 5432,
    "user": "user",
    "password": "password",
    "dbname": "database",
    "useQueryAnnotation": True,
}

runner = QueryRunner('pg', config)

# Metadata attached to the query (useful for monitoring / auditing).
metadata = {
    "QueryId": "query-123",
    "UserId": "user-456",
    "Source": "ReportGenerator",
    "Scheduled": True,
}

runner.run_query("SELECT * FROM reports", None, metadata=metadata)

SSH Tunneling

For databases behind firewalls, you can use SSH tunneling:

import sshtunnel

def query_through_ssh_tunnel(db_config, ssh_config, query):
    """Run *query* on a database reachable only through an SSH jump host.

    Opens an SSH tunnel to the database, repoints a copy of *db_config* at
    the local end of the tunnel, and executes the query there. Returns
    whatever ``run_query`` returns.
    """
    with sshtunnel.SSHTunnelForwarder(
        (ssh_config['host'], ssh_config['port']),
        ssh_username=ssh_config['user'],
        ssh_password=ssh_config['password'],
        remote_bind_address=(db_config['host'], db_config['port']),
        # NOTE(review): binding 0.0.0.0 exposes the tunnel on every
        # interface; 127.0.0.1 is safer unless remote clients must use
        # this tunnel — confirm intent.
        local_bind_address=('0.0.0.0', 10000)
    ) as tunnel:
        # Modify the database config to use the tunnel
        tunneled_config = db_config.copy()
        tunneled_config['host'] = '127.0.0.1'
        tunneled_config['port'] = tunnel.local_bind_port

        runner = QueryRunner(db_config['type'], tunneled_config)
        return runner.run_query(query, None)

Error Handling and Retries

Implement sophisticated error handling with retries for transient errors:

import time
from functools import wraps

def retry_on_transient_errors(max_attempts=3, delay_seconds=1,
                              transient_markers=('timeout', 'connection',
                                                 'temporary', 'overload',
                                                 'deadlock')):
    """Decorator: retry the wrapped callable on transient-looking errors.

    An exception counts as transient when any of *transient_markers*
    appears in its lowercased message. Non-transient exceptions propagate
    immediately; transient ones are retried up to *max_attempts* times,
    sleeping *delay_seconds* between attempts, after which the last error
    is re-raised.

    *transient_markers* is a backward-compatible extension point: the
    default matches the original hard-coded keyword list.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    err_str = str(e).lower()
                    # Non-transient failures should surface immediately.
                    if not any(m in err_str for m in transient_markers):
                        raise
                    last_error = e
                    if attempt < max_attempts:
                        time.sleep(delay_seconds)
            raise last_error
        return wrapper
    return decorator

@retry_on_transient_errors()
def execute_query(runner, query):
    """Run *query* on *runner*, retrying transient failures.

    ``run_query`` reports failures through its second return value; convert
    that into an exception so the retry decorator can inspect the message.
    """
    data, error = runner.run_query(query, None)
    if error:
        raise Exception(error)
    return data