Python DB-APIとは?
Python DB-API(PEP 249)は、Pythonからデータベースにアクセスするための標準インターフェースです。psycopg2ライブラリを使ってPostgreSQLデータベースに接続し、SQL操作を実行できます。
インストール
psycopg2はPostgreSQL用のDB-APIアダプタです
# psycopg2のインストール
pip install psycopg2-binary
# 仮想環境での使用(推奨)
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install psycopg2-binary
● psycopg2-binary は開発環境向け
● 本番環境では psycopg2 を推奨
データベース接続
import psycopg2
# 接続(基本形式)
conn = psycopg2.connect(
host="localhost",
port="5432",
database="mydb",
user="postgres",
password="password"
)
# カーソル取得
cursor = conn.cursor()
# リソースのクローズ
cursor.close()
conn.close()
● conn はコネクションオブジェクト
● cursor でSQL実行
with文での接続管理
推奨: 自動的にリソースをクリーンアップ
import psycopg2
# with文での接続(推奨)
with psycopg2.connect(
host="localhost",
database="mydb",
user="postgres",
password="password"
) as conn:
with conn.cursor() as cursor:
# SQL実行
cursor.execute("SELECT * FROM users")
results = cursor.fetchall()
# 自動的にcommit/close
● 例外発生時も自動クローズ
● commitも自動実行
SELECT(データ取得)
# 全件取得
cursor.execute("SELECT * FROM users")
rows = cursor.fetchall() # すべての行をリストで取得
# 1件取得
cursor.execute("SELECT * FROM users WHERE id = 1")
row = cursor.fetchone() # 1行をタプルで取得
# 指定件数取得
cursor.execute("SELECT * FROM users")
rows = cursor.fetchmany(10) # 10件取得
fetchone()
1行取得
fetchall()
全行取得
パラメータ化クエリ
SQLインジェクション対策: 必ずプレースホルダを使用
# ✅ 正しい方法(プレースホルダ %s を使用)
user_id = 1
cursor.execute(
"SELECT * FROM users WHERE id = %s",
(user_id,) # タプルで渡す
)
# 複数パラメータ
cursor.execute(
"SELECT * FROM users WHERE name = %s AND age > %s",
("Alice", 20)
)
# ❌ 間違った方法(SQLインジェクションの危険)
# cursor.execute(f"SELECT * FROM users WHERE id = {user_id}")
● %s をプレースホルダとして使用
● 値は必ずタプルで渡す
トランザクション管理
try:
# トランザクション開始(自動)
cursor.execute("INSERT INTO users VALUES (%s, %s)", (1, "Alice"))
cursor.execute("INSERT INTO orders VALUES (%s, %s)", (101, 1))
# 変更を確定
conn.commit()
except Exception as e:
# エラー時は変更を取り消し
conn.rollback()
print(f"エラー: {e}")
finally:
cursor.close()
conn.close()
commit()
変更を確定
rollback()
変更を取消
CRUD操作
INSERT(追加)
# 1件追加
cursor.execute(
"INSERT INTO users (name, age) VALUES (%s, %s)",
("Alice", 30)
)
conn.commit()
# 複数件追加
data = [
("Bob", 25),
("Carol", 28)
]
cursor.executemany(
"INSERT INTO users (name, age) VALUES (%s, %s)",
data
)
conn.commit()
executemany() で一括登録
UPDATE(更新)
# データ更新
cursor.execute(
"UPDATE users SET age = %s WHERE name = %s",
(31, "Alice")
)
conn.commit()
# 影響を受けた行数を確認
print(f"更新: {cursor.rowcount}行")
rowcount で行数取得
DELETE(削除)
# データ削除
cursor.execute(
"DELETE FROM users WHERE id = %s",
(1,)
)
conn.commit()
# 影響を受けた行数を確認
print(f"削除: {cursor.rowcount}行")
主なエラー(例外)
OperationalError
接続失敗、サーバーエラー
try:
conn = psycopg2.connect(...)
except psycopg2.OperationalError as e:
print(f"接続エラー: {e}")
IntegrityError
制約違反(UNIQUE、外部キー等)
try:
cursor.execute(...)
except psycopg2.IntegrityError as e:
print(f"整合性エラー: {e}")
ProgrammingError
SQL構文エラー、テーブル不存在
try:
cursor.execute("SELCT * FROM users")
except psycopg2.ProgrammingError as e:
print(f"SQL構文エラー: {e}")
DatabaseError
すべてのDB関連エラーの基底クラス
try:
cursor.execute(...)
except psycopg2.DatabaseError as e:
print(f"DBエラー: {e}")
接続プール(Connection Pool)
接続プールは複数の接続を再利用し、パフォーマンスを向上させます(Webアプリ等で有用)
SimpleConnectionPool
from psycopg2 import pool
# プール作成(1〜10接続)
connection_pool = pool.SimpleConnectionPool(
1, 10,
host="localhost",
database="mydb",
user="postgres",
password="password"
)
# 接続取得
conn = connection_pool.getconn()
# 使用後は返却
connection_pool.putconn(conn)
# プールクローズ
connection_pool.closeall()
ThreadedConnectionPool
from psycopg2 import pool
# マルチスレッド対応プール
threaded_pool = pool.ThreadedConnectionPool(
1, 20,
host="localhost",
database="mydb",
user="postgres",
password="password"
)
# 使い方はSimpleConnectionPoolと同じ
conn = threaded_pool.getconn()
# ... 処理 ...
threaded_pool.putconn(conn)
実務で役立つTips
セキュリティ
必ずパラメータ化クエリ(%s)を使用してSQLインジェクションを防止しましょう
リソース管理
with文を使うことで、自動的にコミット・クローズされ、リソースリークを防げます
トランザクション
エラー発生時は必ずrollback()を実行。データの整合性を保ちましょう
接続プール
Webアプリでは接続プールを使い、接続の再利用でパフォーマンスを向上させましょう
データベース操作の基本フロー
SELECT文ではコミット不要。INSERT/UPDATE/DELETEではコミットが必要です
よく使うパターン
辞書形式で結果を取得
import psycopg2.extras
# DictCursorを使用
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("SELECT * FROM users WHERE id = %s", (1,))
row = cursor.fetchone()
# 辞書形式でアクセス
print(row['name'])
print(row['age'])
一括更新パターン
# 複数レコードを一括更新
updates = [
(31, "Alice"),
(26, "Bob"),
(29, "Carol")
]
cursor.executemany(
"UPDATE users SET age = %s WHERE name = %s",
updates
)
conn.commit()
print(f"{cursor.rowcount}件更新しました")
INSERTして自動採番IDを取得
# RETURNING句で挿入したIDを取得
cursor.execute(
"INSERT INTO users (name, age) VALUES (%s, %s) RETURNING id",
("Dave", 35)
)
new_id = cursor.fetchone()[0]
conn.commit()
print(f"新規ID: {new_id}")
イテレータで大量データを処理
# メモリ効率的な処理
cursor.execute("SELECT * FROM large_table")
# 1行ずつ処理(メモリに優しい)
for row in cursor:
print(row)
# 処理...