Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion kcidb/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def get_drivers(cls):

class Client(kcidb.orm.Source):
"""Kernel CI report database client"""
# No, pylint: disable=too-many-public-methods

def __init__(self, database):
"""
Expand Down Expand Up @@ -622,6 +623,29 @@ def oo_query(self, pattern_set):
LOGGER.debug("OO Query: %r", pattern_set)
return self.driver.oo_query(pattern_set)

def load_iter(self, data_iter, with_metadata=False, copy=True):
    """
    Load an iterable of datasets into the database, with at least
    per-table atomicity.

    Args:
        data_iter:      An iterable producing the JSON datasets to load
                        into the database. Every dataset must adhere to
                        the I/O schema version the database supports,
                        and is modified in place when "copy" is False.
        with_metadata:  True to load any metadata present in the data
                        into the database as well. False to discard it
                        and let the database generate its own metadata.
        copy:           True to copy the loaded data before packing it,
                        False to pack it in place.
    """
    # The database must have been initialized before loading
    assert LIGHT_ASSERTS or self.is_initialized()
    assert isinstance(with_metadata, bool)
    # Hand the actual loading off to the driver
    self.driver.load_iter(
        data_iter, with_metadata=with_metadata, copy=copy
    )

def load(self, data, with_metadata=False, copy=True):
"""
Load data into the database.
Expand All @@ -644,7 +668,7 @@ def load(self, data, with_metadata=False, copy=True):
assert io_schema.is_compatible_directly(data)
assert LIGHT_ASSERTS or io_schema.is_valid_exactly(data)
assert isinstance(with_metadata, bool)
self.driver.load(data, with_metadata=with_metadata, copy=copy)
self.load_iter([data], with_metadata=with_metadata, copy=copy)


class DBHelpAction(argparse.Action):
Expand Down
23 changes: 14 additions & 9 deletions kcidb/db/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,16 +331,17 @@ def oo_query(self, pattern_set):
assert self.is_initialized()

@abstractmethod
def load(self, data, with_metadata, copy):
def load_iter(self, data_iter, with_metadata, copy):
"""
Load data into the database.
The database must be initialized.
Load an iterable of datasets into the database,
at least per-table atomically.

Args:
data: The JSON data to load into the database. Must
adhere to the I/O version of the database schema.
Will be modified, if "copy" is False.
with_metadata: True if any metadata in the data should
data_iter: The iterable of JSON datasets to load into the
database. Each dataset must adhere to the I/O
version of the database schema, and will be
modified, if "copy" is False.
with_metadata: True if any metadata in the datasets should
also be loaded into the database. False if it
should be discarded and the database should
generate its metadata itself.
Expand All @@ -350,7 +351,11 @@ def load(self, data, with_metadata, copy):
"""
assert self.is_initialized()
io_schema = self.get_schema()[1]
assert io_schema.is_compatible_directly(data)
assert LIGHT_ASSERTS or io_schema.is_valid_exactly(data)
# NOTE: This consumes generators, do not copy as is
assert all(
io_schema.is_compatible_directly(data) and
(LIGHT_ASSERTS or io_schema.is_valid_exactly(data))
for data in data_iter
)
assert isinstance(with_metadata, bool)
assert isinstance(copy, bool)
131 changes: 131 additions & 0 deletions kcidb/db/bigquery/schema.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""
Kernel CI BigQuery report database - miscellaneous schema definitions
"""
import re
from google.cloud.bigquery.schema import SchemaField as Field
from google.protobuf import \
descriptor_pb2, descriptor_pool, message_factory


class JSONInvalidError(Exception):
Expand Down Expand Up @@ -137,3 +140,131 @@ def validate_json_obj_list(field_list, obj_list):
validate_json_obj(field_list, obj)

return obj_list

# You're wrong, pylint: disable=no-member


# The Protobuf field types corresponding to BigQuery field types.
# Built from a whitespace-separated two-column table:
# BigQuery type name -> Protobuf TYPE_* constant suffix.
PROTOBUF_FIELD_TYPES = {
    bigquery_type: getattr(
        descriptor_pb2.FieldDescriptorProto, "TYPE_" + protobuf_type
    )
    for bigquery_type, protobuf_type in (
        line.split()
        for line in """
        STRING      STRING
        BYTES       BYTES
        INTEGER     INT64
        INT64       INT64
        FLOAT       DOUBLE
        FLOAT64     DOUBLE
        BOOLEAN     BOOL
        BOOL        BOOL
        TIMESTAMP   STRING
        DATE        STRING
        TIME        STRING
        DATETIME    STRING
        NUMERIC     STRING
        BIGNUMERIC  STRING
        GEOGRAPHY   STRING
        """.splitlines()
        if line.strip()
    )
}


def protobuf_msg_desc_fill(desc, field_list):
    """
    Fill a protobuf message descriptor with definitions from a (BigQuery
    schema) field list.

    Args:
        desc:       The protobuf message descriptor to fill with the field
                    list definition.
        field_list: The list/tuple of BigQuery SchemaField objects to
                    describe as a message.

    Returns:
        The filled protobuf message descriptor.
    """
    assert isinstance(desc, descriptor_pb2.DescriptorProto)
    assert isinstance(field_list, (list, tuple))
    assert all(isinstance(field, Field) for field in field_list)

    # It's a type, pylint: disable=invalid-name
    FDP = descriptor_pb2.FieldDescriptorProto

    # Protobuf field numbers start at one
    for i, field in enumerate(field_list, start=1):
        field_desc = desc.field.add()
        field_desc.name = field.name
        field_desc.number = i

        if field.mode == "REPEATED":
            field_desc.label = FDP.LABEL_REPEATED
        else:
            field_desc.label = FDP.LABEL_OPTIONAL

        # Normalize the case once, so all type names are handled
        # case-insensitively. Previously only the RECORD comparison used
        # .upper(), and a lowercase scalar type (e.g. "string") would
        # raise a KeyError from the PROTOBUF_FIELD_TYPES lookup.
        field_type = field.field_type.upper()
        if field_type == "RECORD":
            # Describe the nested record as a nested message type,
            # named after the field itself
            protobuf_msg_desc_fill(
                desc.nested_type.add(), field.fields
            ).name = field.name
            field_desc.type = FDP.TYPE_MESSAGE
            field_desc.type_name = field.name
        else:
            field_desc.type = PROTOBUF_FIELD_TYPES[field_type]
            if field.mode != "REPEATED":
                # Give scalar optional fields explicit presence via a
                # synthetic oneof (the proto3 "optional" keyword)
                field_desc.proto3_optional = True
                oneof = desc.oneof_decl.add()
                oneof.name = "_" + field.name
                field_desc.oneof_index = len(desc.oneof_decl) - 1

    return desc


def protobuf_file_desc_create(name, field_list):
    """
    Create a protobuf file descriptor from a BigQuery table schema field list.

    Args:
        name:       The name of the object type stored in the table
                    (singular). Will be used for naming the file
                    (as <name>.proto) and the message type (literally).
                    The file will belong to "org.kernelci.bq" package.
        field_list: The list of SchemaField objects defining the table schema.

    Returns:
        The created file descriptor.
    """
    assert isinstance(name, str)
    assert name.isidentifier()
    assert isinstance(field_list, (list, tuple))
    assert all(isinstance(field, Field) for field in field_list)

    # One .proto "file" per object type, in a fixed package
    file_desc = descriptor_pb2.FileDescriptorProto(
        name=name + ".proto",
        package="org.kernelci.bq",
    )
    # Describe the table schema as the file's single message type
    msg_desc = protobuf_msg_desc_fill(
        file_desc.message_type.add(), field_list
    )
    msg_desc.name = name
    return file_desc


def protobuf_msg_type_create(name, field_list):
    """
    Create a protobuf message class for a table with the schema described by a
    list of SchemaField objects.

    Args:
        name:       The name of the object type stored in the table
                    (singular). Will be used for naming the message type.
        field_list: The list of SchemaField objects defining the table schema.

    Returns:
        The created message class.
    """
    assert isinstance(name, str)
    assert name.isidentifier()
    assert isinstance(field_list, (list, tuple))
    assert all(isinstance(field, Field) for field in field_list)

    # Register the generated file descriptor in a private pool,
    # then materialize the message class from the pooled type
    pool = descriptor_pool.DescriptorPool()
    pool.Add(protobuf_file_desc_create(name, field_list))
    msg_desc = pool.FindMessageTypeByName("org.kernelci.bq." + name)
    return message_factory.GetMessageClass(msg_desc)
Loading
Loading