Skip to content

Commit 457bf7e

Browse files
committed
debugging cloudsql
1 parent 2a1acc6 commit 457bf7e

2 files changed

Lines changed: 204 additions & 168 deletions

File tree

Lines changed: 203 additions & 167 deletions
Original file line numberDiff line numberDiff line change
@@ -1,182 +1,218 @@
11
{
2-
"job_configuration": {
3-
"command": "{{ command }}",
4-
"env": "{{ env }}",
5-
"labels": "{{ labels }}",
6-
"name": "{{ name }}",
7-
"namespace": "{{ namespace }}",
8-
"job_manifest": {
9-
"apiVersion": "batch/v1",
10-
"kind": "Job",
11-
"metadata": {
12-
"labels": "{{ labels }}",
13-
"namespace": "{{ namespace }}",
14-
"generateName": "{{ name }}-"
15-
},
16-
"spec": {
17-
"backoffLimit": 0,
18-
"ttlSecondsAfterFinished": "{{ finished_job_ttl }}",
19-
"template": {
20-
"spec": {
21-
"parallelism": 1,
22-
"completions": 1,
23-
"restartPolicy": "Never",
24-
"serviceAccountName": "{{ service_account_name }}",
25-
"containers": [
26-
{
27-
"name": "prefect-job",
28-
"env": [
29-
{
30-
"name": "GOOGLE_APPLICATION_CREDENTIALS",
31-
"value": "/var/secrets/google/key.json"
2+
"job_configuration": {
3+
"command": "{{ command }}",
4+
"env": "{{ env }}",
5+
"labels": "{{ labels }}",
6+
"name": "{{ name }}",
7+
"namespace": "{{ namespace }}",
8+
"job_manifest": {
9+
"apiVersion": "batch/v1",
10+
"kind": "Job",
11+
"metadata": {
12+
"labels": "{{ labels }}",
13+
"namespace": "{{ namespace }}",
14+
"generateName": "{{ name }}-"
15+
},
16+
"spec": {
17+
"backoffLimit": 0,
18+
"ttlSecondsAfterFinished": "{{ finished_job_ttl }}",
19+
"template": {
20+
"spec": {
21+
"parallelism": 1,
22+
"completions": 1,
23+
"restartPolicy": "Never",
24+
"serviceAccountName": "{{ service_account_name }}",
25+
"containers": [
26+
{
27+
"name": "prefect-job",
28+
"env": [
29+
{
30+
"name": "GOOGLE_APPLICATION_CREDENTIALS",
31+
"value": "/var/secrets/google/key.json"
32+
},
33+
{
34+
"name": "CLOUDSQL_CONNECTION_NAME",
35+
"valueFrom": {
36+
"secretKeyRef": {
37+
"name": "cloudsql-credentials",
38+
"key": "connection_name"
39+
}
3240
}
33-
],
34-
"image": "{{ image }}",
35-
"imagePullPolicy": "{{ image_pull_policy }}",
36-
"args": "{{ command }}",
37-
"volumeMounts": [
38-
{
39-
"name": "gcp-key",
40-
"mountPath": "/var/secrets/google",
41-
"readOnly": true
41+
},
42+
{
43+
"name": "DB_USER",
44+
"valueFrom": {
45+
"secretKeyRef": {
46+
"name": "cloudsql-credentials",
47+
"key": "username"
48+
}
4249
}
43-
]
44-
}
45-
],
46-
"volumes": [
47-
{
48-
"name": "gcp-key",
49-
"secret": {
50-
"secretName": "gcp-sa-key"
50+
},
51+
{
52+
"name": "DB_PASSWORD",
53+
"valueFrom": {
54+
"secretKeyRef": {
55+
"name": "cloudsql-credentials",
56+
"key": "password"
57+
}
58+
}
59+
},
60+
{
61+
"name": "DB_NAME",
62+
"valueFrom": {
63+
"secretKeyRef": {
64+
"name": "cloudsql-credentials",
65+
"key": "database"
66+
}
67+
}
68+
}
69+
],
70+
"image": "{{ image }}",
71+
"imagePullPolicy": "{{ image_pull_policy }}",
72+
"args": "{{ command }}",
73+
"volumeMounts": [
74+
{
75+
"name": "gcp-key",
76+
"mountPath": "/var/secrets/google",
77+
"readOnly": true
5178
}
79+
]
80+
}
81+
],
82+
"volumes": [
83+
{
84+
"name": "gcp-key",
85+
"secret": {
86+
"secretName": "gcp-sa-key"
5287
}
53-
]
54-
}
88+
}
89+
]
5590
}
5691
}
57-
},
58-
"cluster_config": "{{ cluster_config }}",
59-
"job_watch_timeout_seconds": "{{ job_watch_timeout_seconds }}",
60-
"pod_watch_timeout_seconds": "{{ pod_watch_timeout_seconds }}",
61-
"stream_output": "{{ stream_output }}"
92+
}
6293
},
63-
"variables": {
64-
"description": "Default variables for the Kubernetes worker.\n\nThe schema for this class is used to populate the `variables` section of the default\nbase job template.",
65-
"type": "object",
66-
"properties": {
67-
"name": {
68-
"title": "Name",
69-
"description": "Name given to infrastructure created by a worker.",
70-
"type": "string"
71-
},
72-
"env": {
73-
"title": "Environment Variables",
74-
"description": "Environment variables to set when starting a flow run.",
75-
"type": "object",
76-
"additionalProperties": {
77-
"type": "string"
78-
}
79-
},
80-
"labels": {
81-
"title": "Labels",
82-
"description": "Labels applied to infrastructure created by a worker.",
83-
"type": "object",
84-
"additionalProperties": {
85-
"type": "string"
86-
}
87-
},
88-
"command": {
89-
"title": "Command",
90-
"description": "The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker.",
91-
"type": "string"
92-
},
93-
"namespace": {
94-
"title": "Namespace",
95-
"description": "The Kubernetes namespace to create jobs within.",
96-
"default": "data-pipeline",
94+
"cluster_config": "{{ cluster_config }}",
95+
"job_watch_timeout_seconds": "{{ job_watch_timeout_seconds }}",
96+
"pod_watch_timeout_seconds": "{{ pod_watch_timeout_seconds }}",
97+
"stream_output": "{{ stream_output }}"
98+
},
99+
"variables": {
100+
"description": "Default variables for the Kubernetes worker.\n\nThe schema for this class is used to populate the `variables` section of the default\nbase job template.",
101+
"type": "object",
102+
"properties": {
103+
"name": {
104+
"title": "Name",
105+
"description": "Name given to infrastructure created by a worker.",
106+
"type": "string"
107+
},
108+
"env": {
109+
"title": "Environment Variables",
110+
"description": "Environment variables to set when starting a flow run.",
111+
"type": "object",
112+
"additionalProperties": {
97113
"type": "string"
98-
},
99-
"image": {
100-
"title": "Image",
101-
"description": "The image reference of a container image to use for created jobs. If not set, the latest Prefect image will be used.",
102-
"example": "docker.io/prefecthq/prefect:2-latest",
103-
"type": "string",
104-
"default": "gcr.io/teak-gamma-442315-f8/taxi-flow:latest"
105-
},
106-
"service_account_name": {
107-
"title": "Service Account Name",
108-
"description": "The Kubernetes service account to use for job creation.",
109-
"type": "string",
110-
"default": "prefect-worker"
111-
},
112-
"image_pull_policy": {
113-
"title": "Image Pull Policy",
114-
"description": "The Kubernetes image pull policy to use for job containers.",
115-
"default": "Always",
116-
"enum": [
117-
"IfNotPresent",
118-
"Always",
119-
"Never"
120-
],
114+
}
115+
},
116+
"labels": {
117+
"title": "Labels",
118+
"description": "Labels applied to infrastructure created by a worker.",
119+
"type": "object",
120+
"additionalProperties": {
121121
"type": "string"
122-
},
123-
"finished_job_ttl": {
124-
"title": "Finished Job TTL",
125-
"description": "The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely.",
126-
"type": "integer"
127-
},
128-
"job_watch_timeout_seconds": {
129-
"title": "Job Watch Timeout Seconds",
130-
"description": "Number of seconds to wait for each event emitted by a job before timing out. If not set, the worker will wait for each event indefinitely.",
131-
"type": "integer"
132-
},
133-
"pod_watch_timeout_seconds": {
134-
"title": "Pod Watch Timeout Seconds",
135-
"description": "Number of seconds to watch for pod creation before timing out.",
136-
"default": 60,
137-
"type": "integer"
138-
},
139-
"stream_output": {
140-
"title": "Stream Output",
141-
"description": "If set, output will be streamed from the job to local standard output.",
142-
"default": true,
143-
"type": "boolean"
144-
},
145-
"cluster_config": {
146-
"title": "Cluster Config",
147-
"description": "The Kubernetes cluster config to use for job creation.",
148-
"allOf": [
149-
{
150-
"$ref": "#/definitions/KubernetesClusterConfig"
151-
}
152-
]
153122
}
154123
},
155-
"definitions": {
156-
"KubernetesClusterConfig": {
157-
"title": "KubernetesClusterConfig",
158-
"description": "Stores configuration for interaction with Kubernetes clusters.\n\nSee `from_file` for creation.",
159-
"type": "object",
160-
"properties": {
161-
"config": {
162-
"title": "Config",
163-
"description": "The entire contents of a kubectl config file.",
164-
"type": "object"
165-
},
166-
"context_name": {
167-
"title": "Context Name",
168-
"description": "The name of the kubectl context to use.",
169-
"type": "string"
170-
}
124+
"command": {
125+
"title": "Command",
126+
"description": "The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker.",
127+
"type": "string"
128+
},
129+
"namespace": {
130+
"title": "Namespace",
131+
"description": "The Kubernetes namespace to create jobs within.",
132+
"default": "data-pipeline",
133+
"type": "string"
134+
},
135+
"image": {
136+
"title": "Image",
137+
"description": "The image reference of a container image to use for created jobs. If not set, the latest Prefect image will be used.",
138+
"example": "docker.io/prefecthq/prefect:2-latest",
139+
"type": "string",
140+
"default": "gcr.io/teak-gamma-442315-f8/taxi-flow:latest"
141+
},
142+
"service_account_name": {
143+
"title": "Service Account Name",
144+
"description": "The Kubernetes service account to use for job creation.",
145+
"type": "string",
146+
"default": "prefect-worker"
147+
},
148+
"image_pull_policy": {
149+
"title": "Image Pull Policy",
150+
"description": "The Kubernetes image pull policy to use for job containers.",
151+
"default": "Always",
152+
"enum": [
153+
"IfNotPresent",
154+
"Always",
155+
"Never"
156+
],
157+
"type": "string"
158+
},
159+
"finished_job_ttl": {
160+
"title": "Finished Job TTL",
161+
"description": "The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely.",
162+
"type": "integer"
163+
},
164+
"job_watch_timeout_seconds": {
165+
"title": "Job Watch Timeout Seconds",
166+
"description": "Number of seconds to wait for each event emitted by a job before timing out. If not set, the worker will wait for each event indefinitely.",
167+
"type": "integer"
168+
},
169+
"pod_watch_timeout_seconds": {
170+
"title": "Pod Watch Timeout Seconds",
171+
"description": "Number of seconds to watch for pod creation before timing out.",
172+
"default": 60,
173+
"type": "integer"
174+
},
175+
"stream_output": {
176+
"title": "Stream Output",
177+
"description": "If set, output will be streamed from the job to local standard output.",
178+
"default": true,
179+
"type": "boolean"
180+
},
181+
"cluster_config": {
182+
"title": "Cluster Config",
183+
"description": "The Kubernetes cluster config to use for job creation.",
184+
"allOf": [
185+
{
186+
"$ref": "#/definitions/KubernetesClusterConfig"
187+
}
188+
]
189+
}
190+
},
191+
"definitions": {
192+
"KubernetesClusterConfig": {
193+
"title": "KubernetesClusterConfig",
194+
"description": "Stores configuration for interaction with Kubernetes clusters.\n\nSee `from_file` for creation.",
195+
"type": "object",
196+
"properties": {
197+
"config": {
198+
"title": "Config",
199+
"description": "The entire contents of a kubectl config file.",
200+
"type": "object"
171201
},
172-
"required": [
173-
"config",
174-
"context_name"
175-
],
176-
"block_type_slug": "kubernetes-cluster-config",
177-
"secret_fields": [],
178-
"block_schema_references": {}
179-
}
202+
"context_name": {
203+
"title": "Context Name",
204+
"description": "The name of the kubectl context to use.",
205+
"type": "string"
206+
}
207+
},
208+
"required": [
209+
"config",
210+
"context_name"
211+
],
212+
"block_type_slug": "kubernetes-cluster-config",
213+
"secret_fields": [],
214+
"block_schema_references": {}
180215
}
181216
}
182-
}
217+
}
218+
}

pipeline-project/src/processing/flows/taxi_data_flow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def getconn():
8181
)
8282

8383
try:
84-
df.to_sql(table_name, engine, if_exists='replace', index=False)
84+
df.to_sql(table_name, engine, if_exists='replace', index=False, chunksize=1000)
8585
print(f"Data inserted into Cloud SQL table {table_name}.")
8686
except Exception as e:
8787
print(f"Error inserting data into Cloud SQL: {e}")

0 commit comments

Comments
 (0)