Datalake

Properties


Methods

upload_data

upload_data(
   filepaths: Union[str, Path, list[Union[str, Path]]],
   tags: Optional[list[Union[str, Tag]]] = None, source: Union[str, DataSource,
   None] = None, max_workers: Optional[int] = None,
   error_manager: Optional[ErrorManager] = None, metadata: Union[None, dict,
   list[dict]] = None, fill_metadata: Optional[bool] = False,
   wait_for_unprocessable_data: Optional[bool] = True,
   upload_dir: Optional[str] = None, custom_metadata: Union[None, dict,
   list[dict]] = None
)

Description

Upload data into this datalake.

Upload files representing data into this datalake. You can give a list of tags and a source for your data.

If some data fails to upload, check the example to see how to retrieve the list of file paths that failed.

For more information about metadata, check https://documentation.picsellia.com/docs/metadata

Examples

from picsellia.services.error_manager import ErrorManager

source_camera_one = client.get_datasource("camera-one")
source_camera_two = client.get_datasource("camera-two")

lake = client.get_datalake()

tag_car = lake.get_data_tag("car")
tag_huge_car = lake.get_data_tag("huge-car")

lake.upload_data(filepaths=["porsche.png", "ferrari.png"], tags=[tag_car], source=source_camera_one)
lake.upload_data(filepaths="truck.png", tags=[tag_huge_car], source=source_camera_two, metadata={"longitude": 43.6027273, "latitude": 1.4541129}, fill_metadata=True)

error_manager = ErrorManager()
lake.upload_data(filepaths=["twingo.png", "path/unknown.png", error_manager=error_manager)

# error_manager.errors is a list of UploadError, so you can see what went wrong
error_paths = [error.path for error in error_manager.errors]
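
# A minimal sketch of upload_dir and custom_metadata; upload_dir can only be used with a
# private object storage, and the prefix and metadata values below are illustrative.
lake.upload_data(
    filepaths="bus.png",
    tags=[tag_car],
    upload_dir="raw-uploads",
    custom_metadata={"fleet": "north", "vehicle_id": 42},
)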

Arguments

  • filepaths (str or Path or list[str or Path]) : Filepaths of your data

  • tags (list[Tag], optional) : Data Tags that will be given to data. Defaults to [].

  • source (DataSource, optional) : Source of your data.

  • max_workers (int, optional) : Number of max workers used to upload. Defaults to os.cpu_count() + 4.

  • error_manager (ErrorManager, optional) : Give an ErrorManager to retrieve the errors that occurred during upload.

  • metadata (dict or list[dict], optional) : Metadata to attach to the given data. If a list is given, its length must match the number of filepaths. Defaults to no metadata.

  • fill_metadata (bool, optional) : Whether to read the EXIF tags of the images and add them to the metadata field. Fields already given in metadata will be overridden. Defaults to False.

  • wait_for_unprocessable_data (bool, optional) : If True, this method will wait until all data are fully uploaded and processed by our services. Defaults to True.

  • upload_dir (str, optional) : This parameter can only be used with private object storages. Specify it to prefix the object name of the data. The filename will still contain a generated uuid4.

  • custom_metadata (dict or list[dict], optional) : Custom metadata to attach to the given data. If a list is given, its length must match the number of filepaths. Defaults to no custom metadata.

Returns

A Data object or a MultiData object that wraps a list of Data.


find_data

find_data(
   filename: Optional[str] = None, object_name: Optional[str] = None, id: Union[str,
   UUID, None] = None
)

Description

Find a data in this datalake.

You can find it by giving its filename, its object name, or its id.

Examples

my_data = my_datalake.find_data(filename="test.png")
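
# You can also look a data up by its object name or its id; the values below are illustrative.
my_data = my_datalake.find_data(object_name="datalake/0000-test.png")
my_data = my_datalake.find_data(id="00000000-0000-0000-0000-000000000000")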

Arguments

  • filename (str, optional) : filename of the data. Defaults to None.

  • object_name (str, optional) : object name in the S3 storage. Defaults to None.

  • id (str or UUID, optional) : id of the data. Defaults to None

Raises

If no data matches the query, a NotFoundError is raised. In some cases, an InvalidQueryError can be raised, typically when the platform stores 2 data matching the query (for example if a filename is duplicated).

Returns

The Data found


list_data

list_data(
   limit: Optional[int] = None, offset: Optional[int] = None,
   page_size: Optional[int] = None, order_by: Optional[list[str]] = None,
   tags: Union[str, Tag, list[Union[str, Tag]], None] = None,
   filenames: Optional[list[str]] = None, intersect_tags: Optional[bool] = False,
   object_names: Optional[list[str]] = None, q: Optional[str] = None,
   ids: Optional[list[Union[str, UUID]]] = None,
   custom_metadata: Optional[dict] = None
)

Description

List data of this datalake.

If there is no data, raise a NoDataError exception.

The returned object is a MultiData, an object that allows manipulation of a batch of data. You can add tags to them or feed a dataset with them.

Examples

lake = client.get_datalake()
data = lake.list_data()
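
# A sketch of common filters; the tag names and limit below are illustrative.
tag_car = lake.get_data_tag("car")
tag_huge_car = lake.get_data_tag("huge-car")
filtered = lake.list_data(
    limit=100,
    tags=[tag_car, tag_huge_car],
    intersect_tags=True,  # only data carrying both tags
)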

Arguments

  • limit (int, optional) : if given, will limit the number of data returned

  • offset (int, optional) : if given, will return data that would have been returned after this offset in the given order

  • page_size (int, optional) : deprecated.

  • order_by (list[str], optional) : if not empty, will order data by fields given in this parameter

  • filenames (list[str], optional) : if given, will return data that have a filename equal to one of the given filenames

  • object_names (list[str], optional) : if given, will return data that have an object name equal to one of the given object names

  • tags (str, Tag, list[Tag or str], optional) : if given, will return data that have at least one of the given tags by default. If intersect_tags is True, it will return data that have all the given tags

  • intersect_tags (bool, optional) : if True, and a list of tags is given, will return data that have all the given tags. Defaults to False.

  • q (str, optional) : if given, will filter data with given query. Defaults to None.

  • ids (list[str or UUID], optional) : ids of the data you're looking for. Defaults to None.

  • custom_metadata (dict, optional) : if given, will filter data on their custom metadata. Defaults to None.

Raises

  • NoDataError : Raised when the datalake has no data.

Returns

A MultiData object that wraps a list of Data.


create_data_tag

create_data_tag(
   name: str
)

Description

Create a data tag used in this datalake

Examples

tag_car = lake.create_data_tag("car")

Arguments

  • name (str) : Name of the tag to create

Returns

A Tag object


get_data_tag

get_data_tag(
   name: str
)

Description

Retrieve a data tag used in this datalake.

Examples

tag_car = lake.get_data_tag("car")

Arguments

  • name (str) : Name of the tag to retrieve

Returns

A Tag object


get_or_create_data_tag

get_or_create_data_tag(
   name: str
)

Description

Retrieve a data tag used in this datalake by its name. If the tag does not exist, create it and return it.

Examples

tag = lake.get_or_create_data_tag("new_tag")

Arguments

  • name (str) : Name of the tag to retrieve or create

Returns

A Tag object


list_data_tags

list_data_tags(
   limit: Optional[int] = None, offset: Optional[int] = None,
   order_by: Optional[list[str]] = None
)

Description

List all tags of this datalake

Examples

tags = lake.list_data_tags()
assert tag_car in tags

Arguments

  • limit (int, optional) : Limit the number of tags returned. Defaults to None.

  • offset (int, optional) : Offset to start listing tags. Defaults to None.

  • order_by (list[str], optional) : Order the tags returned by given fields. Defaults to None.

Returns

A List of Tag


create_projection

create_projection(
   data: Data, name: str, path: str, additional_info: dict = None,
   fill_metadata: bool = False
)

Description

Attach a Projection to an already existing Data. A Projection is another file that will be viewable alongside the original Data in the UI and in the annotation view (if its type is compatible with the web browser). You can add as many Projections to a Data as you want. The type of this projection will be set to 'CUSTOM'.
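
Examples

A minimal sketch of attaching a file as a projection; the data, projection name, file path, and additional_info values below are illustrative.

data = lake.find_data(filename="porsche.png")
projection = lake.create_projection(
    data=data,
    name="depth-map",
    path="porsche-depth.png",
    additional_info={"camera": "front"},
)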

Arguments

  • data (Data) : target Data.

  • name (str) : projection name.

  • path (str) : path of the file to upload

  • additional_info (dict, optional) : some data to attach to your projection. Defaults to None

  • fill_metadata (bool, optional) : if True, we will read the image and add its EXIF metadata to your projection. Defaults to False.

Returns

A DataProjection object


import_bucket_objects

import_bucket_objects(
   prefixes: list[str], tags: Optional[list[Union[str, Tag]]] = None,
   source: Union[str, DataSource, None] = None
)

Description

Asynchronously import files from your remote storage (bucket) into this Datalake. Only files with known content-types will be added.

This method takes a list of prefixes. A prefix can either be a full object name or the common prefix of a group of object names. Given tags and source will be added to all imported data. We will read the EXIF tags of your images to create metadata.

You can only call this method if you use a private object storage with this datalake, owned by your organization. Use this method carefully, as it can import your whole S3 bucket into the platform if you import, for example, "/".

If you want to import projections from your object storage, or if you want to add custom_metadata, you could instead call import_cloud_objects()

This method returns a Job object; you can call job.wait_for_done() to wait for the import to finish. As it might be a long task, this method does not call wait_for_done() itself.
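
Examples

A minimal sketch; the prefixes, tag, and source below are illustrative and should match object names in your private bucket.

lake = client.get_datalake()
tag_car = lake.get_or_create_data_tag("car")
job = lake.import_bucket_objects(
    prefixes=["images/2024/", "images/archives/truck.jpg"],
    tags=[tag_car],
    source="camera-one",
)
job.wait_for_done()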

Arguments

  • prefixes (list[str]) : list of prefixes to import

  • tags (list[str or Tag], optional) : list of tags that will be added to data

  • source (str or DataSource, optional) : data source that will be specified on data

Returns

A Job that you can wait for with wait_for_done().


import_cloud_objects

import_cloud_objects(
   cloud_objects: dict[str, Union[dict, CloudObject]]
)

Description

Asynchronously import files from your bucket into this Datalake. Only files with known content-types will be added.

This method is limited to 500 elements. If you have more elements to import, consider batching and calling this method multiple times.

The keys are the object names of the data you want to import; the values are CloudObjects that represent all the additional information that needs to be stored with your Data. CloudObject is defined in picsellia.types.schemas; it is a pydantic model, but you can also give a dict and the SDK will try to parse it.

CloudObject allows:

  • custom_metadata (dict): a dict of metadata to attach to the created Data
  • tags (list[str]): a list of tag names; the SDK will get or create each tag
  • data_source (str): the name of a data source; the SDK will get or create it

You can only call this method if you use a private object storage with this datalake, owned by your organization.

This will launch one asynchronous job; it is returned by this method and can be waited for with wait_for_done().

Examples

    from picsellia.types.schemas import CloudObject, CloudProjectionObject
    datalake = client.get_datalake()
    job = datalake.import_cloud_objects(
        cloud_objects={
            "/bucket/path/object-1.jpg": {
                "tags": ["tag-1", "tag-2"],
                "data_source": "cloud",
                "custom_metadata": {"value": 10},
            },
            "/bucket/path/object-2.jpg": CloudObject(
                tags=["tag-1"],
                data_source="cloud",
                custom_metadata={"value": 25},
            ),
        }
    )
    job.wait_for_done()
    datalake.import_cloud_projections(
        cloud_projections={
            "/bucket/path/object-1.jpg": [
                {
                    "name": "view",
                    "object_name": "/bucket/path/object-1-projection.jpg",
                }
            ],
            "/bucket/path/object-2.jpg": [
                    CloudProjectionObject(
                        name="pr1",
                        object_name="/bucket/path/object-2-projection.jpg",
                    )
            ],
        }
    )

Arguments

  • cloud_objects (dict) : dict with object names as keys and CloudObject as values

Returns

A Job that you can wait for with wait_for_done().


import_cloud_projections

import_cloud_projections(
   cloud_projections: dict[str, list[Union[dict, CloudProjectionObject]]]
)

Description

Asynchronously import files from your bucket as DataProjection into this Datalake. Only files with known content-types will be added.

This method is limited to 500 elements. If you have more elements to import, consider batching and calling this method multiple times.

The keys must be object names of Data that ALREADY exist in your Datalake. The values are lists of CloudProjectionObjects (or the corresponding dicts), each representing a DataProjection. CloudProjectionObject is defined in picsellia.types.schemas; it is a pydantic model, but you can also give a dict and the SDK will try to parse it.

CloudProjectionObject must have:

  • name (str): name of your projection
  • object_name (str): path in your bucket of your projection file.

You can only call this method if you use a private object storage with this datalake, owned by your organization.

This will launch one asynchronous job; it is returned by this method and can be waited for with wait_for_done().

Examples

    from picsellia.types.schemas import CloudObject, CloudProjectionObject
    datalake = client.get_datalake()
    datalake.import_cloud_projections(
        cloud_projections={
            "/bucket/path/object-1.jpg": [
                {
                    "name": "view",
                    "object_name": "/bucket/path/object-1-projection.jpg",
                }
            ],
            "/bucket/path/object-2.jpg": [
                CloudProjectionObject(
                    name="pr1",
                    object_name="/bucket/path/object-2-projection.jpg",
                )
            ],
        }
    )

Arguments

  • cloud_projections (dict) : dict with object names as keys and CloudProjectionObject as values

Returns

A Job that you can wait for with wait_for_done().


launch_processing

launch_processing(
   processing: Processing, data: Union[list[Data], MultiData],
   parameters: dict = None, cpu: int = None, gpu: int = None, model_version_id: UUID = None,
   target_datalake_name: str = None
)

Description

Launch the given processing onto this datalake. You can give specific cpu, gpu, or parameters. You can give a model_version_id used by the processing. Constraints defined by the processing will be checked before launching. You can give a target_datalake_name; it will create a Datalake that the processing will be able to use as output_datalake.

If not given, default values specified in the Processing will be used. If the processing cannot be launched on this Datalake, an error is raised before launching.

Examples

processing = client.get_processing("data auto tagging")
data = datalake.list_data(limit=10)
datalake.launch_processing(processing, data)
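
# A sketch with explicit resources and an output datalake; the parameter names and values
# below are illustrative, not constraints defined by the processing.
job = datalake.launch_processing(
    processing,
    data,
    parameters={"confidence_threshold": 0.5},
    cpu=4,
    gpu=1,
    target_datalake_name="auto-tagged-data",
)
job.wait_for_done()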

Returns

A Job object


embeddings_computation_status

embeddings_computation_status()

Description

Return the status of the Visual Search for this Datalake

Returns

a dict with status


visual_search

visual_search(
   data: Data, limit: int
)

Description

Return a MultiData object with data that are similar to the given Data, ordered by score. This is computed with the Visual Search feature. Each data has a temporary attribute _score if you want to access the similarity score.
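
Examples

A minimal sketch, assuming Visual Search has finished indexing this datalake; the filename is illustrative.

lake = client.get_datalake()
reference = lake.find_data(filename="porsche.png")
similar = lake.visual_search(data=reference, limit=10)
for data in similar:
    print(data._score)  # temporary similarity score attribute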

Returns

a MultiData object


text_search

text_search(
   query: str, limit: int
)

Description

Return a MultiData object with data that match your query, ordered by score. This is computed with the Visual Search feature. Each data has a temporary attribute _score if you want to access the similarity score.
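
Examples

A minimal sketch, assuming Visual Search has finished indexing this datalake; the query text is illustrative.

lake = client.get_datalake()
results = lake.text_search(query="red sports car", limit=5)
for data in results:
    print(data._score)  # temporary similarity score attribute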

Returns

a MultiData object


count_embeddings

count_embeddings()

Description

Return the number of data indexed by the Visual Search in this Datalake

Returns

number of data indexed


list_embeddings

list_embeddings(
   limit: int
)

Description

Return the list of embeddings computed for this Datalake
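
Examples

A minimal sketch based on the returned structure described below; the limit is illustrative.

lake = client.get_datalake()
embeddings = lake.list_embeddings(limit=100)
for item in embeddings:
    print(item["id"], list(item["vector"].keys()))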

Returns

A list of dicts with indexation data. Each dictionary contains:

  • id (str) : UUID of the Data
  • vector (dict) : Model-specific vector embeddings where:
    • key (str) : Embedder identifier
    • value (list) : Vector embedding as a list of floats