L'avantage d'avoir une fonction d'ouverture et de fermeture de la base est qu'on peut plus facilement changer de SGBDR, par exemple de sqlite3 à postgresql.
Ouvrir la base:
import sqlite3 def ouvrebase(basesql, curseur=False): """ouvre la base basesql et renvoie la connexion si curseur=True, renvoie en plus un curseur """ try: cnx = sqlite3.connect(basesql) cnx.execute("PRAGMA foreign_keys=on") # active le foreign key #cnx.text_factory = str # permet de communiquer en utf-8 if curseur: cur = cnx.cursor() except sqlite3.Error, err: #print u"Erreur:", err.args[0] cnx, cur = None, None if curseur: return cnx, cur return cnx
En cas d'erreur, la fonction renvoie une connexion=None (ainsi que le curseur s'il a été demandé). Selon l'application, on peut aussi afficher l'erreur en console ou sous forme graphique. On peut aussi renvoyer une exception (à gérer dans le programme d'appel) en plaçant un 'raise' sous le 'except:'.
La ligne cnx.execute("PRAGMA foreign_keys=on;") appelle 2 remarques:
La ligne #cnx.text_factory = str peut être dé-commentée si on peut pouvoir communiquer (entrées/sorties) avec la base sql en utf-8. Si on laisse en commentaire, la communication est exclusivement en unicode. Rappelons qu'avec Python 2.x, le traitement des chaines encodées 'utf-8' conduit à des erreurs avec les caractères non strictement ASCII: l'unicode est ici nettement préférable.
Exemples d'utilisation: ouverture d'une base existante ou non:
cnx = ouvrebase("base.db3")
cnx, cur = ouvrebase("base.db3", True)
def fermebase(cnx, cur=None): """Ferme la base qui était ouverte avec la connexion cnx ferme aussi le curseur s'il est passé en argument """ if cur != None: cur.close() cnx.close()
Code auto-documenté:
def commit(cnx): """valide ou non la transaction si ok => retourne une chaine unicode vide sinon, retourne le message d'erreur (unicode) """ try: cnx.commit() except sqlite3.Error, err: cnx.rollback() return unicode(err.args[0]) return u""
Attention: une clé primaire impose l'unicité de la donnée, mais accepte par défaut qu'une donnée ne soit pas renseignée (valeur=NULL en sql ou None en Python).
Pour que toutes les valeurs soient obligatoirement renseignées, et donc interdire les valeurs NULL, il faut ajouter 'NOT NULL' à la création de la table. Exemple:
cur.execute(""" CREATE TABLE auteurs ( codeauteur TEXT NOT NULL PRIMARY KEY, commentaire TEXT ); """) cnx.commit()
def nomschamps(cnx, table): """Retourne la liste des titres des colonnes de la table 'table' """ sav = cnx.row_factory # sauvegarde de l'ancienne valeur (None par défaut) cnx.row_factory = sqlite3.Row cur = cnx.cursor() cur.execute("SELECT * FROM %s" % table) r = cur.fetchone() liste = r.keys()# NB: le type de r n'est pas 'list' mais 'sqlite3.Row' cur.close() cnx.row_factory = sav # retour à l'ancienne valeur return liste
Exemple d'utilisation: trouver les noms de champs de la table 'auteurs' (base ouverte):
print nomschamps(cnx, 'auteurs') ['nom', 'prenom', 'pays']
fonction inspirée du Wiki du site sqlite:
def nomstables(cnx): """Retourne la liste des tables de la base en cours (connexion cnx) sous forme d'une sous-liste [nomtable, typetable, codeSQL]: - nomtable: nom de la table - typetable: type de table ('BASE TABLE', 'BASE VIEW', 'TEMPORARY TABLE', 'TEMPORARY VIEW') - codeSQL: code SQL de creation de la table source: wiki du site sqlite """ cur = cnx.cursor() cur.execute(""" DROP VIEW IF EXISTS INFORMATION_SCHEMA_TABLES; """) cur.execute(""" CREATE VIEW INFORMATION_SCHEMA_TABLES AS SELECT 'main' AS TABLE_CATALOG, 'sqlite' AS TABLE_SCHEMA, tbl_name AS TABLE_NAME, CASE WHEN type = 'table' THEN 'BASE TABLE' WHEN type = 'view' THEN 'VIEW' END AS TABLE_TYPE, sql AS TABLE_SOURCE FROM sqlite_master WHERE type IN ('table', 'view') AND tbl_name NOT LIKE 'INFORMATION_SCHEMA_%' ORDER BY TABLE_TYPE, TABLE_NAME; """) cur.execute(""" SELECT * FROM INFORMATION_SCHEMA_TABLES; """) L = cur.fetchall() # on ne retient que les noms des tables non-temporaires et non-système liste = [ligne[2:3][0] for ligne in L if ligne[3]==u'BASE TABLE' or\ ligne[3]==u'BASE VIEW'] cur.close() return liste
Exemple d'utilisation (base ouverte avant):
print nomstables(cnx) [u"colis", u"auteurs", u"photos"]
def liretable(cnx, table): """retourne la liste de tous les enregistrements de la table 'table' """ cur = cnx.cursor() cur.execute("""SELECT * FROM %s;""" % (table,)) liste = cur.fetchall() cur.close() return liste # attention: c'est une 'liste de tuples'
Exemple d'utilisation (base ouverte):
L = liretable(cnx, "auteurs") print L [(u'Dupond', u'Albert', u'France'), (u'Meyer', u'Jean', u'Allemagne'), (u'Toto', u'Louis', u'Italie')]
Soit L une liste d'enregistrements extraite d'une table (liste de tuples ou liste de listes):
[(u'Dupond', u'Albert', u'France'), (u'Meyer', u'Jean', u'Allemagne'), (u'Toto', u'Louis', u'Italie')]
On veut trier ces enregistrements par pays (pays=indice 2):
L.sort(key=lambda v: v[2]
Et comme la méthode sort est stable (ne change pas l'ordre des enregistrements dont le critère de tri est identique), on peut trier selon plusieurs critères successifs. Par exemple, obtenir un tri par pays (pays=indice 2), et pour chaque pays, avoir les noms triés par ordre alphabétique (nom=indice 0), et pour les noms identiques, avoir les prénoms triés par ordre alphabétique:
L.sort(key=lambda v: v[1]) L.sort(key=lambda v: v[0]) L.sort(key=lambda v: v[2])
Et rien n'empêche d'avoir en bibliothèque une liste de clés déjà préparée:
cle0 = lambda v: v[0] cle1 = lambda v: v[1] cle2 = lambda v: v[2] cle3 = lambda v: v[3] cle4 = lambda v: v[4] cle5 = lambda v: v[5] cle6 = lambda v: v[6] cle7 = lambda v: v[7] cle8 = lambda v: v[8] cle9 = lambda v: v[9]
Le tri précédent deviendrait:
L.sort(key=cle1) L.sort(key=cle0) L.sort(key=cle2)
Il s'agit ici de créer une fonction qui va ajouter (INSERT) dans une table un ou plusieurs enregistrements présenté(s) sous forme de listes. Une liste de données pour un seul enregistrement et une liste de listes pour plusieurs.
Voilà le code:
def inseretable(cnx, cur, table, L): """Insère un ou plusieurs enregistrement(s) dans la table cnx est une connexion ouverte, cur est un curseur ouvert table est le nom de la table (chaine) L est un simple enregistrement ou une liste d'enregistrements Les enregistrements doivent avoir la bonne longueur (sinon, echec) Retourne R: liste des enregistrements non passés dans la table avec le message de l'erreur rencontrée format: [[enreg, message_d'erreur], [enreg, message_d'erreur], ...] normalement: R devrait être une liste vide: [] Ne pas oublier un cnx.commit() après, pour validation de l'insertion! """ # si L n'est pas une liste, erreur if not isinstance(L, (list, tuple)): return [[L, u"mauvais format pour insertion dans une table"]] # si L est une liste vide, ne rien faire if len(L)==0: return [] # si L est une simple liste, transformer en liste de listes if not isinstance(L[0], (list, tuple)): L = [L[:]] # convertir la liste simple en liste de listes # exécution de l'insertion R = [] # pour recevoir les enregistrements en echec baserequete = """INSERT INTO "%s" VALUES (""" % (table,) for E in L: try: requete = (baserequete + '?,'*len(E))[:-1] + ");" cur.execute(requete, E) except sqlite3.Error, err: R.append([E, unicode(err.args[0])]) # retourne la liste des enregistrements en échec avec l'erreur rencontrée return R
Exemple d'utilisation:
On a une table formée de 2 champs texte.
On ajoute un seul enregistrement:
# avant, on a: importé sqlite3, crée la table 'table', crée la connexion cnx et le curseur cur enreg = [u"XXXX4", u""] R = inseretable(cnx, cur, 'table', enreg) cnx.commit() if R!=[]: print u"Erreur: ", R[0][1]
On ajoute plusieurs enregistrements:
# avant, on a: importé sqlite3, crée la table 'table', crée la connexion cnx et le curseur cur enreg = [[u"XXXX4", u""],[u"YYYYY", u""],[u"XXXX2", u""]] R = inseretable(cnx, cur, 'table', enreg) cnx.commit() if R!=[]: print u"Erreurs: " for E in R: print E[0], E[1]