import os
from pathlib import Path
import pickle
import datetime
import warnings
import numpy as np
import pynq._3rdparty.tinynumpy as tnp
from ..metadata.runtime_metadata_parser import RuntimeMetadataParser
from pynqmetadata.frontends import Metadata
from .device import Device
from .embedded_device import (
_unify_dictionaries, DEFAULT_XCLBIN, CacheMetadataError,
BitstreamHandler, bit2bin,
ZU_FPD_SLCR_REG, ZU_FPD_SLCR_VALUE, ZU_AXIFM_REG, ZU_AXIFM_VALUE,
)
from .global_state import (
GlobalState,
save_global_state, load_global_state,
global_state_file_exists, clear_global_state,
)
from pynq.remote import (
remote_device_pb2_grpc, mmio_pb2_grpc, buffer_pb2_grpc,
remote_device_pb2, mmio_pb2, buffer_pb2,
)
import grpc
PYNQ_PORT = 7967
BS_FPGA_MAN = "/sys/class/fpga_manager/fpga0/firmware"
BS_FPGA_MAN_FLAGS = "/sys/class/fpga_manager/fpga0/flags"
FIRMWARE = "/lib/firmware/"
[docs]
class GrpcChannel:
"""gRPC Channel wrapper for remote device communication
Establishes and manages a gRPC channel connection to a remote PYNQ device.
Includes connection timeout and error handling.
"""
def __init__(self, ip_addr, port):
"""Initialize gRPC channel connection
Parameters
----------
ip_addr : str
IP address of the remote PYNQ device
port : int
Port number for gRPC communication
"""
try:
self.channel = grpc.insecure_channel(f"{ip_addr}:{port}")
grpc.channel_ready_future(self.channel).result(timeout=5)
except grpc.FutureTimeoutError:
raise RuntimeError("Failed to connect to remote device")
[docs]
class RemoteBitstreamHandler(BitstreamHandler):
"""Remote bitstream handler for processing bitstreams on remote devices
Extends BitstreamHandler to handle bitstream parsing and metadata extraction
for remote PYNQ devices. Uses default XCLBIN data for remote operations.
"""
def __init__(self, filepath):
super().__init__(filepath)
[docs]
def get_parser(self, partial:bool=False):
"""Returns a parser object for the remote bitstream
The returned object contains all of the data that
was processed from both the HWH and Xclbin metadata
attached to the object. For remote devices, uses
default XCLBIN data instead of creating synthetic data.
Parameters
----------
partial : bool, optional
Whether this is a partial reconfiguration bitstream.
Default is False.
Returns
-------
parser : object or None
Parser object containing metadata, or None if parsing fails
"""
from .hwh_parser import HWH
from .xclbin_parser import XclBin
hwh_data = self.get_hwh_data()
xclbin_data = DEFAULT_XCLBIN
is_xsa = self.is_xsa()
self._xsa_bitstream_file = None
if hwh_data is not None and not is_xsa:
if partial:
parser = HWH(hwh_data=hwh_data)
else:
try:
parser = self._get_cache()
except CacheMetadataError:
parser = RuntimeMetadataParser(Metadata(input=self._filepath.with_suffix(".hwh")))
except:
raise RuntimeError(f"Unable to parse metadata")
xclbin_parser = XclBin(xclbin_data=xclbin_data)
_unify_dictionaries(parser, xclbin_parser)
if not partial:
parser.refresh_hierarchy_dict()
elif is_xsa:
parser = RuntimeMetadataParser(Metadata(input=self._filepath))
xclbin_parser = XclBin(xclbin_data=xclbin_data)
_unify_dictionaries(parser, xclbin_parser)
parser.refresh_hierarchy_dict()
self._xsa_bitstream_file = parser.xsa.bitstreamPaths[0]
else:
return None
parser.bin_data = self.get_bin_data()
parser.xclbin_data = xclbin_data
parser.dtbo_data = self.get_dtbo_data()
return parser
[docs]
class RemoteBitfileHandler(RemoteBitstreamHandler):
[docs]
def get_bin_data(self):
bit_data = self._filepath.read_bytes()
return bit2bin(bit_data)
[docs]
class RemoteBinfileHandler(RemoteBitstreamHandler):
[docs]
def get_bin_data(self):
return self._filepath.read_bytes()
[docs]
class RemoteXsafileHandler(RemoteBitstreamHandler):
[docs]
def get_bin_data(self):
if self._xsa_bitstream_file is None:
raise RuntimeError("Could not find bitstream file in XSA")
else:
return bit2bin(Path(self._xsa_bitstream_file).read_bytes())
_bitstream_handlers = {
".bit": RemoteBitfileHandler,
".bin": RemoteBinfileHandler,
".xsa": RemoteXsafileHandler,
}
def _get_bitstream_handler(bitfile_name):
filetype = Path(bitfile_name).suffix
if filetype not in _bitstream_handlers:
raise RuntimeError("Unknown file format")
return _bitstream_handlers[filetype](bitfile_name)
[docs]
class RemoteDevice(Device):
"""Device class for interacting with remote PYNQ devices via gRPC
This device enables control of remote PYNQ boards over network connections.
It provides bitstream downloading, memory allocation, MMIO operations,
and other PYNQ functionality through gRPC remote procedure calls.
The device supports multiple bitstream formats (.bit, .bin, .xsa) and
handles metadata parsing, caching, and AXI port width configuration
remotely.
"""
_probe_priority_ = 100
@classmethod
def _probe_(cls):
if not os.environ.get("PYNQ_REMOTE_DEVICES", False):
return []
else:
ip_env = os.environ.get("PYNQ_REMOTE_DEVICES", "")
ip_list = [ip.strip() for ip in ip_env.split(",") if ip.strip()]
num = len(ip_list)
devices = [RemoteDevice(i, ip_list[i]) for i in range(num)]
return devices
def __init__(self, index, ip_addr, tag="remote{}"):
super().__init__(tag.format(index))
self.name = tag.format(index)
self.ip_addr = ip_addr
self.port = PYNQ_PORT
self.client = GrpcChannel(self.ip_addr, self.port)
self._stub = {
'device': remote_device_pb2_grpc.RemoteDeviceStub(self.client.channel),
'mmio': mmio_pb2_grpc.MmioStub(self.client.channel),
'buffer': buffer_pb2_grpc.RemoteBufferStub(self.client.channel),
}
self.arch = self.get_arch()
self.capabilities = {
"REMOTE": True,
}
[docs]
def get_arch(self):
"""Determine the architecture of the remote device
This method reads "/proc/version" on the target to determine its CPU architecture
It also is used on device object creation to verify the IP address and gRPC
connection to a PYNQ.remote device.
Returns
-------
str
The architecture of the remote device, either "aarch64" or "armv7l"
"""
response = self.exists_file("/proc/version")
if response.exists:
content = self.read_file("/proc/version")
version_info = content.decode('utf-8').strip()
if "aarch64" in version_info:
return "aarch64"
return "armv7l"
else:
raise RuntimeError(f"Unable to obtain architecture information for {self.addr}")
[docs]
def exists_file(self, file_path):
"""Check if a file/directory exists on the remote device
Parameters
----------
file_path : str
The path to the file on the remote device
"""
response = self._stub['device'].existsfile(
remote_device_pb2.ExistsFileRequest(file_path=file_path)
)
return response
[docs]
def read_file(self, file_path, output_as_string=True):
"""Read a file from the remote device
Parameters
----------
file_path : str
The path to the file on the remote device
output_as_string : bool, optional
If True, return the file content as bytes. If False, save the file
locally. Default is True.
Returns
-------
bytes or str
If output_as_string is True, returns the file content as bytes.
If output_as_string is False, returns the local filename where the
file content was saved.
"""
response_stream = self._stub['device'].readfile(
remote_device_pb2.ReadFileRequest(file_path=file_path)
)
if output_as_string:
content = b""
for response in response_stream:
content += response.data
return content
else:
filename = os.path.basename(file_path)
with open(filename, "wb") as f:
for response in response_stream:
f.write(response.data)
return filename
[docs]
def write_file(self, file_path, content):
"""Write content to a file on the remote device in chunks
Parameters
----------
file_path : str
The path to the file on the remote device
content : bytes
The content to be written to the file
"""
CHUNK_SIZE = 2*1024*1024 # 2MB chunks
requests = []
for i in range(0, len(content), CHUNK_SIZE):
chunk = content[i:i+CHUNK_SIZE]
requests.append(
remote_device_pb2.WriteFileRequest(
file_path=file_path,
data=chunk,
)
)
response = self._stub['device'].writefile(iter(requests))
return response
[docs]
def set_axi_port_width(self, parser):
"""Set the AXI port width on the remote device
This method configures AXI port widths to resolve discrepancies between
PS configurations during boot and those required by the bitstream.
It is usually required for full bitstream reconfiguration.
For remote execution, register operations are performed via gRPC calls.
Check https://www.xilinx.com/support/answers/66295.html for more
information on the meaning of register values.
Currently only Zynq UltraScale+ devices support data width changes.
"""
from pynq.registers import Register
if not hasattr(parser, "ps_name"):
# Setting port widths not supported for xclbin-only designs
return
parameter_dict = parser.ip_dict[parser.ps_name]["parameters"]
if parser.family_ps == "zynq_ultra_ps_e":
for para in ZU_FPD_SLCR_REG:
if para in parameter_dict:
width = parameter_dict[para]
for reg_name in ZU_FPD_SLCR_REG[para]:
addr = ZU_FPD_SLCR_REG[para][reg_name]["addr"]
f = ZU_FPD_SLCR_REG[para][reg_name]["field"]
Register(addr)[f[0] : f[1]] = ZU_FPD_SLCR_VALUE[width]
for para in ZU_AXIFM_REG:
if para in parameter_dict:
width = parameter_dict[para]
for reg_name in ZU_AXIFM_REG[para]:
addr = ZU_AXIFM_REG[para][reg_name]["addr"]
f = ZU_AXIFM_REG[para][reg_name]["field"]
Register(addr)[f[0] : f[1]] = ZU_AXIFM_VALUE[width]
[docs]
def gen_cache(self, bitstream, parser=None):
""" Generates the cache of the metadata even if no download occurred """
if not hasattr(parser, "_from_cache"):
t = datetime.datetime.now()
ts = "{}/{}/{} {}:{}:{} +{}".format(
t.year, t.month, t.day, t.hour, t.minute, t.second, t.microsecond
)
if os.path.exists(bitstream.bitfile_name):
gs=GlobalState(bitfile_name=str(bitstream.bitfile_name),
timestamp=ts,
active_name=self.name,
psddr=parser.mem_dict.get("PSDDR", {}))
ip=parser.ip_dict
for sd_name, details in ip.items():
if details["type"] in ["xilinx.com:ip:pr_axi_shutdown_manager:1.0",
"xilinx.com:ip:dfx_axi_shutdown_manager:1.0",]:
gs.add(name=sd_name, addr=details["phys_addr"])
save_global_state(gs)
if hasattr(parser, "systemgraph"):
if not parser.systemgraph is None:
STATE_DIR = os.path.dirname(__file__)
pickle.dump(parser, open(f"{STATE_DIR}/_current_metadata.pkl", "wb"))
[docs]
def mmap(self, address, length):
"""Create memory mapped I/O object for remote device
Parameters
----------
address : int
Base address for memory mapping
length : int
Length of memory region to map
Returns
-------
RemoteMMIO
Memory mapped I/O object for remote access
"""
return RemoteMMIO(self._stub['mmio'], address, length)
[docs]
def download(self, bitstream, parser=None):
"""Download bitstream to the remote FPGA device
This method handles the complete bitstream download process including:
- Transferring bitstream file to remote device
- Configuring FPGA manager flags
- Setting AXI port widths
- Generating metadata cache
Parameters
----------
bitstream : object
Bitstream object containing file and configuration information
parser : object, optional
Metadata parser object. If None, uses default XCLBIN parser.
"""
if parser is None:
from .xclbin_parser import XclBin
parser = XclBin(xclbin_data=DEFAULT_XCLBIN)
if not bitstream.binfile_name:
bitstream.binfile_name = Path(bitstream.bitfile_name).stem + ".bin"
if not bitstream.partial:
self.shutdown()
self.gen_cache(bitstream, parser)
flag = "0"
else:
flag = "1"
response = self._stub['device'].set_bitstream_attrs(
remote_device_pb2.SetBitstreamAttrsRequest(
binfile_name=bitstream.binfile_name,
partial=bitstream.partial,
)
)
if not response:
warnings.warn("Remote device failed to set bitstream attributes")
response = self.exists_file(FIRMWARE + bitstream.binfile_name)
if not response.exists:
self.write_file(FIRMWARE + bitstream.binfile_name, parser.bin_data)
self.write_file(BS_FPGA_MAN_FLAGS, flag.encode())
self.write_file(BS_FPGA_MAN, bitstream.binfile_name.encode())
self.set_axi_port_width(parser)
super().post_download(bitstream, parser, self.name)
[docs]
def initial_global_state_file_boot_check(self):
"""Check and clear global state based on FPGA manager status
Reads the FPGA manager state and flags from the remote device to
determine if global state should be cleared. Currently clears
global state by default for remote devices.
"""
pl_state_file:str = "/sys/class/fpga_manager/fpga0/state"
pl_flags_file:str = "/sys/class/fpga_manager/fpga0/flags"
state = self.read_file(pl_state_file, output_as_string=True).decode('utf-8').strip()
flags = self.read_file(pl_flags_file, output_as_string=True).decode('utf-8').strip()
# if (state[0:7] == "unknown") or flags[0:3] == "100":
# clear_global_state()
# seeing issue where pynq.remote image state is always "operating"
# clearing global state by default for now
clear_global_state()
[docs]
def shutdown(self):
"""Shutdown the AXI connections to the PL remotely
Prepares the programmable logic for reconfiguration by shutting down
AXI connections using shutdown manager IP cores. This prevents
potential issues during bitstream loading.
"""
self.initial_global_state_file_boot_check()
if global_state_file_exists():
gs = load_global_state()
for sd_ip in gs.shutdown_ips.values():
mmio = self.mmap(sd_ip.base_addr, length=4)
# Request shutdown
mmio.write(0x0, 0x1)
i = 0
while mmio.read(0x0) != 0x0F and i < 16000:
i += 1
if i >= 16000:
warnings.warn(
"Timeout for shutdown manager. It's likely "
"the configured bitstream and metadata "
"don't match."
)
[docs]
def allocate(self, shape, dtype, cacheable=0, **kwargs):
"""Allocate memory buffer on the remote device
Parameters
----------
shape : int or tuple of int
Shape of the buffer to allocate
dtype : dtype
Data type of the buffer elements
cacheable : int, optional
Whether buffer should be cacheable (0=non-cacheable, 1=cacheable).
Default is 0.
**kwargs
Additional keyword arguments (currently unused)
"""
elements = 1
try:
for s in shape:
elements *= s
except TypeError:
elements = shape
dtype = np.dtype(dtype)
size = elements * dtype.itemsize
response = self._stub['buffer'].allocate(
buffer_pb2.AllocateRequest(size=size,
dtype=dtype.str,
cacheable=bool(cacheable)
)
)
if not response:
raise RuntimeError(response.msg)
buffer_id = response.buffer_id
ar = RemoteBuffer(
shape,
dtype,
stub=self._stub['buffer'],
buffer_id=buffer_id,
coherent=False,
)
return ar
[docs]
class RemoteGPIO:
"""Remote GPIO placeholder class
Placeholder implementation for GPIO operations on remote devices.
GPIO functionality is not yet implemented for remote PYNQ devices.
"""
def __init__(self, gpio_index=None, direction=None):
"""Initialize RemoteGPIO object
Parameters
----------
gpio_index : int, optional
GPIO pin index number
direction : str, optional
GPIO direction ('in' or 'out')
"""
self.gpio_index = gpio_index
self.direction = direction
warnings.warn("GPIO operations are not yet implemented for remote devices")
[docs]
def read(self):
raise RuntimeError("GPIO operations are not yet implemented for remote devices")
[docs]
def write(self, value):
raise RuntimeError("GPIO operations are not yet implemented for remote devices")
[docs]
def release(self):
"""Release GPIO resources
No-op for remote GPIO placeholder
"""
pass
# Add class methods to match the GPIO API
[docs]
@staticmethod
def get_gpio_pin(gpio_user_index, target_label=None):
"""Get GPIO pin by user index
Placeholder method to prevent attribute errors in remote device context.
"""
warnings.warn("GPIO operations are not yet implemented for remote devices")
return gpio_user_index # Just return the index to prevent errors
[docs]
class RemoteInterrupt:
"""Remote Interrupt placeholder class
Placeholder implementation for interrupt handling on remote devices.
Interrupt functionality is not yet implemented for remote PYNQ devices.
Parameters
----------
fullpath : str, optional
Full path to interrupt device
"""
def __init__(self, fullpath=None):
self.fullpath = fullpath
warnings.warn(f"Interrupts are not yet implemented for remote devices")
[docs]
def wait(self, timeout=None):
raise RuntimeError("Interrupts are not yet implemented for remote devices")
[docs]
class RemoteUioController:
"""Remote UIO Controller placeholder class
Placeholder implementation for UIO (Userspace I/O) operations on remote devices.
UIO functionality is not yet implemented for remote PYNQ devices.
Parameters
----------
device : Device, optional
Device object for UIO operations
"""
def __init__(self, device=None):
self.device = device
warnings.warn("UIO operations are not yet implemented for remote devices")
[docs]
def add_event(self, event, number):
raise RuntimeError("UIO operations are not yet implemented for remote devices")
def __del__(self):
pass
class _AccessHook:
"""Internal access hook for remote MMIO operations
Provides read/write interface between tinynumpy arrays and remote MMIO.
"""
def __init__(self, baseaddress, mmio):
"""Initialize access hook
Parameters
----------
baseaddress : int
Base address for MMIO region
mmio : RemoteMMIO
Remote MMIO object for actual operations
"""
self.baseaddress = baseaddress
self.mmio = mmio
def read(self, offset, length):
"""Read data from remote MMIO
Parameters
----------
offset : int
Offset from base address
length : int
Number of bytes to read
"""
data = self.mmio.read(offset, length)
if isinstance(data, int):
data = data.to_bytes(length, byteorder='little')
return data
def write(self, offset, data):
"""Write data to remote MMIO
Parameters
----------
offset : int
Offset from base address
data : bytes or int
Data to write to remote device
"""
self.mmio.write(offset, data)
[docs]
class RemoteMMIO:
"""Memory-mapped I/O for remote devices via gRPC
Provides memory-mapped I/O operations on remote PYNQ devices through
gRPC remote procedure calls.
"""
def __init__(self, stub, address, length):
"""Initialize RemoteMMIO object
Parameters
----------
stub : grpc stub
gRPC stub for MMIO operations
address : int
Base address for memory-mapped region
length : int
Length of memory-mapped region in bytes
"""
self._stub = stub
response = self._stub.get_mmio(
mmio_pb2.GetMmioRequest(base_addr=address, length=length)
)
if response.msg:
raise RuntimeError(response.msg)
self.mmio_id = response.mmio_id
self._hook = _AccessHook(address, self)
stype = tnp._convert_dtype("u4", to="array")
fake_buffer = tnp._FakeBuffer(length // 4, stype, hook=self._hook)
self.array = tnp.ndarray(shape=(length // 4,), dtype="u4", buffer=fake_buffer)
[docs]
def read(self, offset=0, length=4, word_order="little"):
if length not in [1, 2, 4, 8]:
raise ValueError("MMIO currently only supports 1, 2, 4 and 8-byte reads.")
if offset < 0:
raise ValueError("Offset cannot be negative.")
if length == 8 and word_order not in ["big", "little"]:
raise ValueError("MMIO only supports big and little endian.")
if offset % 4:
raise MemoryError("Unaligned read: offset must be multiple of 4.")
response = self._stub.read(
mmio_pb2.ReadRequest(mmio_id=self.mmio_id, offset=offset,
length=length, word_order=word_order)
)
if response.msg:
raise RuntimeError(response.msg)
return response.data
[docs]
def write(self, offset, data):
if offset < 0:
raise ValueError("Offset cannot be negative.")
if offset % 4:
raise MemoryError("Unaligned write: offset must be multiple of 4.")
if type(data) in [int, np.int32, np.uint32]:
if data < 0:
# Convert data to equivalent unsigned using two's complement for 32 bit
data = data + (1 << 32)
data_bytes = data.to_bytes(4, byteorder='little')
elif type(data)in [np.int64, np.uint64]:
if data < 0:
# Convert data to equivalent unsigned using two's complement for 64 bit
data = data + (1 << 64)
data_bytes = data.to_bytes(8, byteorder='little')
elif isinstance(data, bytes):
data_bytes = data
else:
raise ValueError("Data type must be int or bytes.")
if len(data_bytes) % 4:
raise MemoryError("Unaligned write: data length must be multiple of 4.")
response = self._stub.write(
mmio_pb2.WriteRequest(
mmio_id=self.mmio_id,
offset=offset,
data=data_bytes
)
)
if response.msg:
raise RuntimeError(response.msg)
[docs]
class RemoteBuffer(np.ndarray):
"""NumPy-compatible buffer for remote device memory
Extends numpy.ndarray to provide transparent access to memory buffers
allocated on remote PYNQ devices. Supports all standard numpy operations
while handling data transfer via gRPC automatically.
"""
def __new__(
cls, *args, stub, buffer_id, coherent=False, **kwargs
):
"""Create new RemoteBuffer instance
Parameters
----------
*args
Positional arguments passed to numpy.ndarray
stub : grpc stub
gRPC stub for buffer operations
buffer_id : int
Unique identifier for remote buffer
coherent : bool, optional
Whether buffer is cache coherent. Default is False.
**kwargs
Keyword arguments passed to numpy.ndarray
"""
self = super().__new__(cls, *args, **kwargs)
self.stub = stub
self.buffer_id = buffer_id
self.coherent = coherent
self.freed = False
self.CHUNK_SIZE = 2*1024*1024 # 2MB chunks
return self
def __array_finalize__(self, obj):
if isinstance(obj, RemoteBuffer) and obj.coherent is not None:
self.stub = obj.stub
self.coherent = obj.coherent
self.buffer_id = obj.buffer_id
def __del__(self):
try:
self.freebuffer()
except Exception as e:
pass
def __setitem__(self, key, value):
if key == slice(None, None, None): # equivalent to [:]
self.__set_data(value)
else:
data = self[:]
data[key] = value
self.__set_data(data)
def __getitem__(self, key):
request = buffer_pb2.BufferReadRequest(buffer_id=self.buffer_id)
response_stream = self.stub.read(request)
return self.__get_data(key, response_stream)
def __set_data(self, data):
if not isinstance(data, np.ndarray) or data.dtype != self.dtype:
data = np.asarray(data, dtype=self.dtype)
if data.shape != self.shape:
data = data.reshape(self.shape)
data_bytes = data.tobytes()
total_size = len(data_bytes)
# Create an iterator that yields requests
def request_iterator():
for i in range(0, total_size, self.CHUNK_SIZE):
chunk = data_bytes[i:i+self.CHUNK_SIZE]
yield buffer_pb2.BufferWriteRequest(
buffer_id=self.buffer_id,
data=chunk
)
response = self.stub.write(request_iterator())
if response.msg:
raise RuntimeError(response.msg)
def __get_data(self, key, stream):
result = np.zeros(self.shape, dtype=self.dtype)
offset = 0
for response in stream:
chunk = np.frombuffer(response.data, dtype=self.dtype)
chunk_size = len(chunk)
result.flat[offset:offset+chunk_size] = chunk
offset += chunk_size
return result[key]
def __repr__(self):
return f"RemoteBuffer(buffer_id={self.buffer_id}, dtype={self.dtype}, size={self.size}, shape={self.shape})"
def __str__(self):
data = self[:]
return str(data)
def __len__(self):
return self.size
[docs]
def freebuffer(self):
"""Free the remote buffer memory
Explicitly releases the buffer memory on the remote device.
Called automatically by destructor.
"""
if not self.freed:
self.freed = True
response = self.stub.freebuffer(
buffer_pb2.FreeBufferRequest(buffer_id=self.buffer_id)
)
if response.msg:
raise RuntimeError(response.msg)
@property
def cacheable(self):
return not self.coherent
@property
def physical_address(self):
response = self.stub.physical_address(
buffer_pb2.AddressRequest(buffer_id=self.buffer_id)
)
if response.msg:
raise RuntimeError(response.msg)
return response.address
@property
def device_address(self):
return self.physical_address
@property
def virtual_address(self):
response = self.stub.virtual_address(
buffer_pb2.AddressRequest(buffer_id=self.buffer_id)
)
if response.msg:
raise RuntimeError(response.msg)
return response.address
@property
def nbytes(self):
return int(self.size * self.dtype.itemsize)
[docs]
def flush(self):
if not self.coherent:
response = self.stub.flush(
buffer_pb2.FlushRequest(buffer_id=self.buffer_id)
)
if response.msg:
raise RuntimeError(response.msg)
[docs]
def invalidate(self):
if not self.coherent:
response = self.stub.invalidate(
buffer_pb2.InvalidateRequest(buffer_id=self.buffer_id)
)
if response.msg:
raise RuntimeError(response.msg)
[docs]
def sync_to_device(self):
self.flush()
[docs]
def sync_from_device(self):
self.invalidate()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.freebuffer()
def __array__(self):
return np.array(self[:])