@@ -64,102 +64,32 @@ def insert_into_cloud_sql(df, table_name):
6464 db_pass = os .getenv ("DB_PASSWORD" )
6565 db_name = os .getenv ("DB_NAME" )
6666
67- print (f"Attempting to connect to database: { db_name } " )
68- print (f"Instance name: { instance_name } " )
69- print (f"Table name to be created: { table_name } " )
70- print (f"DataFrame shape: { df .shape } " )
71-
67+ print (instance_name ,db_user ,db_name )
68+
7269 connector = Connector ()
7370
7471 def getconn ():
75- try :
76- conn = connector .connect (
77- instance_name ,
78- "pg8000" ,
79- user = db_user ,
80- password = db_pass ,
81- db = db_name ,
82- )
83- print ("Successfully established database connection" )
84- return conn
85- except Exception as e :
86- print (f"Connection error: { str (e )} " )
87- raise
72+ return connector .connect (
73+ instance_name ,
74+ "pg8000" ,
75+ user = db_user ,
76+ password = db_pass ,
77+ db = db_name ,
78+ )
8879
8980 engine = sqlalchemy .create_engine (
9081 "postgresql+pg8000://" ,
9182 creator = getconn ,
9283 )
9384
9485 try :
95- # First verify connection with a simple query
96- with engine .connect () as connection :
97- result = connection .execute (sqlalchemy .text ("SELECT current_database(), current_user" ))
98- db , user = result .fetchone ()
99- print (f"Connected to database: { db } as user: { user } " )
100-
101- print ("Starting data insertion..." )
10286 df .to_sql (table_name , engine , if_exists = 'replace' , index = False )
103-
104- # Verify the table was created
105- with engine .connect () as connection :
106- result = connection .execute (
107- sqlalchemy .text (
108- "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'public' AND table_name = :table"
109- ),
110- {"table" : table_name }
111- )
112- table_exists = result .scalar ()
113- print (f"Table existence check: { table_exists > 0 } " )
114-
115- if table_exists > 0 :
116- # Get row count
117- result = connection .execute (
118- sqlalchemy .text (f"SELECT COUNT(*) FROM { table_name } " )
119- )
120- row_count = result .scalar ()
121- print (f"Inserted { row_count } rows into table { table_name } " )
122-
123- print (f"Data insertion completed successfully" )
87+ print (f"Data inserted into Cloud SQL table { table_name } ." )
12488 except Exception as e :
125- print (f"Error during database operations: { str (e )} " )
126- raise
127- finally :
128- engine .dispose ()
89+ print (f"Error inserting data into Cloud SQL: { e } " )
12990
13091 return table_name
13192
132- # @task(name="insert_into_cloud_sql")
133- # def insert_into_cloud_sql(df, table_name):
134- # instance_name = os.getenv("CLOUDSQL_CONNECTION_NAME")
135- # db_user = os.getenv("DB_USER")
136- # db_pass = os.getenv("DB_PASSWORD")
137- # db_name = os.getenv("DB_NAME")
138-
139- # connector = Connector()
140-
141- # def getconn():
142- # return connector.connect(
143- # instance_name,
144- # "pg8000",
145- # user=db_user,
146- # password=db_pass,
147- # db=db_name,
148- # )
149-
150- # engine = sqlalchemy.create_engine(
151- # "postgresql+pg8000://",
152- # creator=getconn,
153- # )
154-
155- # try:
156- # df.to_sql(table_name, engine, if_exists='replace', index=False)
157- # print(f"Data inserted into Cloud SQL table {table_name}.")
158- # except Exception as e:
159- # print(f"Error inserting data into Cloud SQL: {e}")
160-
161- # return table_name
162-
16393@task (name = "load_parquet_to_bigquery" )
16494def load_parquet_to_bigquery (gcs_uri , project_id , dataset_id , table_id ):
16595 """Loads Parquet file from GCS into BigQuery."""
0 commit comments