1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """C{agg [-r] [-g GROUPING_FUNCTION] INITIAL_VALUE AGGREGATION_FUNCTION}
19
20 C{agg [-r] [-c GROUPING_FUNCTION] INITIAL_VALUE AGGREGATION_FUNCTION}
21
22 Aggregates objects from the input stream. If C{GROUPING_FUNCTION} is omitted, then
23 one output object is generated by initializing an accumulator to C{INITIAL_VALUE}
24 and then combining the accumulator with input objects using C{AGGREGATION_FUNCTION}.
25 C{AGGREGATION_FUNCTION} takes two inputs, the current value of the accumulator and
26 an object from the input stream.
27
28 Example: If the input objects are integers C{1, 2, 3}, then the sum of the integers
29 is computed as follows::
30
31 ... ^ agg 0 'sum, x: sum + x'
32
33 which yields C{6}.
34
35 If C{GROUPING_FUNCTION} is specified, then a set of accumulators is maintained,
36 one for each value of C{GROUPING_FUNCTION}. Each output object is a tuple with
37 two parts, the group value and the accumulated value for the group.
38
39 Example: If the input objects are C{('a', 1), ('a', 2), ('b', 3), ('b', 4)}, then
40 the sum of ints for each string is computed as follows::
41
42 ... ^ agg -g 'x, y: x' 0 'sum, x, y: sum + y'
43
44 which yields C{('a', 3), ('b', 7)}.
45
46 If the grouping function is specified with the C{-g} flag, then agg generates its
47 output when the input stream has ended. (It has to, because group members may
48 appear in any order.) In some situations however, group members appear consecutively,
49 and it is useful to get output earlier. If group members are known to be consecutive,
50 then the group function can be specified using the C{-c} flag.
51
52 If the C{-r} flag is specified, then one output object is generated for each input object;
53 the output object contains the value of the accumulator so far. The accumulator appears
54 in the output row before the inputs. For example, if the input stream contains C{1, 2, 3},
55 then the running total can be computed as follows::
56
57 ... ^ agg -r 0 'sum, x: sum + x' ^ ...
58
59 The output stream would be C{(1, 1), (3, 2), (6, 3)}. In the last output object, C{6} is the sum
60 of the current input (C{3}) and all preceding inputs (C{1, 2}).
61
62 The C{-r} flag can also be used with grouping. For example, if the input objects are
63 C{('a', 1), ('a', 2), ('b', 3), ('b', 4)}, then the running totals for the strings would
64 be computed as follows::
65
66 ... ^ agg -r -g 'x, y: x' 0 'sum, x, y: sum + y' ^ ...
67
68 The output stream would be C{(1, 'a', 1), (3, 'a', 2), (3, 'b', 3), (7, 'b', 4)}.
69
70 """
71
72 import osh.core
73 import osh.args
74
# Short module-level aliases for the osh helpers used throughout this file.
75 _wrap_if_necessary = osh.core.wrap_if_necessary
76 Option = osh.args.Option
77
78
81
82
def agg(initial_value, aggregator, group = None, consecutive = None, running = False):
    """Reduce the input stream to aggregate values.

    An accumulator starts out as C{initial_value}. C{aggregator} is called with the
    elements of the accumulator followed by the elements of one piece of input, and
    returns the next value of the accumulator.

    With neither C{group} nor C{consecutive}, a single accumulator covers the entire
    input stream. With C{group}, one accumulator is kept for each value obtained by
    applying the function C{group} to an input. C{consecutive} works like C{group} but
    assumes that inputs with equal group values are adjacent in the input sequence.
    At most one of C{group} and C{consecutive} may be specified.

    If C{running} is false, the output contains one object per group, holding the
    aggregate value. If C{running} is true, the aggregate value for the group is
    written out with each input object ("running totals"), and the aggregate values
    appear before the input values in the output object.
    """
    # Positional arguments come first; options are appended in the order -g, -c, -r.
    op_args = [initial_value, aggregator]
    function_options = (('-g', group), ('-c', consecutive))
    op_args.extend(Option(flag, function)
                   for flag, function in function_options
                   if function)
    if running:
        op_args.append(Option('-r'))
    return _Agg().process_args(*op_args)
109
# Op behind agg: reads the -g / -c / -r options, the initial value and the
# aggregation function, then delegates to one of three aggregate helpers.
# NOTE(review): several method header lines of this class are not visible in this
# view; the comments below describe only the statements that are shown.
110 -class _Agg(osh.core.Op):
111
# Helper object that performs the aggregation; assigned once arguments are parsed.
112 _aggregate = None
113
114
115
116
119
120
121
122
125
# Argument handling: fetch the optional grouping functions and the running flag.
127 args = self.args()
128 grouping_function = args.function_arg('-g')
129 consecutive_grouping_function = args.function_arg('-c')
130 running_totals = args.flag('-r')
# A flag reported as None is normalised to False.
131 if running_totals is None:
132 running_totals = False
# The initial value is passed through _wrap_if_necessary; the helpers later call
# tuple() on it (presumably the wrapper yields a sequence -- TODO confirm).
133 initial_value = _wrap_if_necessary(args.next_eval())
134 aggregation_function = args.next_function()
# -g and -c may not be combined.
135 if grouping_function and consecutive_grouping_function:
136 self.usage()
# NOTE(review): initial_value has already been wrapped above; whether it can still
# be None at this point depends on _wrap_if_necessary -- confirm.
137 if initial_value is None or aggregation_function is None:
138 self.usage()
# Any argument left over after the aggregation function is a usage error.
139 if args.has_next():
140 self.usage()
# NOTE(review): this first branch repeats the -g / -c check made above; it only
# matters if usage() returns to the caller -- confirm.
141 if grouping_function and consecutive_grouping_function:
142 self.usage()
# -g: one accumulator per group value.
143 elif grouping_function:
144 self._aggregate = _GroupingAggregate(
145 self,
146 running_totals,
147 grouping_function,
148 initial_value,
149 aggregation_function)
# -c: grouping where group members are taken to be consecutive in the input.
150 elif consecutive_grouping_function:
151 self._aggregate = _ConsecutiveGroupingAggregate(
152 self,
153 running_totals,
154 consecutive_grouping_function,
155 initial_value,
156 aggregation_function)
# No grouping: a single accumulator.
157 else:
158 self._aggregate = _SimpleAggregate(
159 self,
160 running_totals,
161 initial_value,
162 aggregation_function)
163
164
# Forward the received object to the selected aggregate helper.
166 self._aggregate.receive(object)
167
# Forward the completion notification to the selected aggregate helper.
169 self._aggregate.receive_complete()
170
171
# Body of the aggregate helper used for -g (instantiated elsewhere in this file as
# _GroupingAggregate). NOTE(review): the class header and the headers of the two
# methods after __init__ are not visible in this view.
# Class-level defaults; every one of them is reassigned in __init__.
173 _running_totals = None
174 _command = None
175 _group_function = None
176 _initial_value = None
177 _aggregate_function = None
178 _sum = None
179
# Store the owning command (used for send / send_complete), the running-totals flag
# and the grouping, initial-value and aggregation inputs.
180 - def __init__(self, command, running_totals, group_function, initial_value, aggregate_function):
181 self._running_totals = running_totals
182 self._command = command
183 self._group_function = group_function
184 self._initial_value = initial_value
185 self._aggregate_function = aggregate_function
# Maps each group value to its current accumulator.
186 self._sum = {}
187
# Per-object handling: the group value is computed from the elements of the object.
189 group = self._group_function(*object)
# A group seen for the first time starts from the initial value.
190 sum = self._sum.get(group, self._initial_value)
191 tuple_object = tuple(object)
# The aggregation function receives the accumulator elements followed by the
# elements of the input object.
192 new_sum = self._aggregate_function(*(tuple(sum) + tuple_object))
193 self._sum[group] = _wrap_if_necessary(new_sum)
# Running totals: emit the new accumulator value ahead of the input elements.
# NOTE(review): new_sum is sent as one leading element here, whereas the final
# output below concatenates tuple(sum) -- confirm this difference is intended.
194 if self._running_totals:
195 self._command.send((new_sum,) + tuple_object)
196
# Completion handling: without running totals, emit one object per group made of
# the group value followed by the accumulated value, in dict iteration order.
198 if not self._running_totals:
# NOTE(review): dict.iteritems() exists only on Python 2.
199 for group, sum in self._sum.iteritems():
200 self._command.send(_wrap_if_necessary(group) + tuple(sum))
201 self._command.send_complete()
202
# Body of the aggregate helper used for -c (instantiated elsewhere in this file as
# _ConsecutiveGroupingAggregate). NOTE(review): the class header and the headers of
# the two methods after __init__ are not visible in this view.
# Class-level defaults; every one of them is reassigned in __init__.
204 _running_totals = None
205 _command = None
206 _group_function = None
207 _initial_value = None
208 _aggregate_function = None
209 _group = None
210 _sum = None
211
# Store the owning command, the running-totals flag and the aggregation inputs;
# no group is current until the first object has been handled.
212 - def __init__(self, command, running_totals, group_function, initial_value, aggregate_function):
213 self._running_totals = running_totals
214 self._command = command
215 self._group_function = group_function
216 self._initial_value = initial_value
217 self._aggregate_function = aggregate_function
218 self._group = None
219 self._sum = None
220
# Per-object handling: a change in group value closes the current group.
222 new_group = self._group_function(*object)
# NOTE(review): None doubles as the "no group yet" marker, so objects whose group
# value is None restart the accumulator each time and produce no per-group output
# when running totals are off.
223 if self._group is None or self._group != new_group:
# Emit the finished group (group value followed by its accumulated value) unless
# running totals are being written instead.
224 if self._group is not None and not self._running_totals:
225 self._command.send(_wrap_if_necessary(self._group) + tuple(self._sum))
# Start the new group from the initial value.
226 self._group = new_group
227 self._sum = self._initial_value
228 tuple_object = tuple(object)
# The aggregation function receives the accumulator elements followed by the
# elements of the input object.
229 new_sum = self._aggregate_function(*(tuple(self._sum) + tuple_object))
230 self._sum = _wrap_if_necessary(new_sum)
# Running totals: emit the new accumulator value ahead of the input elements.
231 if self._running_totals:
232 self._command.send((new_sum,) + tuple_object)
233
# Completion handling: flush the last open group, if any, then signal completion.
235 if (not self._running_totals) and self._group is not None:
236 self._command.send(_wrap_if_necessary(self._group) + tuple(self._sum))
237 self._command.send_complete()
238
# Body of the aggregate helper used when no grouping option is given (instantiated
# elsewhere in this file as _SimpleAggregate). NOTE(review): the class header and
# the headers of the two methods after __init__ are not visible in this view.
# Class-level defaults. NOTE(review): unlike the two grouping helpers there is no
# class-level _running_totals default; the attribute is only set in __init__.
240 _command = None
241 _initial_value = None
242 _aggregate_function = None
243 _sum = None
244
# Store the owning command and the aggregation inputs; the single accumulator
# starts out as the initial value.
245 - def __init__(self, command, running_totals, initial_value, aggregate_function):
246 self._running_totals = running_totals
247 self._command = command
248 self._initial_value = initial_value
249 self._aggregate_function = aggregate_function
250 self._sum = initial_value
251
# Per-object handling: the aggregation function receives the accumulator elements
# followed by the elements of the input object.
253 tuple_object = tuple(object)
254 new_sum = self._aggregate_function(*(tuple(self._sum) + tuple_object))
255 self._sum = _wrap_if_necessary(new_sum)
# Running totals: emit the new accumulator value ahead of the input elements.
256 if self._running_totals:
257 self._command.send((new_sum,) + tuple_object)
258
# Completion handling: without running totals the accumulator itself is the single
# output object; completion is signalled in either case.
260 if not self._running_totals:
261 self._command.send(self._sum)
262 self._command.send_complete()
263