If you are developing blocks on UP42, the UP42 blockutils Python package will make your development faster and your code more reliable. In this Python package, we have included a series of modules that abstract away some block functions so you can focus on delivering your results.
In this blog post, we will do a deep dive into five of the modules in this package:

- blocks
- windows
- datapath
- exceptions
- syntheticimage
The UP42 blockutils Python package is available via PyPI. To install it, run:
pip install up42-blockutils
You should then be able to access the entire package directly from Python:
python -c "import blockutils; help(blockutils)"
Use :q to exit the help window.
As a starting point, we will use the quickstart processing block (see the code example below). This is a simple processing block that returns the squared values of the input raster.
from pathlib import Path

import rasterio as rio
from geojson import Feature, FeatureCollection

import blockutils

logger = blockutils.logging.get_logger(__name__)


class AProcessingBlock(blockutils.blocks.ProcessingBlock):
    def process(self, input_fc: FeatureCollection) -> FeatureCollection:
        output_fc = FeatureCollection([])
        if not input_fc.features:
            raise blockutils.exceptions.UP42Error(
                blockutils.exceptions.SupportedErrors.NO_INPUT_ERROR
            )
        for feat in input_fc["features"]:
            logger.info(f"Processing {feat}...")
            input_path = Path("/tmp/input/") / Path(
                blockutils.datapath.get_data_path(feat)
            )
            with rio.open(input_path) as src:
                src_win = blockutils.windows.WindowsUtil(src)
                (
                    output_name,
                    output_path,
                ) = blockutils.datapath.get_output_filename_and_path(
                    input_path.name, postfix="processed"
                )
                dst_meta = src.meta.copy()
                with rio.open(output_path, "w", **dst_meta) as dst:
                    for win in src_win.windows_regular():
                        exp = src.read(window=win) ** 2
                        dst.write(exp, window=win)
            out_feat = Feature(bbox=feat.bbox, geometry=feat.geometry)
            out_feat["properties"] = self.get_metadata(feat)
            out_feat = blockutils.datapath.set_data_path(out_feat, output_name)
            logger.info(f"Processed {out_feat}...")
            output_fc.features.append(out_feat)
        return output_fc


AProcessingBlock().run()
blocks: Base Classes Rule
Using object-oriented programming in Python provides ways to extend and adapt code functionality to your needs. In the UP42 data science team, when we build blocks, we define a class that inherits from one of the base classes provided in the up42-blockutils package (either ProcessingBlock or DataBlock). These base classes provide the elementary functionality to make a block work, e.g. methods to get the user input parameters or run the block. This way, we avoid duplicating code for methods shared between blocks. If you want to quickly set up an entire block, check out the data block template and the processing block template.
For instance, all processing blocks in UP42 read the job parameters provided by the user from an environment variable, load the input data.json file (providing information about intermediate output from previous blocks), and save an output data.json file. The base ProcessingBlock class offers a method that does just that, the run method, which you can easily use:
from blockutils.blocks import ProcessingBlock


class MyCustomBlock(ProcessingBlock):
    # Fill in methods here
    pass


if __name__ == "__main__":
    MyCustomBlock().run()
The run method will, as you can see in the reference, create all necessary directories (i.e. /tmp/output/), load the parameters into a dictionary, load data.json, instantiate the class with the parameters (using the from_dict method of the base class), call the process method of the class with the data.json metadata as input, and finally save the resulting data.json to a file in the correct location:
@classmethod
@catch_exceptions()
def run(cls):
    """
    This method is the main entry point for the processing block.
    """
    ensure_data_directories_exist()
    params: dict = load_params()
    input_metadata: FeatureCollection = load_metadata()
    processing_block = cls.from_dict(params)
    result: FeatureCollection = processing_block.process(input_metadata)
    save_metadata(result)
The run method is a class method, which means it receives the class itself (rather than an instance) as its first argument. It returns nothing, since all the results are written to disk.
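The class method mechanics are worth spelling out: because run receives the class itself, the base class can instantiate whichever subclass it is called on. Here is a minimal, self-contained sketch of that pattern (the names are illustrative, not blockutils code):

```python
class BaseBlock:
    @classmethod
    def run(cls):
        # cls is the class run() was called on, not an instance,
        # so subclasses calling run() get an instance of themselves.
        block = cls()
        return type(block).__name__


class MyCustomBlock(BaseBlock):
    pass
```

Calling MyCustomBlock.run() creates a MyCustomBlock instance even though run is defined on BaseBlock, which is exactly what lets the base class call cls.from_dict(params) and process on your subclass.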
This method is typically the entry point of the block: when a job is run in UP42, the execution of this block is started with a call to the run method. We usually separate the entry point of the block into a basic run.py file like this:
# Import the class from the module where it is defined,
# e.g. a my_custom_block.py file
from my_custom_block import MyCustomBlock

if __name__ == "__main__":
    MyCustomBlock().run()
And then you should make sure that the Dockerfile you created for this block uses this file as the entry point:
# Invoke run.py.
CMD ["python", "run.py"]
windows: Nothing Is Too Big To Chew
One very common issue when processing satellite or aerial imagery is hitting a memory limit. Sentinel-1 radar images, for instance, are very large (almost 2 GB on disk). On most machines you won't be able to load the entire array into memory at once. That's exactly why rasterio offers the windows module, which allows reading and writing raster files in chunks.
We have wrapped this functionality in our own windows module for ease of use (see the reference). It has been extended to allow for buffered window operations, which is especially useful when the operation to be applied suffers from boundary effects (e.g. any kernel-based operation).
Window by window writing of imagery with 768 x 768 pixels, written in windows of 368 x 368 pixels.
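To make the idea of regular windowing concrete, here is a small, self-contained sketch of tiling an image extent into fixed-size windows. It is illustrative only: the real WindowsUtil works with rasterio Window objects, and this helper name is hypothetical.

```python
from typing import List, Tuple


def regular_windows(
    width: int, height: int, tile: int
) -> List[Tuple[int, int, int, int]]:
    """Cover a width x height raster with tile x tile windows.

    Returns (col_off, row_off, win_width, win_height) tuples; windows
    at the right and bottom edges are clipped to the raster extent.
    """
    windows = []
    for row_off in range(0, height, tile):
        for col_off in range(0, width, tile):
            win_w = min(tile, width - col_off)
            win_h = min(tile, height - row_off)
            windows.append((col_off, row_off, win_w, win_h))
    return windows
```

For the 768 x 768 image in the animation above, tiles of 368 pixels yield a 3 x 3 grid of nine windows, with the edge windows clipped to 32 pixels.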
In the quickstart example, we make use of the windows module to create a reliable computation, regardless of the size of the input image:
from pathlib import Path

import rasterio as rio
from blockutils.windows import WindowsUtil

input_path = Path("/tmp/input/a_file.tif")
output_path = Path("/tmp/output/a_file_processed.tif")

with rio.open(input_path) as src:
    src_win = WindowsUtil(src)
    dst_meta = src.meta.copy()
    with rio.open(output_path, "w", **dst_meta) as dst:
        for win in src_win.windows_regular():
            exp = src.read(window=win) ** 2
            dst.write(exp, window=win)
datapath: It’s About The Destination
In UP42, blocks share results with each other via the data.json file. Each data.json file is a FeatureCollection, with each feature representing a single result of a given block. If a block produces an artifact (e.g. a file or set of files), then each feature should have a property, up42.data_path, that points to the location of the results. This is the path of the resulting file or set of files relative to the /tmp/output/ folder.
When building your own custom block, we have some utility functions to help you handle data_path fetching and setting.
In the quickstart example, we make use of several of these utilities. For example, we fetch the path to the input dataset using the get_data_path function and set the path on the output feature with set_data_path:
from pathlib import Path

from geojson import Feature
from blockutils.datapath import get_data_path, set_data_path

for feat in features:
    input_path = Path("/tmp/input/") / Path(get_data_path(feat))
    # ... process the input here ...
    out_feat = Feature(bbox=feat.bbox, geometry=feat.geometry)
    out_feat = set_data_path(out_feat, output_name)
We can also easily derive output file names and paths from the input file names, for instance:

from pathlib import Path

from blockutils.datapath import get_output_filename_and_path

input_path = Path("/tmp/input/a_file.tif")
(output_name, output_path) = get_output_filename_and_path(
    input_path.name, postfix="processed"
)
# output_name is relative to the /tmp/output/ folder, pass this to set_data_path
# i.e. a_file_processed.tif
# output_path is the full path, pass this to the write function
# i.e. /tmp/output/a_file_processed.tif
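The naming convention behind this helper can be sketched in a few lines. This is an illustrative stand-in, not the blockutils implementation:

```python
from pathlib import Path
from typing import Tuple


def output_filename_and_path(
    input_name: str, postfix: str, out_dir: Path = Path("/tmp/output")
) -> Tuple[str, Path]:
    """Insert a postfix before the file extension.

    e.g. "a_file.tif" -> ("a_file_processed.tif",
                          Path("/tmp/output/a_file_processed.tif"))
    """
    p = Path(input_name)
    output_name = f"{p.stem}_{postfix}{p.suffix}"
    return output_name, out_dir / output_name
```

Keeping the relative name and the full path separate means the same helper serves both set_data_path (which expects a path relative to /tmp/output/) and the actual file-writing call.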
You can check out the reference for a full overview of all the datapath functions.
exceptions: When Things Go Wrong
If there is one thing that is certain when writing code, it is that at some point it will break! Users will pass wrong parameters, APIs will be down, and so on. What matters is how we handle this in our code. Exception handling can be painful, so we included an exceptions module in blockutils. This module has three main functions: typifying common errors, standardizing exit codes, and simplifying exception handling for your block.
We have typified five error types and their corresponding exit codes, as defined in the reference:
| Error | Description | Exit code |
|---|---|---|
| INPUT_PARAMETERS_ERROR | User provided wrong or inconsistent configuration parameters. | 2 |
| NO_INPUT_ERROR | Block did not find input data, e.g. no data in the requested area (data block), no features in data.json (processing block). | 3 |
| WRONG_INPUT_ERROR | Input data is unsuitable, e.g. a processing block expects 16 bit but receives 8 bit. | 4 |
| API_CONNECTION_ERROR | API which is used by the block is down or changed its interface. | 5 |
| NO_OUTPUT_ERROR | After applying all processing steps, no results are provided to the user. | 6 |
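The mapping from error type to exit code in the table above can be expressed as a simple enumeration. The following is a sketch that mirrors the table, not the actual blockutils source:

```python
from enum import Enum


class SupportedErrorsSketch(Enum):
    # Exit codes as listed in the table above
    INPUT_PARAMETERS_ERROR = 2
    NO_INPUT_ERROR = 3
    WRONG_INPUT_ERROR = 4
    API_CONNECTION_ERROR = 5
    NO_OUTPUT_ERROR = 6
```

Tying each error type to a fixed integer is what lets the platform (or any shell script) distinguish failure modes from the container's exit status alone.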
In order to use these exception types effectively, you need to raise an UP42Error with one of the SupportedErrors as an argument, for instance:
from blockutils.exceptions import UP42Error, SupportedErrors

if parameter_is_wrong:
    raise UP42Error(SupportedErrors.INPUT_PARAMETERS_ERROR, "Wrong parameters!")
In addition, you should make sure your block entry point (usually run.py, as described in the section above) uses the catch_exceptions decorator. Decorators are Python functions that take a function as input and return a function. You can use them to wrap functionality around existing code. In this case, the catch_exceptions decorator makes sure that all exceptions coming from the execution of your code return an appropriate exit code (that is, 1 for all generic exceptions, and 2 to 6 for UP42Error).
For example, you can see in the run method of the base ProcessingBlock class that this is already done for you:
from blockutils.exceptions import catch_exceptions

@catch_exceptions()
def run(cls):
    """
    This method is the main entry point for the processing block.
    """
    pass
The logs will also be more comprehensible for the user. For example, the user would see this in the logs when a NO_OUTPUT_ERROR is raised:
2020-07-08 13:08:12,471 - blockutils.exceptions - ERROR - No output data created.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/blockutils/exceptions.py", line 111, in wrapper
    return function(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/blockutils/blocks.py", line 122, in run
    result: FeatureCollection = processing_block.process(input_metadata)
  File "/block/src/kmeans_clustering.py", line 180, in process
    raise UP42Error(SupportedErrors.NO_OUTPUT_ERROR)
blockutils.exceptions.UP42Error: [NO_OUTPUT_ERROR]
syntheticimage: Fake It Till You Make It
In the data science team at UP42, we follow a testing-first software development approach, and this has helped us ensure consistent code quality across projects and a relatively low number of major bugs. Testing is, however, a pain. We use Pytest, Mypy, and Pylint (plus Black for formatting), and these tools help tremendously. However, when handling geospatial data, it becomes tricky to test functions end to end without using real data. It’s considered bad practice to check large files into a Git repository, since it slows down Git operations, so this can feel like a dead-end street.
For this reason, we recommend using the syntheticimage module. This module allows you to easily mock raster files without the need to include any sort of clunky TIFF or PNG files in your repository. You can then use these mock files to test your raster operations.

Synthetic images generated with the syntheticimage module using 100, 5000, and 1 (from left to right) as seeds.
For instance, in our quickstart example, we can refactor the code so that the power-of-two operation lives in a single method, which makes testing easier:
import numpy as np

from blockutils.blocks import ProcessingBlock


class MyCustomBlock(ProcessingBlock):
    @staticmethod
    def power_of_two(input_array: np.ndarray) -> np.ndarray:
        return input_array ** 2
To test this function, let’s set up a test case using the syntheticimage module:
import tempfile
from pathlib import Path

import numpy as np
from blockutils.syntheticimage import SyntheticImage

# Import the class from the module where it is defined
from my_custom_block import MyCustomBlock


def test_power_of_two():
    # The tempfile context manager ensures all files are deleted after the test run
    with tempfile.TemporaryDirectory() as tmpdirname:
        synth_image = SyntheticImage(128, 128, 4, "uint16", out_dir=Path(tmpdirname))
        in_path, in_array = synth_image.create(100)
        result_array = MyCustomBlock.power_of_two(in_array)
        expected_array = in_array ** 2
        # assert_array_almost_equal raises on mismatch; no extra assert needed
        np.testing.assert_array_almost_equal(result_array, expected_array)
In the reference of the SyntheticImage class, you will find that you can easily change nodata values, shift the position of the image with a custom transform object, or even set a specific image pattern.
Wrapping Up
In this blog post, you have learned about some of the basic modules in the UP42 blockutils Python package. We have shared some minimal examples of how to use certain functions and what those functions represent in the UP42 context. The package includes other useful modules for building blocks, such as DIMAP and NetCDF utilities and generic geometry operations.
You can find all the information about up42-blockutils in the documentation and on its PyPI page, and all the blocks built by us and our partners on the UP42 marketplace.