Thursday, March 15, 2018

Python project - Contactbook

Contact Book: Sample application to perform CRUD operations using Python, MySQL.

             MySQL
             Python 3.x

Libraries used :
           pymysql, beautifultable

Installation of Libraries using pip
           pip install pymysql
           pip install beautifultable

This application performs basic CRUD operations to demonstrate an end-to-end Python application.

Note: The update and delete methods are not called from main; you can give them a try.

contact.py


from collections import namedtuple

# Immutable record describing one contact; positional field order is
# cid, name, email, mobile.
Contact = namedtuple("Contact", "cid name email mobile")

config.py


# MySQL connection settings; the keys are looked up by name when a
# connection is opened.
mysql = dict(
    host='localhost',
    user='root',
    passwd='password',
    db='counselor_db',
)

util.py

import pymysql
import config

class DbUtil:
    """Helpers for opening and releasing PyMySQL connections and cursors."""

    @staticmethod
    def get_connection():
        """Open a new connection using the settings in ``config.mysql``.

        Returns:
            The connection object returned by ``pymysql.connect``.

        Raises:
            pymysql.MySQLError: re-raised, after being printed, when the
                connection cannot be established.
        """
        settings = config.mysql
        try:
            # Keyword arguments are required: PyMySQL 1.0 made connect()
            # keyword-only and deprecated the ``passwd``/``db`` aliases.
            return pymysql.connect(
                host=settings['host'],
                user=settings['user'],
                password=settings['passwd'],
                database=settings['db'],
            )
        except pymysql.MySQLError as error:
            # The original template used ``{s}``, which made format() raise
            # KeyError and hide the real connection error.
            print('Exception number: {}, value {!r}'.format(error.args[0], error))
            raise

    @staticmethod
    def close_connection(conn):
        """Close ``conn``.

        Raises:
            pymysql.MySQLError: re-raised, after being printed, when closing
                fails.
        """
        try:
            conn.close()
        except pymysql.MySQLError as error:
            print("While closing connection ...")
            print('Exception number: {}, value {!r}'.format(error.args[0], error))
            raise

    @staticmethod
    def close_cursor(cursor):
        """Close ``cursor``.

        Raises:
            pymysql.MySQLError: re-raised, after being printed, when closing
                fails.
        """
        try:
            cursor.close()
        except pymysql.MySQLError as error:
            print("While closing the cursor ...")
            print('Exception number: {}, value {!r}'.format(error.args[0], error))
            raise

dboperations.py

import config
from util import DbUtil
import pymysql
from contact import Contact


class ContactDbOperations:
    """CRUD operations on the ``contact`` table.

    Each method opens its own connection through ``DbUtil.get_connection``
    and closes it before returning. Errors caught while talking to the
    database are printed instead of being propagated to the caller.
    """

    def add_contact(self, new_contact):
        """Insert a new row built from ``new_contact``.

        Args:
            new_contact: object exposing ``name``, ``email`` and ``mobile``
                attributes. Its ``cid`` is not part of the insert statement.
        """
        connection = None
        try:
            connection = DbUtil.get_connection()
            with connection.cursor() as cursor:
                cursor.execute(
                    "insert into contact(name,email,mobile) values(%s,%s,%s)",
                    (new_contact.name, new_contact.email, new_contact.mobile),
                )
            connection.commit()
        except pymysql.MySQLError as error:
            ContactDbOperations._report("While inserting Data ...", error)
        finally:
            ContactDbOperations._release(connection)

    def get_contact(self, cid):
        """Fetch the contact whose id is ``cid``.

        Returns:
            A ``Contact``, or ``None`` when no row matches or a database
            error was caught.
        """
        connection = None
        try:
            connection = DbUtil.get_connection()
            with connection.cursor() as cursor:
                cursor.execute(
                    "select  cid, name, email, mobile from contact where cid = %s",
                    (cid,),
                )
                row = cursor.fetchone()
            # fetchone() gives None when nothing matched; unpacking it would
            # raise TypeError.
            return Contact(*row) if row is not None else None
        except pymysql.DatabaseError as error:
            ContactDbOperations._report("While getting data from DB using id... ", error)
            return None
        finally:
            ContactDbOperations._release(connection)

    def update_contact(self, contact):
        """Overwrite name, email and mobile of the row identified by ``contact.cid``.

        Args:
            contact: object exposing ``cid``, ``name``, ``email`` and
                ``mobile`` attributes.
        """
        connection = None
        try:
            connection = DbUtil.get_connection()
            with connection.cursor() as cursor:
                # Parameters must follow the placeholder order, which differs
                # from the Contact field order (cid comes last here).
                cursor.execute(
                    "update contact set name = %s ,email = %s, mobile = %s where cid = %s",
                    (contact.name, contact.email, contact.mobile, contact.cid),
                )
            connection.commit()
        except pymysql.DatabaseError as error:
            ContactDbOperations._report("While updating ... ", error)
        finally:
            ContactDbOperations._release(connection)

    def get_all_contacts(self):
        """Return every contact as a list of ``Contact``.

        Returns:
            A list of ``Contact``; empty when the table is empty or an error
            was caught.
        """
        connection = None
        try:
            connection = DbUtil.get_connection()
            with connection.cursor() as cursor:
                cursor.execute("select cid, name, email, mobile from contact")
                rows = cursor.fetchall()
            return self.get_list_data(rows)
        except Exception as error:
            ContactDbOperations._report("While retrieving data ... ", error)
            # An empty list keeps callers that iterate over the result working.
            return []
        finally:
            ContactDbOperations._release(connection)

    def search_contact(self, search_str):
        """Return the contacts whose name contains ``search_str``.

        Returns:
            A list of ``Contact``; empty when nothing matches or an error
            was caught.
        """
        connection = None
        try:
            connection = DbUtil.get_connection()
            with connection.cursor() as cursor:
                cursor.execute(
                    "select cid, name, email, mobile from contact where name like %s ",
                    ('%' + search_str + '%',),
                )
                rows = cursor.fetchall()
            return self.get_list_data(rows)
        except Exception as error:
            ContactDbOperations._report("While searching Data ...", error)
            return []
        finally:
            ContactDbOperations._release(connection)

    @staticmethod
    def delete_contact(cid):
        """Delete the row whose id is ``cid``."""
        connection = None
        try:
            connection = DbUtil.get_connection()
            with connection.cursor() as cursor:
                cursor.execute("delete from contact where cid = %s ", (cid,))
            connection.commit()
        except pymysql.DatabaseError as error:
            ContactDbOperations._report("While deleting data ...", error)
        finally:
            ContactDbOperations._release(connection)

    @staticmethod
    def get_list_data(rows):
        """Convert an iterable of row sequences into a list of ``Contact``."""
        return [Contact(*row) for row in rows]

    @staticmethod
    def _report(context, error):
        """Print ``context`` followed by the error's first argument and repr."""
        print(context)
        # Not every exception carries args; indexing an empty tuple would
        # raise IndexError from inside the error handler.
        number = error.args[0] if error.args else None
        print('Exception number: {}, value {!r}'.format(number, error))

    @staticmethod
    def _release(connection):
        """Close ``connection`` unless it was never opened.

        Cursors are closed by their ``with`` blocks before this runs, so only
        the connection is left to release.
        """
        if connection is not None:
            DbUtil.close_connection(connection)

contactservice.py

from dboperations import ContactDbOperations
from contact import Contact
from beautifultable import BeautifulTable

class ContactService:
    """Console front end that reads user input and prints contacts as tables."""

    def __init__(self):
        # Data-access object used by every operation below.
        self.cs = ContactDbOperations()

    def add_contact(self):
        """Prompt for name, email and mobile, store the contact, then list all."""
        name = input("Name : ")
        email = input("Email : ")
        mobile = input("Mobile : ")
        # 0 is only a placeholder for cid on a contact that is not stored yet.
        contact = Contact(0, name, email, mobile)
        self.cs.add_contact(contact)
        self.show_all_contacts()

    def show_all_contacts(self):
        """Print every stored contact as a table."""
        contacts = self.cs.get_all_contacts()
        ContactService.paint_data_list(contacts)

    def search_contact(self):
        """Prompt for a search string and print the matching contacts."""
        name = input("Enter search name :")
        contacts = self.cs.search_contact(name)
        if contacts:
            ContactService.paint_data_list(contacts)
        else:
            print("No data found with search name :{}".format(name))

    @staticmethod
    def paint_data(data):
        """Print a single contact as a one-row table."""
        ContactService.paint_data_list([data])

    @staticmethod
    def paint_data_list(data):
        """Print the contacts in ``data`` as a table.

        Args:
            data: iterable of objects exposing ``cid``, ``name``, ``email``
                and ``mobile``. ``None`` is accepted and prints nothing,
                because the data layer may hand back ``None`` after an error.
        """
        if data is None:
            return
        table = BeautifulTable()
        table.column_headers = ["CID", "NAME", "EMAIL", "MOBILE"]
        for li in data:
            table.append_row([li.cid, li.name, li.email, li.mobile])
        print(table)


main.py


if __name__ == '__main__':
    # main.py never imported ContactService, so the first statement below
    # failed with NameError. Imported here to keep the block self-contained.
    from contactservice import ContactService

    cs = ContactService()
    # Menu number -> action; 4 (exit) is handled separately below.
    actions = {
        1: cs.add_contact,
        2: cs.show_all_contacts,
        3: cs.search_contact,
    }
    while True:
        print("*"*40)
        print("1. Add Contact 2. View All 3.Search 4. Exit")
        print("*"*40)
        try:
            choice = int(input("Enter your choice :"))
        except ValueError:
            # Non-numeric input used to crash the program; treat it like any
            # other invalid choice.
            print("Enter only 1 to 4 ......")
            continue
        if choice == 4:
            # Leaving the loop ends the script; exit() is a site-module
            # helper that is not guaranteed to exist.
            break
        action = actions.get(choice)
        if action is None:
            print("Enter only 1 to 4 ......")
        else:
            action()

No comments:

Post a Comment

Spring Boot 3 : JWT with SecurityFilterChain, AuthorizeHttpRequests, RequestMatchers

pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0...