-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcreate_pc.py
More file actions
358 lines (287 loc) · 14.2 KB
/
create_pc.py
File metadata and controls
358 lines (287 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
"""
Create normalized point clouds from 3D liver meshes for MICCAI 2025.
The exported meshes are centered and scaled to `[-max_norm * weight, max_norm * weight]`
following the HyperDiffusion settings.
Sampled point clouds are saved in `npy` format with occupancies:
+ 1: inside
+ 0: outside
Sampled point cloud categories:
+ 3D_Reconstruction: random cube sample points + nearby surface points
"""
from __future__ import annotations
import argparse
import datetime
import time
from pathlib import Path
import igl
import numpy as np
import pandas as pd
import tqdm
import trimesh
from trimesh.sample import sample_surface
from utils.pathlib_utils import create_folder
### igl 2.6 dropped ``fast_winding_number_for_meshes`` in favour of
### ``fast_winding_number`` (https://github.com/libigl/libigl/issues/2477).
### Resolve whichever name this igl build provides, once, at import time.
try:
    fast_winding_number_for_meshes = igl.fast_winding_number_for_meshes
except AttributeError:
    fast_winding_number_for_meshes = igl.fast_winding_number
class PointCloudCreator:
    """Generate occupancy-labelled point clouds from 3D mesh files.

    Meshes are optionally normalized to ``[-max_norm * decay_factor,
    max_norm * decay_factor]`` (HyperDiffusion settings) and sampled into
    ``(x, y, z, occupancy)`` arrays saved as ``.npy`` files.
    """

    def __init__(
        self,
        obj_suffix: str = "stl",
        random_seed: int = 42,
        max_norm: float = 0.5,
        decay_factor: float = 0.95,
        thresh: float = 0.5,
        nomalize_point_cloud: bool = True,
    ) -> None:
        """Initialize the creator.

        Args:
            obj_suffix (str): Mesh file extension. Defaults to __"stl"__.
            random_seed (int): Random seed for reproducibility. Defaults to __42__.
            max_norm (float): Range is ``[-max_norm, max_norm]``. Defaults to __0.5__.
            decay_factor (float): Optional shrink factor. Defaults to __0.95__.
            thresh (float): Winding number threshold; values ``>= thresh`` are
                labelled inside. Defaults to __0.5__.
            nomalize_point_cloud (bool): Normalize vertices to the range. Defaults to __True__.
        """
        ### Settings
        self.obj_suffix = obj_suffix
        self.random_seed = random_seed
        self.max_norm = max_norm
        self.decay_factor = decay_factor
        self.thresh = thresh
        self.nomalize_point_cloud = nomalize_point_cloud
        ### Single RNG instance so every sampling call is reproducible.
        self.rng = np.random.default_rng(seed=self.random_seed)

    def get_mesh(
        self,
        obj_path: Path,
        nomalize: bool = True,
        max_norm: float = 1.0,
        weight: float | None = 0.95,
    ) -> tuple[trimesh.Trimesh, np.ndarray, float] | trimesh.Trimesh:
        """Load and optionally normalize a mesh.

        Args:
            obj_path (Path): Path to mesh file.
            nomalize (bool): Normalize vertices to ``[-max_norm, max_norm]``. Defaults to __True__.
            max_norm (float): Target range magnitude. Defaults to __1.0__.
            weight (float | None): Additional scale factor. Defaults to __0.95__.

        Returns:
            tuple[trimesh.Trimesh, np.ndarray, float] | trimesh.Trimesh:
                ``(mesh, offset, scale)`` when ``nomalize`` is True, otherwise
                the mesh as loaded from disk.
        """
        ### Read the mesh file
        obj: trimesh.Trimesh = trimesh.load(obj_path)
        if not nomalize:
            return obj
        ### Normalize the vertices to range [-max_norm, max_norm]
        vertices = obj.vertices
        ### Translate the centroid to the origin.
        offset = np.mean(vertices, axis=0, keepdims=True)  ### (1, 3)
        vertices -= offset
        ### Normalize to [-1, 1] symmetrically.
        ### NOTE(review): max_abs is 0 for a degenerate single-point mesh,
        ### which would produce NaN/inf here — confirm inputs are real meshes.
        max_abs = np.max(np.abs(vertices))  ### scalar
        vertices /= max_abs
        ### Scale to [-max_norm, max_norm]
        vertices *= max_norm
        ### Shrink further to [-max_norm * weight, max_norm * weight]
        if weight is not None:
            vertices *= weight
        ### Update the vertices
        obj.vertices = vertices
        return obj, offset, max_abs

    def _winding_occupancies(
        self,
        obj: trimesh.Trimesh,
        points: np.ndarray,
        log_file,
        label: str = "",
    ) -> np.ndarray:
        """Label each query point inside (1) / outside (0) the mesh.

        Shared by :meth:`save_point_cloud` and
        :meth:`get_cube_points_with_occupancies`; the log lines produced are
        identical to the previous inline code.

        Args:
            obj (trimesh.Trimesh): Mesh providing vertices and faces.
            points (np.ndarray): Query points of shape ``(P, 3)``.
            log_file (TextIOWrapper): File handle for logging.
            label (str): Suffix inserted into the log lines
                (e.g. ``" of cube points"``). Defaults to __""__.

        Returns:
            np.ndarray: Occupancies of shape ``(P, 1)``.
        """
        ### Generalized winding number per point; >= thresh means inside.
        inside_surface_values = fast_winding_number_for_meshes(obj.vertices, obj.faces, points)  ### (P,)
        log_file.write(
            f"\tInside surface values{label}: {inside_surface_values.shape}, max: {inside_surface_values.max()}, min: {inside_surface_values.min()}\n"
        )
        ### Same labels as the previous np.piecewise([... < thresh, ... >= thresh], [0, 1])
        ### call, expressed as a single vectorized select.
        occupancies = np.where(inside_surface_values < self.thresh, 0.0, 1.0)[..., None]  ### (P, 1)
        log_file.write(
            f"\tOccupancies{label}: {occupancies.shape}, max: {occupancies.max()}, min: {occupancies.min()}\n"
        )
        return occupancies

    def save_point_cloud(
        self,
        OBJs_path: Path,
        save_dir_path: Path,
        n_sampled_points: int,
        list_obj_names: list[str] | None = None,
    ) -> None:
        """Create point clouds from meshes and write them to disk.

        Args:
            OBJs_path (Path): Directory containing mesh files.
            save_dir_path (Path): Directory to save generated assets.
            n_sampled_points (int): Points sampled per mesh for each category.
            list_obj_names (list[str] | None): Mesh basenames to process. Defaults to __None__ for all meshes.
        """
        ### Snapshot the call arguments up-front. The previous implementation
        ### iterated ``locals()`` after several locals had been created, so the
        ### "arguments" section of the log also dumped internal variables
        ### (df_log, mesh_save_path, ...) as if they were arguments.
        call_args = {
            "OBJs_path": OBJs_path,
            "save_dir_path": save_dir_path,
            "n_sampled_points": n_sampled_points,
            "list_obj_names": list_obj_names,
        }
        ### ------------------------- Create folders ------------------------- ###
        mesh_save_path = create_folder("mesh", dir_path=save_dir_path)
        npy_save_path = create_folder("npy", dir_path=save_dir_path)
        npy3DRec_save_path = create_folder("3D_Reconstruction", dir_path=npy_save_path)
        ### ------------------------- Write log ------------------------- ###
        df_log = pd.DataFrame(
            columns=[
                "name",
                "total_vertices",
                "offset",
                "scale",
                "coor_max_x",
                "coor_max_y",
                "coor_max_z",
                "coor_min_x",
                "coor_min_y",
                "coor_min_z",
            ]
        )
        log_path = save_dir_path / "log.txt"
        ### Mode "w" truncates any existing file, so no explicit unlink is needed.
        with open(log_path, "w") as log_file:
            log_file.write(f"Date: {datetime.datetime.now()}\n")
            ### Print the arguments of this call
            for arg_name, arg_value in call_args.items():
                log_file.write(f"{arg_name}: {arg_value}\n")
            log_file.write("\n")
            log_file.write(f"### {'-'*30} Process Objects {'-'*30} ###\n")
            log_file.write("\n")
            ### ------------------------- Get data ------------------------- ###
            n_points_uniform = n_sampled_points
            n_points_surface = n_sampled_points
            ### Get all obj names if not specified
            if list_obj_names is None:
                list_obj_names = [obj_path.stem for obj_path in OBJs_path.glob(f"*.{self.obj_suffix}")]
            ### ------------------------- Process each object ------------------------- ###
            start_time = time.time()
            for obj_name in tqdm.tqdm(list_obj_names, ncols=100):
                start_obj_time = time.time()
                ### Get obj path
                obj_path = OBJs_path / f"{obj_name}.{self.obj_suffix}"
                ### Get normalized mesh in range [-max_norm * decay_factor, max_norm * decay_factor]
                obj, object_offset, object_scale = self.get_mesh(
                    obj_path, self.nomalize_point_cloud, self.max_norm, self.decay_factor
                )
                ### Write log
                vertices = obj.vertices
                coor_max = np.max(vertices, axis=0)  ### shape (3,) for x, y, z
                coor_min = np.min(vertices, axis=0)  ### shape (3,) for x, y, z
                log_file.write(f"# {obj_path.name}:\n\t{vertices.shape}\n")
                log_file.write(f"\tObject offset: {object_offset}, object scale: {object_scale}\n")
                log_file.write(f"\tCoordinates (x,y,z) max: {coor_max}, coordinates (x,y,z) min: {coor_min}\n")
                df_log.loc[len(df_log)] = {
                    "name": obj_name,
                    "total_vertices": vertices.shape[0],
                    "offset": object_offset,
                    "scale": object_scale,
                    "coor_max_x": coor_max[0],
                    "coor_max_y": coor_max[1],
                    "coor_max_z": coor_max[2],
                    "coor_min_x": coor_min[0],
                    "coor_min_y": coor_min[1],
                    "coor_min_z": coor_min[2],
                }
                ### ------------------------- Sample points ------------------------- ###
                points_uniform = self.rng.uniform(-self.max_norm, self.max_norm, size=(n_points_uniform, 3))  ### (n_points_uniform, 3)
                points_surface, _ = sample_surface(obj, n_points_surface, seed=self.random_seed)  ### (n_points_surface, 3)
                ### Jitter the surface samples so occupancy labels carry signal near the boundary.
                nearby_points_surface = points_surface + 0.01 * self.rng.standard_normal((n_points_surface, 3))  ### (n_points_surface, 3)
                ### Merge points = (2) + (1)
                points = np.concatenate([nearby_points_surface, points_uniform], axis=0)  ### (2*n_sampled_points, 3)
                log_file.write(f"\tSampled points: {points.shape}\n")
                log_file.write(
                    f"\tSampled points (x,y,z) max: {np.max(points, axis=0)}, Sampled points (x,y,z) min: {np.min(points, axis=0)}\n"
                )
                ### Winding-number based occupancy labels (logged by the helper).
                occupancies = self._winding_occupancies(obj, points, log_file)  ### (2*n_sampled_points, 1)
                ### ------------------------- Save point clouds and mesh ------------------------- ###
                sampled_pointcloud_and_occupancies = np.concatenate((points, occupancies), axis=-1)  ### (2*n_sampled_points, 4)
                log_file.write(f"\tPoint cloud for 3D Reconstruction: {sampled_pointcloud_and_occupancies.shape}\n")
                np.save(npy3DRec_save_path / f"{obj_path.stem}.npy", sampled_pointcloud_and_occupancies)
                ### Save the normalized mesh to data folder
                obj.export(mesh_save_path / obj_path.name)
                ### ------------------------- Running time ------------------------- ###
                process_time = time.time() - start_obj_time
                log_file.write(f"\tProcess time: {process_time:.2f} seconds\n")
                log_file.write("\n")
            ### ------------------------- End ------------------------- ###
            end_time = time.time()
            running_time = datetime.timedelta(seconds=end_time - start_time)
            log_file.write(
                f"### {'-'*30} Total {len(list_obj_names)} objects; time: {running_time} {'-'*30} ###\n"
            )
        ### Save CSV log
        df_log.to_csv(save_dir_path / "log.csv", index=False)

    def get_cube_points_with_occupancies(self, obj: trimesh.Trimesh, log_file, N: int = 256) -> np.ndarray:
        """Create cube samples with occupancies.

        Args:
            obj (trimesh.Trimesh): The mesh object.
            log_file (TextIOWrapper): File handle for logging.
            N (int): Points per dimension. Defaults to __256__.

        Returns:
            np.ndarray: Cube points with occupancies of shape ``(N**3, 4)``.
        """
        ### Dense regular grid covering the normalization cube.
        points_range = np.linspace(-self.max_norm, self.max_norm, N)
        grid = np.meshgrid(points_range, points_range, points_range, indexing="ij")
        points = np.stack(grid, axis=-1).reshape(-1, 3)  ### (N^3, 3)
        ### Same occupancy computation/logging as save_point_cloud.
        occupancies = self._winding_occupancies(obj, points, log_file, label=" of cube points")  ### (N^3, 1)
        cube_points_with_occupancies = np.concatenate((points, occupancies), axis=-1)  ### (N^3, 4)
        return cube_points_with_occupancies
def parse_args() -> argparse.Namespace:
    """Parse CLI arguments for sampling point clouds.

    Returns:
        argparse.Namespace: Parsed arguments namespace.
    """
    cli = argparse.ArgumentParser(description="Sample point clouds from meshes")
    ### Required inputs/outputs.
    cli.add_argument(
        "--objs_path", type=Path, required=True, help="Directory containing meshes (stl/obj)"
    )
    cli.add_argument(
        "--save_root", type=Path, required=True, help="Root directory to store outputs"
    )
    cli.add_argument(
        "--obj_suffix", type=str, required=True, help="Mesh file extension to read"
    )
    ### Optional knobs.
    cli.add_argument(
        "--names_file",
        type=Path,
        default=None,
        help="Optional txt file with mesh names (without extension) to process (one per line)",
    )
    cli.add_argument(
        "--n_sampled_points",
        type=int,
        default=20_000,
        help="Number of points per category per mesh",
    )
    return cli.parse_args()
def main() -> None:
    """Run sampling based on CLI inputs."""
    args = parse_args()
    ### Optional subset of mesh basenames: one per line; any extension is
    ### stripped so the names stay consistent with --obj_suffix.
    list_obj_names = None
    if args.names_file is not None:
        with open(args.names_file, "r") as names_fh:
            raw_lines = names_fh.read().splitlines()
        list_obj_names = [line.strip().split(".")[0] for line in raw_lines if line.strip()]
    ### Output layout: <save_root>/<names-file stem or "all">/sampled_<N>_<suffix>
    if args.names_file is not None:
        save_dir_name = args.names_file.stem
    else:
        save_dir_name = "all"
    save_dir_path = create_folder(
        f"sampled_{args.n_sampled_points}_{args.obj_suffix}", dir_path=args.save_root / save_dir_name
    )
    creator = PointCloudCreator(obj_suffix=args.obj_suffix)
    creator.save_point_cloud(
        args.objs_path,
        save_dir_path,
        args.n_sampled_points,
        list_obj_names=list_obj_names,
    )
    print(f"Point cloud sampling finished for {save_dir_path=}, {args.n_sampled_points=:,}")
### Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()