Source code for spymicmac.resample

"""
spymicmac.resample is a collection of tools for resampling images
"""
import os
from pathlib import Path
import multiprocessing as mp
import PIL.Image
from osgeo import gdal
from skimage import io
from skimage.transform import AffineTransform, SimilarityTransform, warp
from skimage.morphology import disk
from scipy import ndimage
import numpy as np
from . import image, micmac, matching
from numpy.typing import NDArray
from typing import Union


[docs] def downsample(img: NDArray, fact: Union[int, float] = 4) -> NDArray: """ Rescale an image using Lanczos resampling :param img: the image to rescale :param fact: the number by which to divide the image width and height :return: **rescaled** -- the rescaled image """ _img = PIL.Image.fromarray(img) return np.array(_img.resize((np.array(_img.size) / fact).astype(int), PIL.Image.Resampling.LANCZOS))
[docs] def resample_hex(fn_img: Union[str, Path], scale: int, ori: str = 'InterneScan', alg=gdal.GRA_Bilinear, tps: bool = True, order: Union[int, None] = None) -> None: """ Resample a KH-9 Mapping Camera image based on the reseau grid, using gdal.Warp :param fn_img: the filename of the image to resample :param scale: the number of pixels per mm of the scanned image :param ori: the Ori directory that contains both MeasuresCamera.xml and MeasuresIm :param alg: the gdal resampling algorithm to use (default: gdal.GRA_Bilinear) :param tps: use a thin plate spline transformer to transform based on reseau grid :param order: the order (1-3) of polynomial GCP interpolation (default: not used) """ cam_meas = micmac.parse_im_meas(Path(f"Ori-{ori}", 'MeasuresCamera.xml')) img_meas = micmac.parse_im_meas(Path(f"Ori-{ori}", f"MeasuresIm-{fn_img}.xml")) all_meas = img_meas.set_index('name').join(cam_meas.set_index('name'), lsuffix='_img', rsuffix='_cam') all_meas['i_cam'] *= scale all_meas['j_cam'] *= scale gcp_list = [gdal.GCP(row.j_cam, row.i_cam, 1, row.j_img, row.i_img) for _, row in all_meas.iterrows()] ds = gdal.Open(fn_img) ds.SetGCPs(gcp_list, '') ds.FlushCache() ds = None options = {'xRes': 1, 'yRes': 1, 'outputBounds': [0, 0, all_meas.j_cam.max(), all_meas.i_cam.max()], 'resampleAlg': alg, 'tps': tps} if order is not None: options['polynomialOrder'] = order out_ds = gdal.Warp(f"tmp_{fn_img}", fn_img, **options) meta_shp = '{"shape": ' + f"[{out_ds.RasterYSize}, {out_ds.RasterXSize}]" + '}' out_ds.SetMetadata({'TIFFTAG_IMAGEDESCRIPTION': meta_shp}) out_ds.FlushCache() out_ds = None img = io.imread(f"tmp_{fn_img}") io.imsave(f"OIS-Reech_{fn_img}", np.flipud(img).astype(np.uint8)) os.remove(f"tmp_{fn_img}") os.remove(f"{fn_img}.aux.xml")
[docs] def rotate_from_rails(img: NDArray, rails: NDArray) -> tuple[NDArray, float]: """ Use the rail marks or other horizontal points in an image to rotate the image. :param img: the image to rotate. :param rails: an Nx2 array of (row, col) points :return: - **rotated** -- the rotated image - **angle** -- the calculated angle of rotation, in degrees """ slope, intercept = np.polyfit(rails[:, 1], rails[:, 0], 1) angle = np.rad2deg(np.arctan(slope)) print(f"Calculated angle of rotation: {angle:.4f}") return ndimage.rotate(img, angle), angle
[docs] def crop_panoramic(fn_img: Union[str, Path], flavor: str, marker_size: int = 31, fact: Union[int, None] = None, return_vals: bool = False) -> Union[None, tuple[tuple, float]]: """ Crop a declassified panoramic (KH4 or KH9) image, after rotating based on horizontal rail markers or "wagon wheel" fiducial markers. :param fn_img: the filename of the image to rotate and crop :param flavor: the camera type (KH4 or KH9) :param marker_size: The approximate size of the wagon wheels to identify in the image (default: 31 pixels) :param fact: the number by which to divide the image width and height to scale the image (default: do not scale) :param return_vals: Return estimated image border and rotation angle (default: False) :returns: - **border** -- the estimated image border (left, right, top, bot) - **angle** -- the estimated rotation angle. Only returned if **return_vals** is True. """ assert flavor in ['KH4', 'KH9'], "flavor must be one of [KH4, KH9]" img = io.imread(fn_img) if flavor == 'KH4': rails = matching.find_rail_marks(img, marker=disk(marker_size)) rotated, angle = rotate_from_rails(img, rails) # crop the lower part of the image to avoid introducing a bright line at the bottom rotated = rotated[:-int(0.1 * rails[:, 0].mean()), :] else: rails = matching.ocm_show_wagon_wheels(img, size=marker_size) # restrict ourselves to the top rail rails = rails[rails[:, 0] < 0.1 * img.shape[0]] # refine the choice to ensure the points are on the same line valid = matching._refine_rail(rails) rotated, angle = rotate_from_rails(img, rails[valid]) # get a rough idea of where the image frame should be left, right, top, bot = image.get_rough_frame(rotated) # buffer by 0.05% (left/right) or 0.5% (top/bottom) to ensure we have a clean image left += (right - left) * 0.0005 right -= (right - left) * 0.0005 top += (bot - top) * 0.005 bot -= (bot - top) * 0.005 # crop the image to the buffered window print(f'Cropping to window (left, right, top, bot): {int(left)}, {int(right)}, {int(top)}, {int(bot)}') cropped = rotated[int(top):int(bot), int(left):int(right)] if fact is not None: cropped = downsample(cropped, fact=fact) # if the image is from the aft camera, flip it before saving # USGS naming convention is *[A|F]00N, so to figure out if it's an aft image, # remove the extension, check the character 4 places from the end is_aft = os.path.splitext(fn_img)[0][-4] == 'A' if is_aft: cropped = np.fliplr(np.flipud(cropped)) # save the resampled image io.imsave('OIS-Reech_' + fn_img, cropped.astype(np.uint8)) if return_vals: return (left, right, top, bot), angle else: return None
[docs] def crop_from_extent(fn_img: Union[str, Path], border: NDArray, angle: Union[float, None] = None, fact: Union[int, None] = None) -> None: """ Crop an image given the coordinates of the image border. :param fn_img: the filename of the image to rotate and crop :param border: the estimated image border coordinates (left, right, top, bot) :param angle: the angle by which to rotate the image (default: None) :param fact: the number by which to divide the image width and height to scale the image (default: do not scale) """ img = io.imread(fn_img) if angle is not None: img = ndimage.rotate(img, angle) left, right, top, bot = border print(f'Cropping to window (left, right, top, bot): {int(left)}, {int(right)}, {int(top)}, {int(bot)}') cropped = img[int(top):int(bot), int(left):int(right)] if fact is not None: cropped = downsample(cropped, fact=fact) # if the image is from the aft camera, flip it before saving # USGS naming convention is *[A|F]00N, so to figure out if it's an aft image, # remove the extension, check the character 4 places from the end is_aft = os.path.splitext(fn_img)[0][-4] == 'A' if is_aft: cropped = np.fliplr(np.flipud(cropped)) io.imsave('OIS-Reech_' + fn_img, cropped.astype(np.uint8))
def _border_mask(img, border): mask = 255 * np.ones(img.shape, dtype=np.uint8) mask[border:-border, border:-border] = 0 return mask def align_image_borders(fn_left, fn_right, border): left = io.imread(fn_left) right = io.imread(fn_right) mask_left = _border_mask(left, border) mask_right = _border_mask(right, border)
[docs] def resample_fiducials(fn_img: Union[str, Path, list[str], list[Path]], scale: float, transform=AffineTransform(), fn_cam: Union[str, Path, None] = None, nproc: int = 1) -> None: """ Resample image(s) using fiducial markers. :param fn_img: the filename, or a list of filenames, of the image(s) :param scale: the image scale (in mm/pixel) to use :param transform: the type of transformation to use. Should be an instance of skimage.transform (default: AffineTransform) :param fn_cam: the filename for the MeasuresCamera.xml file (default: Ori-InterneScan/MeasuresCamera.xml) :param nproc: the number of processors to use (default: 1) """ if type(fn_img) is str: _fiducials(fn_img=fn_img, scale=scale, fn_cam=fn_cam, transform=transform) else: if nproc > 1: pool = mp.Pool(nproc, maxtasksperchild=1) arg_dict = {'scale': scale, 'fn_cam': fn_cam, 'transform': transform} pool_args = [{'fn_img': fn} for fn in fn_img] for d in pool_args: d.update(arg_dict) pool.map(_fiducials_wrapper, pool_args, chunksize=1) pool.close() pool.join() else: for fn in fn_img: _fiducials(fn_img=fn, scale=scale, fn_cam=fn_cam, transform=transform)
def _fiducials_wrapper(args): _fiducials(**args) def _fiducials(fn_img=None, scale=None, fn_cam=None, transform=AffineTransform): print(fn_img) meas = micmac.parse_im_meas(Path('Ori-InterneScan', f'MeasuresIm-{fn_img}.xml')) if fn_cam is None: measures_cam = micmac.parse_im_meas(Path('Ori-InterneScan', 'MeasuresCamera.xml')) else: measures_cam = micmac.parse_im_meas(fn_cam) measures_cam['i'] /= scale measures_cam['j'] /= scale joined = meas.set_index('name').join(measures_cam.set_index('name'), lsuffix='_img', rsuffix='_cam') transform.estimate(joined[['j_img', 'i_img']].values, joined[['j_cam', 'i_cam']].values) outshape = (int(joined['i_cam'].max() - joined['i_cam'].min()), int(joined['j_cam'].max() - joined['j_cam'].min())) img = io.imread(fn_img) img_tfm = warp(img, transform.inverse, output_shape=outshape, preserve_range=True, order=5) io.imsave('OIS-Reech_' + fn_img, img_tfm.astype(np.uint8))