Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable large tests #117

Open
wants to merge 6 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/backend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ To add functionality, implement one of these abstract classes.
The runnable is the simplest form of a pipeline element. While it is the building block of modules, the user can implement Runnables that are run by modules (such as `NDVI`).

#### Module
The module is a logical part of the image processing pipeline, chained sequentially with other pipelines. A module will perform its functionality when being `run()` and save the relevant data in the pipeline data object that will be passed to the following module. Note that your implementation should invoke `super().run(data)` after your module logic.
The module is a logical part of the image processing pipeline, chained sequentially with other pipelines. A module will perform its functionality when being `run()` and save the relevant data in the pipeline data object that will be passed to the following module by the pipeline.

#### Parallel module
The parallel module is a module that can run multiple threads of execution at the same time, essentially allowing parallel module invocations. Parallel modules implement logical groups of functionalities, such as the calculation of all indicies (e.g. `NDVI` and `Mositure`) that do not rely on each other.
Expand Down
2 changes: 1 addition & 1 deletion src/backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def main(args: Any):
imgs = imgs[:4]

# Run the pipeline
pipeline = default_pipeline(cloud=cloud_config)
pipeline = nutrient_pipeline()
pipeline.show()

# Authenticate to firebase
Expand Down
3 changes: 0 additions & 3 deletions src/backend/pipeline/modules/index/index.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
# type: ignore
from __future__ import annotations
from ..parallel_module import ParallelModule
from .runnables.ndvi import NDVI
from .runnables.nutrient import Nutrient
from ..runnable import Runnable
from ..data import Data, Modules
from typing import Any
Expand Down
7 changes: 4 additions & 3 deletions src/backend/pipeline/modules/index/runnables/ndvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
from ..indicies import Indicies
from ....mat import Channels
import numpy as np
import cv2
import traceback
import matplotlib.pyplot as plt
import sys

class NDVI(Runnable):
"""
Expand Down Expand Up @@ -44,8 +45,8 @@ def run(self, data: Data) -> bool:

# Catch exception
except Exception as exception:
print("NDVI calculation failed!")
print(exception)
print("Running NDVI failed: " + str(exception))
print(traceback.format_exc())
return False

def calculate(self, nir, red) -> np.ndarray:
Expand Down
17 changes: 11 additions & 6 deletions src/backend/pipeline/modules/index/runnables/nutrient.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,41 @@
from ...runnable import Runnable
from ...data import Data, Modules
from ..indicies import Indicies
from ....mat import Channels
import tensorflow as tf
import traceback
import numpy as np
import matplotlib.pyplot as plt
import cv2
import sys

class Nutrient(Runnable):
"""
Nutrient deficiency runnable.
"""

def __init__(self, data: Data) -> None:
super().__init__(data, name="NUTRIENT")
super().__init__(data, name="Nutrient")
self.type = Indicies.NUTRIENT

def run(self, data: Data) -> bool:
try:
# take the calculated masks from the segmentation module
masks = data.modules[Modules.SEGMENTATION.value]["masks"]
arg_masks = [tf.one_hot(tf.math.argmax(mask, axis=3), depth=9) for mask in masks]
# assume index 0 is for the nutrient deficiency mask
nutrient_masks = [mask[0][:, :, 0] for mask in masks]
nutrient_masks = [mask[0][:, :, 0] for mask in arg_masks]
data.modules[Modules.INDEX.value]["runnables"][self.type.value]["masks"] = nutrient_masks
masks = [np.where(mask == 1, 255, 0) for mask in nutrient_masks]
patches = data.modules[Modules.MOSAIC.value]["patches"]
hsize, _ = data.modules[Modules.MOSAIC.value]["patches_dims"]
hsize = 512
result = self.calculate(masks, patches, hsize)
data.modules[Modules.INDEX.value]["runnables"][self.type.value]["index"] = result
return True

# Catch exception
except Exception as exception:
print("Nutrient calculation failed!")
print(exception)
print("Running Nutrient failed: " + str(exception))
print(traceback.format_exc())
return False

def calculate(self, masks, patches, hsize: int) -> np.ndarray:
Expand Down
20 changes: 5 additions & 15 deletions src/backend/pipeline/modules/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from typing import Any
import pickle
import pydash
from google.cloud import storage
from firebase_admin import firestore
import json
from abc import abstractmethod

class Module(Runnable):
"""
Expand All @@ -35,25 +35,15 @@ def __init__(self, data: Data, channels: list[Channels] = [], input_data: Any =
self.next: Module | None = None
self.type: Modules = module_type

def run(self, data: Data) -> Any:
@abstractmethod
def run(self, data: Data) -> None:
"""
Processes the image.

Args:
data: the job data
persist: whether to save the images to the field database
data: the pipeline data object
"""

# If there is a next module, then run it
if self.next is not None:
print(f"Preparing <{self.name}>")
data.current = self.next.type
self.next.prepare(data)
print(f"Running <{self.name}>")

# Otherwise, return the data
return data

def display(self, img: Mat) -> None:
"""
Downscales and displays an image to fit your monitor.
Expand Down
8 changes: 1 addition & 7 deletions src/backend/pipeline/modules/mosaicing.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Mosaicing(Module):
def __init__(self, data: Data, input_data: Any = None) -> None:
super().__init__(data, name="Mosaicing", module_type=Modules.MOSAIC)

def run(self, data: Data) -> Data:
def run(self, data: Data) -> None:
"""
Stitches the images to create an orthomosaic image of the farm.

Expand All @@ -26,9 +26,6 @@ def run(self, data: Data) -> Data:

Raises:
Exception: when the sticher fails to stich the images

Returns:
The stiched image.
"""

self.prepare(data)
Expand Down Expand Up @@ -62,9 +59,6 @@ def run(self, data: Data) -> Data:
data.modules[self.type.value]["alpha_img"] = alpha_stitched
patches = self.create_patches(stitched, data.input[0].channels)
data.modules[self.type.value]["patches"] = patches

# Run the next module
return super().run(data)

def to_persist(self, data: Data):
data.persistable[self.type.value] = frozenset([self.type.value + "." + "stitched"])
Expand Down
17 changes: 9 additions & 8 deletions src/backend/pipeline/modules/parallel_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from .modules import Modules
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Any, Type


class ParallelModule(Module):
"""
Represents an arbitrary image processing pipeline module that can
Expand All @@ -33,29 +35,24 @@ def __init__(self, data: Data, runnables: list[Type[Runnable]], input_data: Any
self.runnables: list[Runnable] = [runnable(data) for runnable in runnables]
self.channels: list[Channels] = list(set(sum([runnable.channels for runnable in self.runnables], [])))

def run(self, data: Data) -> Data:
def run(self, data: Data) -> None:
"""
Processes the image using the runnables.

Args:
img: the input image(s)
data: the job data
data: the pipeline data object
"""

# Spawn the executor
# TODO: don't hardcode max_workers
with ThreadPoolExecutor(max_workers=4) as executor:
# Run the runnables
print("Parallel module running " +\
print("Running " +\
', '.join(["<" + runnable.name + ">" for runnable in self.runnables]))
futures = {executor.submit(runnable.run, data): runnable for runnable in self.runnables}

# Wait for completion
# TODO: add validation
wait(futures)

# Run the module functionality
return super().run(data)

def verify(self, channels: list[Channels]) -> bool:
"""
Expand Down Expand Up @@ -85,6 +82,10 @@ def prepare(self, data: Data) -> None:
super().prepare(data)
data.modules[self.type.value]["runnables"] = {}

# Override super initialization
data.persistable[self.type.value] = {}
data.persistable[self.type.value]["runnables"] = {}

# Initialize runnables' data
for runnable in self.runnables:
runnable.prepare(data)
Expand Down
9 changes: 3 additions & 6 deletions src/backend/pipeline/modules/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
class Preprocess(Module):
"""Perform data preprocessing on raw images."""

def __init__(self, data: Data, input_data: Any):
super().__init__(data, name="Preprocesisng", module_type=Modules.PREPROCESS)
def __init__(self, data: Data, input_data: Any, name="Preprocess"):
super().__init__(data, name=name, module_type=Modules.PREPROCESS)
self.masks: list[Mat] = input_data

def run(self, data: Data):
Expand All @@ -32,8 +32,6 @@ def run(self, data: Data):
for (x, mask) in zip(data.input, self.masks)]
data.modules[self.type.value]["masked"] = masked

return super().run(data)

class AgricultureVisionPreprocess(Preprocess):
"""
Perform data preprocessing on Agriculture-Vision: A Large Aerial Image Database for
Expand All @@ -44,7 +42,7 @@ class AgricultureVisionPreprocess(Preprocess):
"""

def __init__(self, data: Data, input_data: Any):
super().__init__(data, input_data=input_data)
super().__init__(data, input_data=input_data, name="AgricultureVisionPreprocess")

def run(self, data: Data):
"""
Expand Down Expand Up @@ -74,4 +72,3 @@ def run(self, data: Data):
data.modules[self.type.value]["clipping"] =\
[Mat(np.clip(x.get(), v_lower, v_upper).astype(np.uint8), data.input[0].channels)
for (x, (v_lower, v_upper)) in zip(data.modules[Modules.MOSAIC.value]["patches"], bounds)]
return super().run(data)
6 changes: 3 additions & 3 deletions src/backend/pipeline/modules/segmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ class SemanticSegmentation(Module):
"""Perform semantic segmentation using the DeepLabV3+ model."""

def __init__(self, data: Data, input_data: Any) -> None:
super().__init__(data, name="Semantic Segmentation DeepLabv3+", module_type=Modules.SEGMENTATION)
super().__init__(data, name="Semantic Segmentation DeepLabv3+",
module_type=Modules.SEGMENTATION)
self.paths = input_data # paths to model atrifacts

def run(self, data: Data) -> Data:
def run(self, data: Data) -> None:
"""
Perform inference using the images given.
Each image should adhere to specific dimensions in order to be
Expand All @@ -30,5 +31,4 @@ def run(self, data: Data) -> Data:
predictions.append(model.predict(np.expand_dims((image.get()), axis=0)))

data.modules[self.type.value]["masks"] = predictions
return super().run(data)

27 changes: 21 additions & 6 deletions src/backend/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from firebase_admin import firestore
from google.cloud import storage
import time
import traceback

# temporary input bucket for manual triggers
temp_input_bucket = "terrafarm-inputs"
Expand Down Expand Up @@ -48,6 +49,12 @@ def __init__(self, config: Config):
tail.next = self.build_module(module, input_data)
tail = tail.next

# Show the build pipeline
print('------------------')
print('Modules in the pipeline:')
print(' -> '.join(module.__name__ for module in config.modules.keys()))
print('------------------')

def build_module(self, module: Type[Module], input_data: Any) -> Module:
"""
Builds a module and returns it.
Expand Down Expand Up @@ -114,23 +121,31 @@ async def run(self, imgs: Mat | list[Mat]) -> Data:
raise RuntimeError("Pipeline input integrity violated")

# Run the chain
iterator: Module | None = self.head
module: Module | None = self.head
failed: bool = False
while iterator is not None:
# Run the module
while module is not None:
try:
data = iterator.run(data)
# Prepare the module
print(f"Preparing <{module.name}>")
data.current = module.type
module.prepare(data)

# Run the module
print(f"Running <{module.name}>")
module.run(data)
except Exception as exception:
print("Running " + module.name + " failed: " + str(exception))
print(traceback.format_exc())
failed = True

# Upload data to the cloud asynchronously
if self.config.cloud.use_cloud:
asyncio.create_task(
asyncio.to_thread(
iterator.upload(data, self.collection, self.bucket, self.base_url)))
module.upload(data, self.collection, self.bucket, self.base_url)))

# Go to the next module
iterator = iterator.next
module = module.next

# Log end time of the pipeline
if self.config.cloud.use_cloud:
Expand Down
2 changes: 1 addition & 1 deletion src/backend/pipeline/templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,5 @@ def nutrient_pipeline() -> Pipeline:
AgricultureVisionPreprocess: None,
SemanticSegmentation: paths,
Index: {"config": None, "runnables": [Nutrient]}},
cloud=CloudConfig(True, "terrafarm-example"))
cloud=CloudConfig(False, "terrafarm-example"))
return Pipeline(cfg)
42 changes: 30 additions & 12 deletions src/backend/pipeline/test/unit/test_indices.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,48 @@
from ...modules.modules import Modules
from ...config import Config, CloudConfig
from ...modules.index.indicies import Indicies
from ...modules.index.runnables.nutrient import Nutrient
from ...auth import get_credentials
from google.cloud import storage
import os
import glob
import numpy as np
import pytest
import asyncio
import tensorflow as tf

class TestNutrientRunnable:
"""
Unit testing for the nutrient deficiency module.
"""
"""Unit testing for the nutrient deficiency module."""

@pytest.mark.asyncio
@pytest.mark.skip(reason="Need the .npy file on Cloud Storage")
async def test_nutrient(self):
"""
Test the method run.
"""
paths = {3:"../../ml/deepv3_seg_3/", 4:"../../ml/deepv3_seg_4/"}
cfg = Config(modules={Mosaicing: None, AgricultureVisionPreprocess: None,
SemanticSegmentation: paths, Index: None},
# initialize config for the pipeline with the necessary modules
cfg = Config(modules={Mosaicing: None,
AgricultureVisionPreprocess: None,
SemanticSegmentation: paths,
Index: {"config": None, "runnables": [Nutrient]}},
cloud=CloudConfig())
pipeline = Pipeline(cfg)
imgs = [Mat.read(file) for file in glob.glob("../data/mosaicing/farm/D*.JPG")]
result = pipeline.run(imgs)
assert result.modules[Modules.INDEX]["runnables"][Indicies.NUTRIENT]["index"] is not None
assert result.modules[Modules.INDEX]["runnables"][Indicies.NUTRIENT]["masks"] is not None
expected = np.load("nutrient_masks.npy", allow_pickle=True)
out = result.modules[Modules.INDEX]["runnables"][Indicies.NUTRIENT]["masks"]
# get image paths
imgs = [Mat.read(file) for file in sorted(glob.glob("../data/mosaicing/farm/D*.JPG"))]
# run the pipeline
result = await pipeline.run(imgs)
assert result.modules[Modules.INDEX.value]["runnables"][Indicies.NUTRIENT.value]["index"] is not None
assert result.modules[Modules.INDEX.value]["runnables"][Indicies.NUTRIENT.value]["masks"] is not None

# if the necessary data is not already stored locally, download it from the cloud
if not os.path.exists("nutrient_masks.npy"):
# connect to Cloud Storage
storage_client = storage.Client(credentials=get_credentials())
bucket = storage_client.bucket("terrafarm-test")
blob = bucket.blob("nutrient_masks.npy")
blob.download_to_filename("nutrient_masks.npy")

# load data and assert
expected = np.load("nutrient_masks.npy", allow_pickle=True)
out = result.modules[Modules.INDEX.value]["runnables"][Indicies.NUTRIENT.value]["masks"]
assert np.array_equal(out, expected)
Loading