Coverage for src/ufig/plugins/resample.py: 100%

88 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-12 19:08 +0000

1# Copyright (c) 2013 ETH Zurich, Institute of Astronomy, Lukas Gamper 

2# <lukas.gamper@usystems.ch> 

3""" 

4Created on Oct 7, 2013 

5@author: Lukas Gamper 

6 

7""" 

8 

9import numba as nb 

10import numpy as np 

11from ivy.plugin.base_plugin import BasePlugin 

12from scipy.integrate import dblquad 

13 

14 

15@nb.jit(nopython=True) 

16def sum_prod(a, b): 

17 c = 0.0 

18 for i in range(a.shape[0]): 

19 for j in range(a.shape[1]): 

20 c += a[i, j] * b[i, j] 

21 return c 

22 

23 

24@nb.jit(nopython=True) 

25def _convolve(a, k, as0, as1, ks0, ks1, cache, border, vals): 

26 # top 

27 # border[ks0//2:, ks1//2: -(ks1//2+1)] = a[:ks0,:] 

28 for i0 in range(ks0): 

29 for j0 in range(as1): 

30 border[i0 + ks0 // 2, j0 + ks1 // 2] = a[i0, j0] 

31 

32 for i1 in range(ks0 // 2, ks0): 

33 for j in range(ks1 // 2, as1 + ks1 // 2): 

34 vals[:] = border[ 

35 i1 - ks0 // 2 : i1 + ks0 // 2 + 1, j - ks1 // 2 : j + ks1 // 2 + 1 

36 ] 

37 # cache[i1-ks0//2, j-ks1//2] = np.sum(vals * k) 

38 cache[i1 - ks0 // 2, j - ks1 // 2] = sum_prod(vals, k) 

39 

40 for i in range(ks0 // 2, as0 - ks0 // 2): 

41 # left 

42 for j2 in range(0, ks1 // 2 + 1): 

43 vals[:] = 0 

44 vals[:, ks1 // 2 - j2 :] = a[ 

45 i - ks0 // 2 : i + ks0 // 2 + 1, : ks1 // 2 + j2 + 1 

46 ] 

47 # cache[-1, j2] = np.sum(vals * k) 

48 cache[-1, j2] = sum_prod(vals, k) 

49 

50 # center 

51 for j3 in range(ks1 // 2 + 1, as1 - ks1 // 2): 

52 a[i - ks0 // 2, j3 - ks1 // 2 - 1] = cache[0, j3 - ks1 // 2 - 1] 

53 vals[:] = a[ 

54 i - ks0 // 2 : i + ks0 // 2 + 1, j3 - ks1 // 2 : j3 + ks1 // 2 + 1 

55 ] 

56 # cache[-1, j3] = np.sum(vals * k) 

57 cache[-1, j3] = sum_prod(vals, k) 

58 

59 # right 

60 for r, j4 in enumerate(range(as1 - ks1 // 2, as1)): 

61 a[i - ks0 // 2, j4 - ks1 // 2 - 1] = cache[0, j4 - ks1 // 2 - 1] 

62 vals[:] = 0 

63 vals[:, : ks1 - r - 1] = a[i - ks0 // 2 : i + ks0 // 2 + 1, j4 - ks1 // 2 :] 

64 # cache[-1, j4] = np.sum(vals * k) 

65 cache[-1, j4] = sum_prod(vals, k) 

66 r += 1 

67 

68 # right fill 

69 for j5 in range(as1, as1 + ks1 // 2 + 1): 

70 a[i - ks0 // 2, j5 - ks1 // 2 - 1] = cache[0, j5 - ks1 // 2 - 1] 

71 # 

72 cache[0 : ks0 // 2, :] = cache[1 : ks0 // 2 + 1, :] 

73 

74 # bottom 

75 border[:] = 0 

76 # border[:ks0-1, ks1//2: -(ks1//2+1)] = a[-(ks0-1):,:] 

77 for i2 in range(ks0 - 1): 

78 for j6 in range(as1): 

79 border[i2, j6 + ks1 // 2] = a[i2 + (as0 - ks0) + 1, j6] 

80 

81 r = ks0 // 2 

82 for i3 in range(as0 - ks0 // 2, as0): 

83 for j7 in range(ks1 // 2, as1 + ks1 // 2): 

84 a[i3 - ks0 // 2, j7 - ks1 // 2] = cache[0, j7 - ks1 // 2] 

85 vals[:] = border[ 

86 r - ks0 // 2 : r + ks0 // 2 + 1, j7 - ks1 // 2 : j7 + ks1 // 2 + 1 

87 ] 

88 # cache[-1, j7-ks1//2] = np.sum(vals * k) 

89 cache[-1, j7 - ks1 // 2] = sum_prod(vals, k) 

90 cache[0 : ks0 // 2, :] = cache[1 : ks0 // 2 + 1, :] 

91 r += 1 

92 

93 # bottom fill 

94 # a[-(ks0//2):, :] = cache[:ks0//2, :] 

95 for i4 in range(ks0 // 2): 

96 for j8 in range(as1): 

97 a[i4 + as0 - ks0 // 2, j8] = cache[i4, j8] 

98 

99 

100def convolve(a, k): 

101 # TODO fix segault 

102 as0, as1 = a.shape 

103 ks0, ks1 = k.shape 

104 cache = np.zeros((ks0 // 2 + 1, as1)) 

105 border = np.zeros((ks0 + ks0 // 2, as1 + ks1)) 

106 vals = np.zeros_like(k) 

107 

108 _convolve(a, k, as0, as1, ks0, ks1, cache, border, vals) 

109 

110 return a 

111 

112 

113def get_lanczos_kernel(n): 

114 lanczos = np.zeros((2 * n + 1, 2 * n + 1), dtype=np.float64) 

115 for i in range(-n, n + 1, 1): 

116 for j in range(-n, n + 1, 1): 

117 lanczos[i + n, j + n] = dblquad( 

118 lambda x, y: np.sinc(y) * np.sinc(y / n) * np.sinc(x) * np.sinc(x / n), 

119 i - 0.5, 

120 i + 0.5, 

121 lambda x, j=j: j - 0.5, 

122 lambda x, j=j: j + 0.5, 

123 )[0] 

124 lanczos = lanczos / np.sum(lanczos) 

125 return lanczos 

126 

127 

128def get_lanczos_kernel_integral(par): 

129 lanczos = get_lanczos_kernel(par.lanczos_n) 

130 lanczos = lanczos.astype(par.image_precision) 

131 return lanczos 

132 

133 

134def read_resampling_kernel(par): 

135 from pkg_resources import resource_filename 

136 

137 import ufig 

138 

139 filepath_kernel = resource_filename( 

140 ufig.__name__, "res/resampling/" + par.filename_resampling_kernel 

141 ) 

142 kernel = np.loadtxt(filepath_kernel) 

143 kernel = kernel / np.sum(kernel) 

144 kernel = kernel.astype(par.image_precision) 

145 return kernel 

146 

147 

148KERNEL_GENERATOR = { 

149 "lanczos_integral": get_lanczos_kernel_integral, 

150 "read_from_file": read_resampling_kernel, 

151} 

152 

153 

154class Plugin(BasePlugin): 

155 """ 

156 Convolve the image with a Lanczos kernel to model the effect of correlated noise. 

157 The main effect of correlated noise modeled by this kernel is that coming from the 

158 coadding process. 

159 

160 :param lanczos_n: the order of lanczos function used to model the resampling 

161 

162 :return: image after the lanczos convolution 

163 

164 """ 

165 

166 def __call__(self): 

167 kernel_gen = KERNEL_GENERATOR[self.ctx.parameters.lanczos_kernel_type] 

168 kernel_img = kernel_gen(self.ctx.parameters) 

169 convolve(self.ctx.image, kernel_img) 

170 

171 def __str__(self): 

172 return "resample image"