Source code for microscopy_data_analysis.data_formats_io

"""
submodule focussed on dataformats
"""
import ast
import datetime
import io
import os
from contextlib import redirect_stdout

import cv2
import h5py
import ncempy.io as nio
import numpy as np
import tifffile

from .image_processing import img_normalize, img_to_uint8


[docs] def h5_to_pyramidal_tiff( h5_path, dset_name, out_path, compression="lzw", max_levels=4, dtype=None ): """ Convert a chunked HDF5 dataset to a pyramidal BigTIFF using tifffile. Memory-safe: only one tile is loaded at a time. """ with h5py.File(h5_path, "r") as f: dset = f[dset_name] shape = dset.shape if dtype is None: dtype = dset.dtype if dset.chunks is None: raise ValueError("Dataset must be chunked") tile_h, tile_w = dset.chunks[:2] def iterate_tiles(dataset, tile_size=(tile_h, tile_w)): H, W = dataset.shape[:2] for y0 in range(0, H, tile_size[0]): y1 = min(y0 + tile_size[0], H) for x0 in range(0, W, tile_size[1]): x1 = min(x0 + tile_size[1], W) yield dataset[y0:y1, x0:x1] # --- Step 1: Write full resolution (level 0) --- with tifffile.TiffWriter(out_path, bigtiff=True) as tif: tif.write( data=iterate_tiles(dset), shape=shape, dtype=dtype, tile=(tile_h, tile_w), photometric="rgb" if len(shape) == 3 else "minisblack", compression=compression, subifds=max_levels-1 # reserve pyramid levels ) prev_shape = shape prev_tile_size = (tile_h, tile_w) prev_dataset = dset # --- Step 2: create lower pyramid levels --- for _ in range(1, max_levels): # _ = level new_shape = (max(prev_shape[0] // 2, 1), max(prev_shape[1] // 2, 1)) if len(shape) == 3: new_shape += (shape[2],) new_tile_size = (max(prev_tile_size[0] // 2, 1), max(prev_tile_size[1] // 2, 1)) def downsample_tiles(prev_tile_size=prev_tile_size): for tile in iterate_tiles(prev_dataset, tile_size=prev_tile_size): h, w = tile.shape[:2] h2, w2 = max(h // 2, 1), max(w // 2, 1) if len(tile.shape) == 3: tile_ds = tile[:h2*2, :w2*2].reshape(h2, 2, w2, 2, tile.shape[2]).mean(axis=(1,3)).astype(dtype) else: tile_ds = tile[:h2*2, :w2*2].reshape(h2, 2, w2, 2).mean(axis=(1,3)).astype(dtype) yield tile_ds tif.write( data=downsample_tiles(), shape=new_shape, dtype=dtype, tile=new_tile_size, photometric="rgb" if len(shape) == 3 else "minisblack", compression=compression ) # update for next level prev_shape = new_shape prev_tile_size = new_tile_size #prev_dataset = None # free memory; tiles come from generator print(f"Saved pyramidal TIFF: {out_path}")
[docs] def h5_to_tiff( h5_path, dset_name, out_path, compression="lzw", big_tiff=True, dtype=None ): """Convert a chunked HDF5 dataset to a BigTIFF (no pyramid).""" # --- Open HDF5 --- with h5py.File(h5_path, "r") as f: dset = f[dset_name] shape = dset.shape if dtype is None: dtype = dset.dtype if dset.chunks is None: raise ValueError("Dataset must be chunked") tile_h, tile_w = dset.chunks[:2] # Generator to stream tiles def tile_generator(): H, W = shape[:2] for y0 in range(0, H, tile_h): y1 = min(y0 + tile_h, H) for x0 in range(0, W, tile_w): x1 = min(x0 + tile_w, W) yield dset[y0:y1, x0:x1].astype(dtype) # --- Write BigTIFF --- tifffile.imwrite( out_path, data=tile_generator(), shape=shape, dtype=dtype, bigtiff=big_tiff, tile=(tile_h, tile_w), photometric="rgb" if len(shape) == 3 else "minisblack", compression=compression ) print(f"Saved TIFF: {out_path}")
[docs] def imsave(filename,img,floatnormalize=True): """ writes single (grayscale), triple (RGB), quadrupel (RGBA) channel images, for uint8 and uint16 formats ".png" is used as default, for float32 ".tif" is used as default double channel images (gray,alpha) will be transformed to RGBA filename extensions (".png", ".jpg", ...) are optional and can be given to obtain a format differing from default Args: filename (string): filename with or without extension determining the format. img (array_like): image matrix, either MxN, MxNx1, MxNx3 or MxNx4. floatnormalize (bool): transform the color range between 0 and 1 (for better compatibility with most image viewers) Defaults to True. Returns: success_info (bool): True, when succesfully written. """ invfilename=filename[::-1] # filename should not include other "." not related to file format dotpos=invfilename.find('.') if dotpos==-1: file_extension=".png" else: file_extension=invfilename[:dotpos+1][::-1] filename=filename[:-dotpos-1] if np.issubdtype(img.dtype,np.floating): file_extension=".tif" if floatnormalize: img=img_normalize(img) img=img.astype(np.float32) fn=filename+file_extension if len(img.shape)==3: if img.shape[2]==2: #converting double channel to RGBA img=np.dstack((img[:,:,0],img[:,:,0],img[:,:,0],img[:,:,1])) else: # changing from rgb to bgr, as cv2 saves as bgr, but rgb is wanted here img[:,:,[0,2]]=img[:,:,[2,0]] print(img.shape) return cv2.imwrite(fn,img)
[docs] def imsave_multi(filename,stack,floatnormalize=True): """ save multiple (single channel / greyscale) images as tiff-stack (supported formats uint8, uint16, float32) Args: filename (string): filename with or without .tiff extension. stack (list of images): grayscale images only containing one channel. floatnormalize (bool): transform the color range between 0 and 1 (for better compatibility with most image viewers) Defaults to True. Returns: success_info (bool): True, when succesfully written. """ invfilename=filename[::-1] dotpos=invfilename.find('.') if dotpos==-1: file_extension=".tiff" else: file_extension=invfilename[:dotpos+1][::-1] filename=filename[:-dotpos-1] if np.issubdtype(stack[0].dtype,np.floating): file_extension=".tiff" for i in range(len(stack)): if floatnormalize: stack[i]=img_normalize(stack[i]).astype(np.float32) else: stack[i]=stack[i].astype(np.float32) fn=filename+file_extension return cv2.imwrite_multi(fn,stack)
[docs] def get_emd_with_metadata(filepath): """ Read TEM-imgages as .emd-files Args: filepath (string): relative or absolute path to the file. Returns: image (MxN array_like): 2D image with only one intensity-channel (gray-scale). metadata (dict): Dictionary containing the most important metadata. """ image_with_metadata=h5py.File(filepath) metadata=dict() kw=list(image_with_metadata["Data/Image"].keys())[0] image=image_with_metadata["Data/Image/"+kw+"/Data"][:] ascii_char="" for num in image_with_metadata["Data/Image/"+kw+"/Metadata"][:,0]: ascii_char += chr(num) allmetadata=ast.literal_eval(ascii_char[:ascii_char.find("\n")]) #print(allmetadata) unixtimestamp=allmetadata["Acquisition"]["AcquisitionStartDatetime"]["DateTime"] metadata["unix_timestamp"]=int(unixtimestamp) uts=datetime.datetime.fromtimestamp(int(unixtimestamp)) metadata["datetime"]=uts.strftime('%Y.%m.%d, %H:%M:%S') metadata["beam_mode"]=allmetadata["Optics"]["IlluminationMode"] metadata["detector_name"]=allmetadata["BinaryResult"]["Detector"] pix_to_nm=np.double(allmetadata["BinaryResult"]["PixelSize"]["width"])*10**9 metadata["pixelsize_nm"]=pix_to_nm metadata["camera_length_mm"]=np.double(allmetadata["Optics"]["CameraLength"])*1000 metadata["frame_time_sec"]=np.double(allmetadata["Scan"]["FrameTime"]) if metadata["detector_name"]=="HAADF": metadata["dwell_time_microsec"]=np.double(allmetadata["Scan"]["DwellTime"])*10**6 else: metadata["exposure_time_sec"]=np.double(allmetadata["Detectors"]["Detector-0"]["ExposureTime"]) metadata["x_mm"]=np.double(allmetadata["Stage"]["Position"]["x"])*1000 metadata["y_mm"]=np.double(allmetadata["Stage"]["Position"]["y"])*1000 metadata["z_mm"]=np.double(allmetadata["Stage"]["Position"]["z"])*1000 # it seems weird, that scan rotation is only saved with this detector if metadata["detector_name"]=="HAADF": metadata["scan_rotation_rad"]=np.double(allmetadata["Scan"]["ScanRotation"]) #assuming quadratic camera chip size=pix_to_nm*image.shape[0] metadata["field_of_view_microns"]=size/1000 metadata["image_shape"]=image.shape[:2] metadata["acceleration_volt"]=np.double(allmetadata["Optics"]["AccelerationVoltage"]) if metadata["beam_mode"] != "Parallel": metadata["beam_convergence_mrad"]=np.double(allmetadata["Optics"]["BeamConvergence"])*1000 metadata["collection_angle_start_mrad"]=np.double( allmetadata["Detectors"]["Detector-1"]["CollectionAngleRange"]["begin"])*1000 metadata["collection_angle_end_mrad"]=np.double( allmetadata["Detectors"]["Detector-1"]["CollectionAngleRange"]["end"])*1000 image=image[:,:,0] return image,metadata
#%% get_dm4_with_metadata
[docs] def get_dm4_with_metadata(filepath): """ read a dm4-file and return the variables data and metadata as dictionaries data includes everything immediately important metadata contains all other information Important missing parameters are electric current and only for spectra: acquisition time and acquisition date are missing Args: filepath (string): relative or absolute path to the file. Returns: data (dict): data["data"] yields the data other keywords contain the most important metadata. metadata (dict): Dictionary containing further metadata. """ relevant_metadata_list=["Grating","Objective focus (um)","Stage X","Stage Y", "Stage Z","Stage Beta","Stage Alpha","Indicated Magnification","Bandpass", "Detector","Filter","PMT HV","Sensitivity","Slit Width","Sample Time", "Dwell time (s)","Image Height","Image Width", "Number Summing Frames", "Voltage","Lightpath","Signal Name","Acquisition Time","Acquisition Date"] f = io.StringIO() with redirect_stdout(f): data=nio.dm.dmReader(filepath,verbose=True) all_metadata = f.getvalue() metadata=dict() for i in range(len(relevant_metadata_list)): start=all_metadata.find("curTagValue = "+relevant_metadata_list[i]) end=all_metadata[start:].find("\n") if start != -1: end+=start start+=len("curTagValue = "+relevant_metadata_list[i])+2 metadata[relevant_metadata_list[i]]=all_metadata[start:end] return data,metadata
[docs] def save_dm4_spectra_as_csv(spectrum_paths): for i in spectrum_paths: data,metadata=get_dm4_with_metadata(i) x=data["coords"][1] y=data["data"][0] xy=np.zeros([2,len(x)]) xy[0]=x xy[1]=y np.savetxt(i[:-3]+"csv",xy.T,delimiter=",",fmt="%i")
[docs] def convert_dm4_to_png(dm4_filepaths,logscale=False): #stringoffset=2 for i in dm4_filepaths: dmd,meta=get_dm4_with_metadata(i) new=i[:-4] if len(dmd["data"].shape)==3: if not os.path.exists(new): os.mkdir(new) halfbandpass=(dmd["coords"][0][1]-dmd["coords"][0][0])/2 wavelengths=np.round(dmd["coords"][0]+halfbandpass,1) if dmd["pixelUnit"][0] !='nm': print("other pixelsize than 'nm' occured") pixelSizeString=str(np.round(dmd["pixelSize"][1],4)*1000)[:6]+"nm" if dmd["pixelUnit"][1] !='µm': print("other pixelsize than 'µm' occured") if logscale: newdat=dmd["data"]-np.min(dmd["data"]) newdat= newdat +np.min(newdat[newdat>0]) * 0.01 for j in range(len(wavelengths)): newname=new+"/"+"img"+str(j)+"_at_"+str(wavelengths[j])+"nm_with_pixelSize_"+pixelSizeString if logscale: image=img_to_uint8(np.log(newdat[j])) newname=newname+"_logscale.png"#[:newname.find(".png")]+ cv2.imwrite(newname,image) else: image=img_to_uint8(dmd["data"][j]) newname=newname+".png" cv2.imwrite(newname,image) else: revnew=new[::-1] magnification_start=revnew.find("_X")+1 magnification_end=magnification_start+revnew[magnification_start:].find("_") pixelSizeString=str(np.round(dmd["pixelSize"][0],4)*1000)[:6]+"nm" if dmd["pixelUnit"][0] !='µm': print("other pixelsize than 'µm' occured") #new[-magnification_end:-magnification_start] newname=new[:-magnification_end]+"pixelSize" newname += pixelSizeString +new[-magnification_start:] if logscale: newdat=dmd["data"]-np.min(dmd["data"]) newdat= newdat +np.min(newdat[newdat>0]) * 0.01 image=img_to_uint8(np.log(newdat)) newname=newname+"_logscale.png"#[:newname.find(".png")]+ else: image=img_to_uint8(dmd["data"]) newname =newname+".png" cv2.imwrite(newname,image)
[docs] def get_SEMtif_with_metadata(filepath): with open(filepath, encoding="utf8",errors="ignore") as fp: contents=fp.read() text=contents[contents.find("Date"):] # qtu <-- quantitiy_type_unit #Beam qtu=[["\nSystemType=","str",None], ["\nTimeOfCreation=","str",None], ["\nPixelWidth=","np.float64","m"], ["\nPixelHeight=","np.float64","m"], #Microscope ["\nHV=","np.float64","V"], ["\nBeamCurrent=","np.float64","A"], ["\nWorkingDistance=","np.float64","m"], ["\nDwelltime=","np.float64","s"], ["\nSpot=","np.float64","nm"], ["\nStigmatorX=","np.float64",None], ["\nStigmatorY=","np.float64",None], ["\nBeamShiftX=","np.float64",None], ["\nBeamShiftY=","np.float64",None], ["\nSourceTiltX=","np.float64",None], ["\nSourceTiltY=","np.float64",None], ["\nEmissionCurrent=","np.float64","A"], ["\nSpecimenCurrent=","np.float64","A"], #["\nApertureDiameter=","np.float64","m"], #["\nATubeVoltage=","np.float64","V"], #Scan ["\nUseCase=","str",None], ["\nTiltCorrectionIsOn=","str",None],#yes,no ["\nScanRotation=","np.float64","rad"], #CompoundLens ["\nIsOn=","str",None], #On,Off ["\nThresholdEnergy=","np.float64","eV"], #Stage ["\nStageX=","np.float64","m"], ["\nStageY=","np.float64","m"], ["\nStageZ=","np.float64","m"], ["\nStageR=","np.float64","rad"], ["\nStageTa=","np.float64","rad"], ["\nStageTb=","np.float64","rad"], ["\nStageBias=","np.float64","V"], ["\nChPressure=","np.float64","Pa"], #Detecor ["\nName=","str",None], ["\nMode=","str",None], ["\nSignal=","str",None], ["\nContrast=","np.float64",None], ["\nBrightness=","np.float64",None], ["\nContrastDB=","np.float64","dB"], ["\nBrightnessDB=","np.float64","dB"], ["\nAverage=","int",None], ["\nIntegrate=","int",None], ["\nResolutionX=","int",None], ["\nResolutionY=","int",None], ["\nHorFieldsize=","np.float64","m"], ["\nVerFieldsize=","np.float64","m"], ["\nFrameTime=","np.float64","s"], #Digital ["\nDigitalContrast=","np.float64",None], ["\nDigitalBrightness=","np.float64",None], ["\nDigitalGamma=","np.float64",None]] res=[] typechanges=[] counter=0 for i in qtu: kw=i[0] start=text.find(kw)+len(kw) end=text[start:].find("\n") if end == 0 or start==-1 or kw not in text: res.append(None) else: if i[1]=="int": res.append(int(text[start:start+end])) elif i[1]=="np.float64": res.append(np.double(text[start:start+end])) elif i[1]=="str" or i[1]=="string": if text[start:start+end] in ["no","No","off","Off","false","False"]: res.append(False) typechanges.append(counter) elif text[start:start+end] in ["yes","Yes","on","On","true","True"]: res.append(True) typechanges.append(counter) elif ":" in text[start:start+end]: datestring=text[start:start+end] datestring=datestring.replace("."," ") day,month,year,hour=datestring.split(" ") newdate=year+"-"+month+"-"+day+" "+hour newdate +=".000Z" res.append(newdate) else: res.append(text[start:start+end]) counter +=1 for j in typechanges: qtu[j][1]="bool" res.append(os.path.abspath(filepath).replace("\\","/")) qtu.append(["PathToImage","str",None]) qtu_dict=readable_names(res,qtu) #additional detector information if qtu_dict["Detector"]["value"] == "ETD": kw="\nGrid=" start=text.find(kw)+len(kw) end=text[start:].find("\n") Grid_Voltage=np.double(text[start:start+end]) qtu_dict["Grid_Voltage"]=dict() qtu_dict["Grid_Voltage"]["dtype"]="np.float64" qtu_dict["Grid_Voltage"]["unit"]="V", qtu_dict["Grid_Voltage"]["value"]=Grid_Voltage return cv2.imread(filepath,-1),qtu_dict
[docs] def readable_names(res,qtu): nice_names=dict() nice_names["\nSystemType="]="Microscope" nice_names["\nTimeOfCreation="]="Time_of_Creation" nice_names["\nPixelWidth="]="Pixel_Width" nice_names["\nPixelHeight="]="Pixel_Height" nice_names["\nHV="]="Acceleration_Voltage" nice_names["\nBeamCurrent="]="Beam Current" nice_names["\nWorkingDistance="]="Working_Distance" nice_names["\nDwelltime="]="Dwell_Time" nice_names["\nSpot="]="Spot_Diameter_(estimated)" nice_names["\nStigmatorX="]="Stigmator_X" nice_names["\nStigmatorY="]="Stigmator_Y" nice_names["\nBeamShiftX="]="Beam_Shift_X" nice_names["\nBeamShiftY="]="Beam_Shift_Y" nice_names["\nSourceTiltX="]="Source_Tilt_X" nice_names["\nSourceTiltY="]="Source_Tilt_Y" nice_names["\nEmissionCurrent="]="Emission_Current" nice_names["\nSpecimenCurrent="]="Specimen_Current" nice_names["\nUseCase="]="SEM_Mode" nice_names["\nTiltCorrectionIsOn="]="Tilt_Correction" nice_names["\nScanRotation="]="Scan_Rotation" nice_names["\nIsOn="] ="Compound_Lens" nice_names["\nThresholdEnergy="]="Compound_Lens_Threshold_Energy" nice_names["\nStageX="]="Stage_X" nice_names["\nStageY="]="Stage_Y" nice_names["\nStageZ="]="Stage_Z" nice_names["\nStageR="]="Stage_Rotation" nice_names["\nStageTa="]="Stage_Tilt_alpha" nice_names["\nStageTb="]="Stage_Tilt_beta" nice_names["\nStageBias="]="Stage_Bias" nice_names["\nChPressure="]="Chamber_Pressure" nice_names["\nName="]="Detector" nice_names["\nMode="]="Detector_Mode" nice_names["\nSignal="]="Signal_Type" nice_names["\nContrast="]="Contrast" nice_names["\nContrastDB="]="Contrast_DB" nice_names["\nBrightness="]="Brightness" nice_names["\nBrightnessDB="]="Brightness_DB" nice_names["\nAverage="]="Average" nice_names["\nIntegrate="]="Integrate" nice_names["\nResolutionX="]="Resolution_X" nice_names["\nResolutionY="]="Resolution_Y" nice_names["\nHorFieldsize="]="Horizontal_Fieldsize" nice_names["\nVerFieldsize="]="Vertical_Fieldsize" nice_names["\nFrameTime="]="Frame_Time" nice_names["\nDigitalContrast="]="Digital_Contrast" nice_names["\nDigitalBrightness="]="Digital_Brightness" nice_names["\nDigitalGamma="]="Digital_Gamma" nice_names["PathToImage"]="Path_to_Image" qtu_dict=dict() for i in range(len(qtu)): qtu_dict[nice_names[qtu[i][0]]]=dict() qtu_dict[nice_names[qtu[i][0]]]["dtype"]=qtu[i][1] qtu_dict[nice_names[qtu[i][0]]]["unit"]=qtu[i][2] qtu_dict[nice_names[qtu[i][0]]]["value"]=res[i]#qtu[i][1] #qtu_dict[nice_names[qtu[i][0]]].append(res[i]) #qtu[i][0]=nice_names[qtu[i][0]] return qtu_dict