Skip to content

feat: add support for the Streams API#2519

Open
evertoncolling wants to merge 5 commits intopysdk-release-v8from
v8-just-streams
Open

feat: add support for the Streams API#2519
evertoncolling wants to merge 5 commits intopysdk-release-v8from
v8-just-streams

Conversation

@evertoncolling
Copy link
Contributor

@evertoncolling evertoncolling commented Mar 19, 2026

Description

Adds support for the Streams API in the Data Modeling module, enabling users to create, retrieve, list, and delete streams.

Checklist:

  • Tests added/updated.
  • Documentation updated. Documentation is generated from docstrings - these must be updated according to your change.
    If a new method has been added it should be referenced in cognite.rst in order to generate docs based on its docstring.
  • The PR title follows the Conventional Commit spec.

@codecov
Copy link

codecov bot commented Mar 19, 2026

Codecov Report

❌ Patch coverage is 96.51568% with 10 lines in your changes missing coverage. Please review.
✅ Project coverage is 93.46%. Comparing base (36037e2) to head (f6404d1).
⚠️ Report is 3 commits behind head on pysdk-release-v8.

Files with missing lines Patch % Lines
...gnite/client/data_classes/data_modeling/streams.py 95.09% 5 Missing ⚠️
...ntegration/test_api/test_data_modeling/conftest.py 75.00% 4 Missing ⚠️
cognite/client/_api/data_modeling/streams.py 97.22% 1 Missing ⚠️
Additional details and impacted files
@@                 Coverage Diff                  @@
##           pysdk-release-v8    #2519      +/-   ##
====================================================
- Coverage             93.48%   93.46%   -0.03%     
====================================================
  Files                   478      483       +5     
  Lines                 48118    48429     +311     
====================================================
+ Hits                  44983    45262     +279     
- Misses                 3135     3167      +32     
Files with missing lines Coverage Δ
cognite/client/_api/data_modeling/__init__.py 100.00% <100.00%> (ø)
cognite/client/_sync_api/data_modeling/__init__.py 100.00% <100.00%> (ø)
cognite/client/_sync_api/data_modeling/streams.py 100.00% <100.00%> (ø)
...nite/client/data_classes/data_modeling/__init__.py 100.00% <100.00%> (ø)
cognite/client/testing.py 100.00% <100.00%> (ø)
cognite/client/utils/_url.py 100.00% <ø> (ø)
...ration/test_api/test_data_modeling/test_streams.py 100.00% <100.00%> (ø)
...s_unit/test_api/test_data_modeling/test_streams.py 100.00% <100.00%> (ø)
cognite/client/_api/data_modeling/streams.py 97.22% <97.22%> (ø)
...ntegration/test_api/test_data_modeling/conftest.py 86.57% <75.00%> (-1.40%) ⬇️
... and 1 more

... and 6 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@evertoncolling evertoncolling marked this pull request as ready for review March 19, 2026 20:34
@evertoncolling evertoncolling requested review from a team as code owners March 19, 2026 20:34
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the Data Modeling module by integrating the Streams API. This new feature allows users to programmatically manage data streams, providing capabilities for creating new streams, fetching existing ones by ID, listing all available streams, and deleting them. The changes encompass both the core asynchronous client and its synchronous wrapper, along with new data structures to represent stream configurations and lifecycle settings, ensuring a complete and robust implementation.

Highlights

  • Streams API Support: Added comprehensive support for the Streams API within the Data Modeling module, enabling users to create, retrieve, list, and delete streams.
  • New Data Classes: Introduced new data classes for Stream objects, settings, lifecycle, and limits to represent stream configurations.
  • Asynchronous and Synchronous Clients: Implemented both asynchronous and synchronous API clients for Streams, ensuring full compatibility and ease of use.
  • Expanded Test Coverage: Included new unit and integration tests to thoroughly validate the functionality of the Streams API operations.
  • Documentation Updates: Updated documentation to reflect the new Streams API endpoints and data models, providing clear guidance for users.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces support for the Streams API, a new feature in the Data Modeling module. The changes are well-structured, adding asynchronous and synchronous API clients, corresponding data classes, comprehensive unit and integration tests, and documentation. The implementation aligns with the existing SDK patterns. The main issue identified is that the code examples in the asynchronous API's docstrings incorrectly demonstrate synchronous usage. This is a high-severity issue as it can mislead users. I have provided specific suggestions to correct these examples to reflect proper asynchronous patterns.

Comment on lines +48 to +55
>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> res = client.data_modeling.streams.retrieve(external_id="my_stream")

Retrieve a stream with statistics:

>>> res = client.data_modeling.streams.retrieve(external_id="my_stream", include_statistics=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The code examples for retrieve demonstrate synchronous client usage, but this is an asynchronous method. The examples should use AsyncCogniteClient and await to correctly illustrate asynchronous usage. This pattern of incorrect examples is present for all methods in this file.

Suggested change
>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> res = client.data_modeling.streams.retrieve(external_id="my_stream")
Retrieve a stream with statistics:
>>> res = client.data_modeling.streams.retrieve(external_id="my_stream", include_statistics=True)
>>> from cognite.client import AsyncCogniteClient
>>> async_client = AsyncCogniteClient()
>>> res = await async_client.data_modeling.streams.retrieve(external_id="my_stream")
Retrieve a stream with statistics:
>>> res = await async_client.data_modeling.streams.retrieve(external_id="my_stream", include_statistics=True)

Comment on lines +82 to +85
>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> client.data_modeling.streams.delete(external_id="my_stream")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The code example for delete demonstrates synchronous client usage, but this is an asynchronous method. The example should be updated to use AsyncCogniteClient and await.

Suggested change
>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> client.data_modeling.streams.delete(external_id="my_stream")
>>> from cognite.client import AsyncCogniteClient
>>> async_client = AsyncCogniteClient()
>>> await async_client.data_modeling.streams.delete(external_id="my_stream")

Comment on lines +106 to +109
>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> stream_list = client.data_modeling.streams.list()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The code example for list demonstrates synchronous client usage, but this is an asynchronous method. The example should be updated to use AsyncCogniteClient and await.

Suggested change
>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> stream_list = client.data_modeling.streams.list()
>>> from cognite.client import AsyncCogniteClient
>>> async_client = AsyncCogniteClient()
>>> stream_list = await async_client.data_modeling.streams.list()

Comment on lines +138 to +146
>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> from cognite.client.data_classes.data_modeling import StreamWrite, StreamWriteSettings
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> stream = StreamWrite(
... external_id="my_stream",
... settings=StreamWriteSettings(template_name="ImmutableTestStream")
... )
>>> res = client.data_modeling.streams.create(stream)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The code example for create demonstrates synchronous client usage, but this is an asynchronous method. The example should be updated to use AsyncCogniteClient and await.

Suggested change
>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> from cognite.client.data_classes.data_modeling import StreamWrite, StreamWriteSettings
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> stream = StreamWrite(
... external_id="my_stream",
... settings=StreamWriteSettings(template_name="ImmutableTestStream")
... )
>>> res = client.data_modeling.streams.create(stream)
>>> from cognite.client import AsyncCogniteClient
>>> from cognite.client.data_classes.data_modeling import StreamWrite, StreamWriteSettings
>>> async_client = AsyncCogniteClient()
>>> stream = StreamWrite(
... external_id="my_stream",
... settings=StreamWriteSettings(template_name="ImmutableTestStream")
... )
>>> res = await async_client.data_modeling.streams.create(stream)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant