Source code for syft.core.pointer.pointer

"""A Pointer is the main handler when interacting with remote data.
A Pointer object represents an API for interacting with data (of any type)
at a specific location. The pointer should never be instantiated, only subclassed.

The relation between pointers and data is many to one,
there can be multiple pointers pointing to the same piece of data, meanwhile,
a pointer cannot point to multiple data sources.

A pointer is just an object id on a remote location and a set of methods that can be
executed on the remote machine directly on that object. One note that has to be made
is that all operations between pointers will return a pointer, the only way to have access
to the result is by calling .get() on the pointer.

There are two proper ways of receiving a pointer on some data:

1. When sending that data on a remote machine the user receives a pointer.
2. When the user searches for the data in an object store it receives a pointer to that data,
   if it has the correct permissions for that.

After receiving a pointer, one might want to get the data behind the pointer locally. For that the
user should:

1. Request access by calling .request().

Example:

.. code-block::

    pointer_object.request(reason="Request reason")

2. The data owner has to approve the request (check the domain node docs).
3. The data user checks if the request has been approved (check the domain node docs).
4. After the request has been approved, the data user can call .get() on the pointer to get the
   data locally.

Example:

.. code-block::

    pointer_object.get()

Pointers are being generated for most types of objects in the data science scene, but what you can
do on them is not the pointer's job, see the lib module for more details. One can see the pointer
as a proxy to the actual data, the filtering and the security being applied where the data is being
held.

Example:

.. code-block::

    # creating the data holder domain
    domain_1 = Domain(name="Data holder domain")

    # creating dummy data
    tensor = th.tensor([1, 2, 3])

    # creating the data holder client
    domain_1_client = domain_1.get_root_client()

    # sending the data to the client and receiving a pointer of that data.
    data_ptr_domain_1 = tensor.send(domain_1_client)

    # creating the data user domain
    domain_2 = Domain(name="Data user domain")

    # creating a request to access the data
    data_ptr_domain_1.request(reason="I'd like to see this pointer")

    # getting the remote id of the object
    requested_object = data_ptr_domain_1.id_at_location

    # getting the request id
    message_request_id = domain_1_client.requests.get_request_id_from_object_id(
        object_id=requested_object
    )

    # the data holder accepts the request
    domain_1.requests[0].owner_client_if_available = domain_1_client
    domain_1.requests[0].accept()

    # the data user checks if the data holder approved his request
    response = data_ptr_domain_1.check_access(node=domain_2, request_id=message_request_id)

"""
# stdlib
import time
from typing import Any
from typing import List
from typing import Optional
import warnings

# third party
from google.protobuf.reflection import GeneratedProtocolMessageType
from nacl.signing import VerifyKey

# syft absolute
import syft as sy

# syft relative
from ...logger import debug
from ...logger import error
from ...logger import warning
from ...proto.core.pointer.pointer_pb2 import Pointer as Pointer_PB
from ..common.pointer import AbstractPointer
from ..common.serde.deserialize import _deserialize
from ..common.serde.serializable import bind_protobuf
from ..common.uid import UID
from ..io.address import Address
from ..node.abstract.node import AbstractNode
from ..node.common.action.get_object_action import GetObjectAction
from ..node.common.service.get_repr_service import GetReprMessage
from ..node.common.service.obj_search_permission_service import (
    ObjectSearchPermissionUpdateMessage,
)
from ..store.storeable_object import StorableObject


# TODO: Fix the Client, Address, Location confusion
@bind_protobuf
class Pointer(AbstractPointer):
    """
    The pointer is the handler when interacting with remote data.

    Automatically generated subclasses of Pointer need to be able to look up
    the path and name of the object type they point to as a part of serde. For more
    information on how subclasses are automatically generated, please check the ast
    module.

    :param client: The client used to reach the location where the data is being held.
        Its ``address`` is used to route messages and it is used to send them.
    :type client: Any
    :param id_at_location: The UID of the object on the remote location.
    :type id_at_location: Optional[UID]
    :param object_type: The type name of the remote object, forwarded in access
        requests and in the protobuf serialization.
    :type object_type: str
    :param tags: Tags forwarded to the parent pointer constructor.
    :type tags: Optional[List[str]]
    :param description: Description forwarded to the parent pointer constructor.
    :type description: str
    """

    # set on subclasses; used by serde to look up the pointed-to type
    path_and_name: str
    # local cache of the last pointable value pushed via update_searchability()
    _pointable: bool = False

    def __init__(
        self,
        client: Any,
        id_at_location: Optional[UID] = None,
        object_type: str = "",
        tags: Optional[List[str]] = None,
        description: str = "",
    ) -> None:
        super().__init__(
            client=client,
            id_at_location=id_at_location,
            tags=tags,
            description=description,
        )
        self.object_type = object_type
        # _exhausted becomes True in get() call
        # when delete_obj is True and network call
        # has already been made
        self._exhausted = False

    def _get(self, delete_obj: bool = True, verbose: bool = False) -> StorableObject:
        """Method to download a remote object from a pointer object if you have the right
        permissions.

        :param delete_obj: Forwarded to the GetObjectAction message as ``delete_obj``.
        :type delete_obj: bool
        :param verbose: Accepted for interface symmetry with get(); not used here.
        :type verbose: bool
        :return: returns the downloaded data
        :rtype: StorableObject
        """
        debug(
            f"> GetObjectAction for id_at_location={self.id_at_location} "
            + f"with delete_obj={delete_obj}"
        )
        obj_msg = GetObjectAction(
            id_at_location=self.id_at_location,
            address=self.client.address,
            reply_to=self.client.address,
            delete_obj=delete_obj,
        )

        obj = self.client.send_immediate_msg_with_reply(msg=obj_msg).data
        if self.is_enum:
            # rebuild the enum member from the raw value that was sent back
            enum_class = self.client.lib_ast.query(self.path_and_name).object_ref
            return enum_class(obj)

        return obj

    def get_copy(
        self,
        request_block: bool = False,
        timeout_secs: int = 20,
        reason: str = "",
        verbose: bool = False,
    ) -> Optional[StorableObject]:
        """Method to download a remote object from a pointer object if you have the right
        permissions. Optionally can block while waiting for approval.

        This is get() with ``delete_obj=False``.

        :return: returns the downloaded data
        :rtype: Optional[StorableObject]
        """
        return self.get(
            request_block=request_block,
            timeout_secs=timeout_secs,
            reason=reason,
            delete_obj=False,
            verbose=verbose,
        )

    def print(self) -> "Pointer":
        """Print the remote repr of the pointed-to object and return this pointer.

        If fetching the repr fails with a permission error, a blocking access
        request (3 second timeout) is made and, when accepted, the print is retried.
        When no repr could be obtained a "No permission" message is printed instead.

        :return: returns self
        :rtype: Pointer
        """
        obj = None
        try:
            obj_msg = GetReprMessage(
                id_at_location=self.id_at_location,
                address=self.client.address,
                reply_to=self.client.address,
            )
            obj = self.client.send_immediate_msg_with_reply(msg=obj_msg).repr
        except Exception as e:
            if "You do not have permission to .get()" in str(
                e
            ) or "UnknownPrivateException" in str(e):
                # syft relative
                from ..node.domain.service import RequestStatus

                response_status = self.request(
                    reason="Calling remote print",
                    block=True,
                    timeout_secs=3,
                )
                if (
                    response_status is not None
                    and response_status == RequestStatus.Accepted
                ):
                    return self.print()
            else:
                # keep the best-effort behaviour, but leave a trace of failures that
                # are not permission related instead of dropping them silently
                debug(f"> Remote print failed for {self.id_at_location}: {e}")

        # TODO: Create a remote print interface for objects which displays them in a
        # nice way, we could also even buffer this between chained ops until we return
        # so that we can print once and display a nice list of data and ops
        # issue: https://github.com/OpenMined/PySyft/issues/5167
        if obj is not None:
            print(obj)
        else:
            print(f"No permission to print() {self}")

        return self

    def get(
        self,
        request_block: bool = False,
        timeout_secs: int = 20,
        reason: str = "",
        delete_obj: bool = True,
        verbose: bool = False,
    ) -> Optional[StorableObject]:
        """Method to download a remote object from a pointer object if you have the right
        permissions. Optionally can block while waiting for approval.

        :param request_block: If True, first make a blocking access request and only
            download when it is accepted.
        :type request_block: bool
        :param timeout_secs: Timeout forwarded to the blocking request.
        :type timeout_secs: int
        :param reason: Reason forwarded to the blocking request.
        :type reason: str
        :param delete_obj: Forwarded to the download; when True and data was returned
            the pointer is marked exhausted and its garbage collection is disabled.
        :type delete_obj: bool
        :param verbose: Forwarded to request() and _get().
        :type verbose: bool
        :raises ReferenceError: if the pointer is already exhausted.
        :return: returns the downloaded data, or None when a blocking request was
            not accepted
        :rtype: Optional[StorableObject]
        """
        # syft relative
        from ..node.domain.service import RequestStatus

        if self._exhausted:
            raise ReferenceError(
                "Object has already been deleted. This pointer is exhausted"
            )

        if request_block:
            response_status = self.request(
                reason=reason,
                block=True,
                timeout_secs=timeout_secs,
                verbose=verbose,
            )
            if response_status is None or response_status != RequestStatus.Accepted:
                return None

        result = self._get(delete_obj=delete_obj, verbose=verbose)

        if result is not None and delete_obj:
            self.gc_enabled = False
            self._exhausted = True

        return result

    def _object2proto(self) -> Pointer_PB:
        """Returns a protobuf serialization of self.

        As a requirement of all objects which inherit from Serializable,
        this method transforms the current object into the corresponding
        Protobuf object so that it can be further serialized.

        :return: returns a protobuf object
        :rtype: Pointer_PB

        .. note::
            This method is purely an internal method. Please use sy.serialize(object) or one of
            the other public serialization methods if you wish to serialize an
            object.
        """
        return Pointer_PB(
            points_to_object_with_path=self.path_and_name,
            pointer_name=type(self).__name__,
            id_at_location=sy.serialize(self.id_at_location),
            location=sy.serialize(self.client.address),
            tags=self.tags,
            description=self.description,
            object_type=self.object_type,
            attribute_name=getattr(self, "attribute_name", ""),
        )

    @staticmethod
    def _proto2object(proto: Pointer_PB) -> "Pointer":
        """Creates a Pointer from a protobuf

        As a requirement of all objects which inherit from Serializable,
        this method transforms a protobuf object into an instance of this class.

        :return: returns an instance of Pointer
        :rtype: Pointer

        .. note::
            This method is purely an internal method. Please use syft.deserialize()
            if you wish to deserialize an object.
        """
        # TODO: we need _proto2object to include a reference to the node doing the
        # deserialization so that we can convert location into a client object. At present
        # it is an address object which will cause things to break later.
        points_to_type = sy.lib_ast.query(proto.points_to_object_with_path)
        pointer_type = getattr(points_to_type, proto.pointer_name)

        # WARNING: This is sending a serialized Address back to the constructor
        # which currently depends on a Client for send_immediate_msg_with_reply
        return pointer_type(
            id_at_location=_deserialize(blob=proto.id_at_location),
            client=_deserialize(blob=proto.location),
            tags=proto.tags,
            description=proto.description,
            object_type=proto.object_type,
        )

    @staticmethod
    def get_protobuf_schema() -> GeneratedProtocolMessageType:
        """Return the type of protobuf object which stores a class of this type

        As a part of serialization and deserialization, we need the ability to
        lookup the protobuf object type directly from the object type. This
        static method allows us to do this.

        Importantly, this method is also used to create the reverse lookup ability within
        the metaclass of Serializable. In the metaclass, it calls this method and then
        it takes whatever type is returned from this method and adds an attribute to it
        with the type of this class attached to it. See the MetaSerializable class for details.

        :return: the type of protobuf object which corresponds to this class.
        :rtype: GeneratedProtocolMessageType
        """
        return Pointer_PB

    def request(
        self,
        reason: str = "",
        block: bool = False,
        timeout_secs: Optional[int] = None,
        verbose: bool = False,
    ) -> Any:
        """Method that requests access to the data on which the pointer points to.

        Example:

        .. code-block::

            # data holder domain
            domain_1 = Domain(name="Data holder")

            # data
            tensor = th.tensor([1, 2, 3])

            # generating the client for the domain
            domain_1_client = domain_1.get_root_client()

            # sending the data and receiving a pointer
            data_ptr_domain_1 = tensor.send(domain_1_client)

            # requesting access to the pointer
            data_ptr_domain_1.request(reason="Research project.")

        :param reason: The description of the request. This is the reason why you want to have
            access to the data.
        :type reason: str
        :param block: If True, poll for the answer until the request is accepted or
            rejected, or until the timeout elapses. If False, return right after
            sending the request.
        :type block: bool
        :param timeout_secs: Timeout sent along with the request and used to bound the
            blocking wait. Defaults to -1 when not blocking and to 30 when blocking.
            At least one status check is always made before a blocking request
            times out.
        :type timeout_secs: Optional[int]
        :param verbose: Accepted for interface compatibility; not used here.
        :type verbose: bool
        :return: None when not blocking; otherwise the last request status that was
            received (None if no status could be retrieved).
        :rtype: Any

        .. note::
            This method should be used when the remote data associated with the pointer wants to be
            downloaded locally (or use .get() on the pointer).
        """
        # syft relative
        from ..node.domain.service import RequestAnswerMessage
        from ..node.domain.service import RequestMessage
        from ..node.domain.service import RequestStatus

        # if you request non-blocking you don't need a timeout
        # if you request blocking you need a timeout, so lets set a default on here
        # a timeout of 0 would be a way to say don't block my local notebook but if the
        # duet partner has a rule configured it will get executed first before the
        # request would time out
        if timeout_secs is None and not block:
            timeout_secs = -1  # forever

        msg = RequestMessage(
            request_description=reason,
            address=self.client.address,
            owner_address=self.client.address,
            object_id=self.id_at_location,
            object_type=self.object_type,
            requester_verify_key=self.client.verify_key,
            timeout_secs=timeout_secs,
        )

        self.client.send_immediate_msg_without_reply(msg=msg)

        # wait long enough for it to arrive and trigger a handler
        time.sleep(0.1)

        if not block:
            return None

        if timeout_secs is None:
            timeout_secs = 30  # default if not explicitly set

        output_string = "> Waiting for Blocking Request: "
        output_string += f"  {self.id_at_location}"
        if reason:
            output_string += f": {reason}"
        if not output_string.endswith("."):
            output_string += "."
        debug(output_string)

        status = None
        polled_once = False
        start = time.time()
        last_check: float = 0.0

        while True:
            now = time.time()
            try:
                # the timeout is only honoured after the first status check so that
                # remote request handlers get a chance to auto respond, even with a
                # timeout of 0
                if polled_once and now - start > timeout_secs:
                    log = (
                        f"\n> Blocking Request Timeout after {timeout_secs} seconds"
                    )
                    debug(log)
                    return status

                # only check once every second
                if now - last_check > 1:
                    last_check = now
                    debug(f"> Sending another Request Message {now - start}")
                    status_msg = RequestAnswerMessage(
                        request_id=msg.id,
                        address=self.client.address,
                        reply_to=self.client.address,
                    )
                    response = self.client.send_immediate_msg_with_reply(
                        msg=status_msg
                    )
                    status = response.status
                    polled_once = True
                    if status != RequestStatus.Pending:
                        # accepted or rejected lets exit
                        status_text = "REJECTED"
                        if status == RequestStatus.Accepted:
                            status_text = "ACCEPTED"
                        log = f" {status_text}"
                        debug(log)
                        return status
            except Exception as e:
                error(f"Exception while running blocking request. {e}")
                # escape the while loop
                return status

            # yield the CPU on every pass; without this the loop spins at full
            # speed between the once-per-second status checks
            time.sleep(0.1)

    @property
    def searchable(self) -> bool:
        """Deprecated alias of ``pointable``; emits a deprecation warning."""
        msg = "`searchable` is deprecated please use `pointable` in future"
        warning(msg, print=True)
        warnings.warn(
            msg,
            DeprecationWarning,
        )
        return self._pointable

    @searchable.setter
    def searchable(self, value: bool) -> None:
        msg = "`searchable` is deprecated please use `pointable` in future"
        warning(msg, print=True)
        warnings.warn(
            msg,
            DeprecationWarning,
        )
        self.pointable = value

    @property
    def pointable(self) -> bool:
        """The locally cached pointable flag of the pointed-to object."""
        return self._pointable

    @pointable.setter
    def pointable(self, value: bool) -> None:
        # only send an update message when the value actually changes
        if value != self._pointable:
            self.update_searchability(not self._pointable)

    def update_searchability(
        self,
        pointable: bool = True,
        target_verify_key: Optional[VerifyKey] = None,
        searchable: Optional[bool] = None,
    ) -> None:
        """Make the object pointed at pointable or not for other people. If
        target_verify_key is not specified, the searchability for the VERIFYALL group
        will be toggled.

        :param pointable: If the target object should be made pointable or not.
        :type pointable: bool
        :param target_verify_key: The verify_key of the client to which we want to give
               search permission.
        :type target_verify_key: Optional[VerifyKey]
        :param searchable: Deprecated alias of ``pointable``. When given it overrides
               ``pointable`` and a deprecation warning is emitted.
        :type searchable: Optional[bool]
        """
        if searchable is not None:
            warn_msg = "`searchable` is deprecated please use `pointable` in future"
            warning(warn_msg, print=True)
            warnings.warn(
                warn_msg,
                DeprecationWarning,
            )
            pointable = searchable

        self._pointable = pointable
        msg = ObjectSearchPermissionUpdateMessage(
            add_instead_of_remove=pointable,
            target_verify_key=target_verify_key,
            target_object_id=self.id_at_location,
            address=self.client.address,
        )
        self.client.send_immediate_msg_without_reply(msg=msg)

    def check_access(self, node: AbstractNode, request_id: UID) -> Any:
        """Method that checks the status of an already made request. There are three
        possible outcomes when requesting access:

        1. RequestStatus.Accepted - your request has been approved, you can now
           .get() your data.
        2. RequestStatus.Pending - your request has not been reviewed yet.
        3. RequestStatus.Rejected - your request has been rejected.

        :param node: The node that queries the request status.
        :type node: AbstractNode
        :param request_id: The request on which you are querying the status.
        :type request_id: UID
        :return: the ``status`` of the reply to the status query.
        :rtype: Any
        """
        # syft relative
        from ..node.domain.service import RequestAnswerMessage

        msg = RequestAnswerMessage(
            request_id=request_id, address=self.client.address, reply_to=node.address
        )
        response = self.client.send_immediate_msg_with_reply(msg=msg)

        return response.status

    def __del__(self) -> None:
        # __del__ also runs on instances whose __init__ did not complete, so the
        # attributes read here may be missing; in that case there is nothing to do
        client = getattr(self, "client", None)
        if client is None:
            return

        _client_type = type(client)
        if (_client_type == Address) or issubclass(_client_type, AbstractNode):
            # it is a serialized pointer that we receive from another client do nothing
            return

        if getattr(self, "gc_enabled", False):
            client.gc.apply(self)