Line data Source code
1 : /*
2 : * Famedly Matrix SDK
3 : * Copyright (C) 2019, 2020, 2021 Famedly GmbH
4 : *
5 : * This program is free software: you can redistribute it and/or modify
6 : * it under the terms of the GNU Affero General Public License as
7 : * published by the Free Software Foundation, either version 3 of the
8 : * License, or (at your option) any later version.
9 : *
10 : * This program is distributed in the hope that it will be useful,
11 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : * GNU Affero General Public License for more details.
14 : *
15 : * You should have received a copy of the GNU Affero General Public License
16 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 : */
18 :
19 : import 'dart:async';
20 : import 'dart:convert';
21 :
22 : import 'package:collection/collection.dart';
23 :
24 : import 'package:matrix/matrix.dart';
25 : import 'package:matrix/src/models/timeline_chunk.dart';
26 :
27 : /// Represents the timeline of a room. The callback [onUpdate] will be triggered
28 : /// automatically. The initial
29 : /// event list will be retreived when created by the `room.getTimeline()` method.
30 :
31 : class Timeline {
32 : final Room room;
33 27 : List<Event> get events => chunk.events;
34 :
35 : /// Map of event ID to map of type to set of aggregated events
36 : final Map<String, Map<String, Set<Event>>> aggregatedEvents = {};
37 :
38 : final void Function()? onUpdate;
39 : final void Function(int index)? onChange;
40 : final void Function(int index)? onInsert;
41 : final void Function(int index)? onRemove;
42 : final void Function()? onNewEvent;
43 :
44 : StreamSubscription<Event>? timelineSub;
45 : StreamSubscription<Event>? historySub;
46 : StreamSubscription<SyncUpdate>? roomSub;
47 : StreamSubscription<String>? sessionIdReceivedSub;
48 : StreamSubscription<String>? cancelSendEventSub;
49 : bool isRequestingHistory = false;
50 : bool isRequestingFuture = false;
51 :
52 : bool allowNewEvent = true;
53 : bool isFragmentedTimeline = false;
54 :
55 : final Map<String, Event> _eventCache = {};
56 :
57 : TimelineChunk chunk;
58 :
59 : /// Searches for the event in this timeline. If not
60 : /// found, requests from the server. Requested events
61 : /// are cached.
62 2 : Future<Event?> getEventById(String id) async {
63 4 : for (final event in events) {
64 4 : if (event.eventId == id) return event;
65 : }
66 4 : if (_eventCache.containsKey(id)) return _eventCache[id];
67 4 : final requestedEvent = await room.getEventById(id);
68 : if (requestedEvent == null) return null;
69 4 : _eventCache[id] = requestedEvent;
70 4 : return _eventCache[id];
71 : }
72 :
73 : // When fetching history, we will collect them into the `_historyUpdates` set
74 : // first, and then only process all events at once, once we have the full history.
75 : // This ensures that the entire history fetching only triggers `onUpdate` only *once*,
76 : // even if /sync's complete while history is being proccessed.
77 : bool _collectHistoryUpdates = false;
78 :
79 : // We confirmed, that there are no more events to load from the database.
80 : bool _fetchedAllDatabaseEvents = false;
81 :
82 1 : bool get canRequestHistory {
83 2 : if (events.isEmpty) return true;
84 0 : return !_fetchedAllDatabaseEvents ||
85 0 : (room.prev_batch != null && events.last.type != EventTypes.RoomCreate);
86 : }
87 :
88 : /// Request more previous events from the server. [historyCount] defines how many events should
89 : /// be received maximum. [filter] allows you to specify a [StateFilter] object to filter the
90 : /// events, which can include various criteria such as event types (e.g., [EventTypes.Message])
91 : /// and other state-related filters. The [StateFilter] object will have [lazyLoadMembers] set to
92 : /// true by default, but this can be overridden.
93 : /// This method does not return a value.
94 2 : Future<void> requestHistory({
95 : int historyCount = Room.defaultHistoryCount,
96 : StateFilter? filter,
97 : }) async {
98 2 : if (isRequestingHistory) {
99 : return;
100 : }
101 :
102 2 : isRequestingHistory = true;
103 2 : await _requestEvents(
104 : direction: Direction.b,
105 : historyCount: historyCount,
106 : filter: filter,
107 : );
108 2 : isRequestingHistory = false;
109 : }
110 :
111 0 : bool get canRequestFuture => !allowNewEvent;
112 :
113 : /// Request more future events from the server. [historyCount] defines how many events should
114 : /// be received maximum. [filter] allows you to specify a [StateFilter] object to filter the
115 : /// events, which can include various criteria such as event types (e.g., [EventTypes.Message])
116 : /// and other state-related filters. The [StateFilter] object will have [lazyLoadMembers] set to
117 : /// true by default, but this can be overridden.
118 : /// This method does not return a value.
119 1 : Future<void> requestFuture({
120 : int historyCount = Room.defaultHistoryCount,
121 : StateFilter? filter,
122 : }) async {
123 1 : if (allowNewEvent) {
124 : return; // we shouldn't force to add new events if they will autatically be added
125 : }
126 :
127 1 : if (isRequestingFuture) return;
128 1 : isRequestingFuture = true;
129 1 : await _requestEvents(
130 : direction: Direction.f,
131 : historyCount: historyCount,
132 : filter: filter,
133 : );
134 1 : isRequestingFuture = false;
135 : }
136 :
137 3 : Future<void> _requestEvents({
138 : int historyCount = Room.defaultHistoryCount,
139 : required Direction direction,
140 : StateFilter? filter,
141 : }) async {
142 4 : onUpdate?.call();
143 :
144 : try {
145 : // Look up for events in the database first. With fragmented view, we should delete the database cache
146 3 : final eventsFromStore = isFragmentedTimeline
147 : ? null
148 8 : : await room.client.database?.getEventList(
149 2 : room,
150 4 : start: events.length,
151 : limit: historyCount,
152 : );
153 :
154 2 : if (eventsFromStore != null && eventsFromStore.isNotEmpty) {
155 : // Fetch all users from database we have got here.
156 0 : for (final event in events) {
157 0 : if (room.getState(EventTypes.RoomMember, event.senderId) != null) {
158 : continue;
159 : }
160 : final dbUser =
161 0 : await room.client.database?.getUser(event.senderId, room);
162 0 : if (dbUser != null) room.setState(dbUser);
163 : }
164 :
165 0 : if (direction == Direction.b) {
166 0 : events.addAll(eventsFromStore);
167 0 : final startIndex = events.length - eventsFromStore.length;
168 0 : final endIndex = events.length;
169 0 : for (var i = startIndex; i < endIndex; i++) {
170 0 : onInsert?.call(i);
171 : }
172 : } else {
173 0 : events.insertAll(0, eventsFromStore);
174 0 : final startIndex = eventsFromStore.length;
175 : final endIndex = 0;
176 0 : for (var i = startIndex; i > endIndex; i--) {
177 0 : onInsert?.call(i);
178 : }
179 : }
180 : } else {
181 3 : _fetchedAllDatabaseEvents = true;
182 6 : Logs().i('No more events found in the store. Request from server...');
183 :
184 3 : if (isFragmentedTimeline) {
185 1 : await getRoomEvents(
186 : historyCount: historyCount,
187 : direction: direction,
188 : filter: filter,
189 : );
190 : } else {
191 4 : if (room.prev_batch == null) {
192 0 : Logs().i('No more events to request from server...');
193 : } else {
194 4 : await room.requestHistory(
195 : historyCount: historyCount,
196 : direction: direction,
197 2 : onHistoryReceived: () {
198 2 : _collectHistoryUpdates = true;
199 : },
200 : filter: filter,
201 : );
202 : }
203 : }
204 : }
205 : } finally {
206 3 : _collectHistoryUpdates = false;
207 3 : isRequestingHistory = false;
208 4 : onUpdate?.call();
209 : }
210 : }
211 :
212 : /// Request more previous events from the server. [historyCount] defines how much events should
213 : /// be received maximum. When the request is answered, [onHistoryReceived] will be triggered **before**
214 : /// the historical events will be published in the onEvent stream. [filter] allows you to specify a
215 : /// [StateFilter] object to filter the events, which can include various criteria such as
216 : /// event types (e.g., [EventTypes.Message]) and other state-related filters.
217 : /// The [StateFilter] object will have [lazyLoadMembers] set to true by default, but this can be overridden.
218 : /// Returns the actual count of received timeline events.
219 1 : Future<int> getRoomEvents({
220 : int historyCount = Room.defaultHistoryCount,
221 : direction = Direction.b,
222 : StateFilter? filter,
223 : }) async {
224 : // Ensure stateFilter is not null and set lazyLoadMembers to true if not already set
225 1 : filter ??= StateFilter(lazyLoadMembers: true);
226 1 : filter.lazyLoadMembers ??= true;
227 :
228 3 : final resp = await room.client.getRoomEvents(
229 2 : room.id,
230 : direction,
231 3 : from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch,
232 : limit: historyCount,
233 2 : filter: jsonEncode(filter.toJson()),
234 : );
235 :
236 1 : if (resp.end == null) {
237 2 : Logs().w('We reached the end of the timeline');
238 : }
239 :
240 2 : final newNextBatch = direction == Direction.b ? resp.start : resp.end;
241 2 : final newPrevBatch = direction == Direction.b ? resp.end : resp.start;
242 :
243 1 : final type = direction == Direction.b
244 : ? EventUpdateType.history
245 : : EventUpdateType.timeline;
246 :
247 3 : if ((resp.state?.length ?? 0) == 0 &&
248 3 : resp.start != resp.end &&
249 : newPrevBatch != null &&
250 : newNextBatch != null) {
251 1 : if (type == EventUpdateType.history) {
252 0 : Logs().w(
253 0 : '[nav] we can still request history prevBatch: $type $newPrevBatch',
254 : );
255 : } else {
256 2 : Logs().w(
257 1 : '[nav] we can still request timeline nextBatch: $type $newNextBatch',
258 : );
259 : }
260 : }
261 :
262 : final newEvents =
263 6 : resp.chunk.map((e) => Event.fromMatrixEvent(e, room)).toList();
264 :
265 1 : if (!allowNewEvent) {
266 3 : if (resp.start == resp.end ||
267 2 : (resp.end == null && direction == Direction.f)) {
268 1 : allowNewEvent = true;
269 : }
270 :
271 1 : if (allowNewEvent) {
272 2 : Logs().d('We now allow sync update into the timeline.');
273 1 : newEvents.addAll(
274 5 : await room.client.database?.getEventList(room, onlySending: true) ??
275 0 : [],
276 : );
277 : }
278 : }
279 :
280 : // Try to decrypt encrypted events but don't update the database.
281 2 : if (room.encrypted && room.client.encryptionEnabled) {
282 0 : for (var i = 0; i < newEvents.length; i++) {
283 0 : if (newEvents[i].type == EventTypes.Encrypted) {
284 0 : newEvents[i] = await room.client.encryption!.decryptRoomEvent(
285 0 : newEvents[i],
286 : );
287 : }
288 : }
289 : }
290 :
291 : // update chunk anchors
292 1 : if (type == EventUpdateType.history) {
293 0 : chunk.prevBatch = newPrevBatch ?? '';
294 :
295 0 : final offset = chunk.events.length;
296 :
297 0 : chunk.events.addAll(newEvents);
298 :
299 0 : for (var i = 0; i < newEvents.length; i++) {
300 0 : onInsert?.call(i + offset);
301 : }
302 : } else {
303 2 : chunk.nextBatch = newNextBatch ?? '';
304 4 : chunk.events.insertAll(0, newEvents.reversed);
305 :
306 3 : for (var i = 0; i < newEvents.length; i++) {
307 2 : onInsert?.call(i);
308 : }
309 : }
310 :
311 1 : if (onUpdate != null) {
312 2 : onUpdate!();
313 : }
314 2 : return resp.chunk.length;
315 : }
316 :
317 9 : Timeline({
318 : required this.room,
319 : this.onUpdate,
320 : this.onChange,
321 : this.onInsert,
322 : this.onRemove,
323 : this.onNewEvent,
324 : required this.chunk,
325 : }) {
326 54 : timelineSub = room.client.onTimelineEvent.stream.listen(
327 14 : (event) => _handleEventUpdate(
328 : event,
329 : EventUpdateType.timeline,
330 : ),
331 : );
332 54 : historySub = room.client.onHistoryEvent.stream.listen(
333 6 : (event) => _handleEventUpdate(
334 : event,
335 : EventUpdateType.history,
336 : ),
337 : );
338 :
339 : // If the timeline is limited we want to clear our events cache
340 45 : roomSub = room.client.onSync.stream
341 53 : .where((sync) => sync.rooms?.join?[room.id]?.timeline?.limited == true)
342 18 : .listen(_removeEventsNotInThisSync);
343 :
344 9 : sessionIdReceivedSub =
345 45 : room.onSessionKeyReceived.stream.listen(_sessionKeyReceived);
346 9 : cancelSendEventSub =
347 54 : room.client.onCancelSendEvent.stream.listen(_cleanUpCancelledEvent);
348 :
349 : // we want to populate our aggregated events
350 16 : for (final e in events) {
351 7 : addAggregatedEvent(e);
352 : }
353 :
354 : // we are using a fragmented timeline
355 27 : if (chunk.nextBatch != '') {
356 1 : allowNewEvent = false;
357 1 : isFragmentedTimeline = true;
358 : // fragmented timelines never read from the database.
359 1 : _fetchedAllDatabaseEvents = true;
360 : }
361 : }
362 :
363 4 : void _cleanUpCancelledEvent(String eventId) {
364 4 : final i = _findEvent(event_id: eventId);
365 12 : if (i < events.length) {
366 12 : removeAggregatedEvent(events[i]);
367 8 : events.removeAt(i);
368 6 : onRemove?.call(i);
369 6 : onUpdate?.call();
370 : }
371 : }
372 :
373 : /// Removes all entries from [events] which are not in this SyncUpdate.
374 2 : void _removeEventsNotInThisSync(SyncUpdate sync) {
375 15 : final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? [];
376 4 : final keepEventIds = newSyncEvents.map((e) => e.eventId);
377 7 : events.removeWhere((e) => !keepEventIds.contains(e.eventId));
378 : }
379 :
380 : /// Don't forget to call this before you dismiss this object!
381 0 : void cancelSubscriptions() {
382 : // ignore: discarded_futures
383 0 : timelineSub?.cancel();
384 : // ignore: discarded_futures
385 0 : historySub?.cancel();
386 : // ignore: discarded_futures
387 0 : roomSub?.cancel();
388 : // ignore: discarded_futures
389 0 : sessionIdReceivedSub?.cancel();
390 : // ignore: discarded_futures
391 0 : cancelSendEventSub?.cancel();
392 : }
393 :
394 2 : void _sessionKeyReceived(String sessionId) async {
395 : var decryptAtLeastOneEvent = false;
396 2 : Future<void> decryptFn() async {
397 6 : final encryption = room.client.encryption;
398 6 : if (!room.client.encryptionEnabled || encryption == null) {
399 : return;
400 : }
401 7 : for (var i = 0; i < events.length; i++) {
402 4 : if (events[i].type == EventTypes.Encrypted &&
403 4 : events[i].messageType == MessageTypes.BadEncrypted &&
404 0 : events[i].content['session_id'] == sessionId) {
405 0 : events[i] = await encryption.decryptRoomEvent(
406 0 : events[i],
407 : store: true,
408 : updateType: EventUpdateType.history,
409 : );
410 0 : addAggregatedEvent(events[i]);
411 0 : onChange?.call(i);
412 0 : if (events[i].type != EventTypes.Encrypted) {
413 : decryptAtLeastOneEvent = true;
414 : }
415 : }
416 : }
417 : }
418 :
419 6 : if (room.client.database != null) {
420 8 : await room.client.database?.transaction(decryptFn);
421 : } else {
422 0 : await decryptFn();
423 : }
424 0 : if (decryptAtLeastOneEvent) onUpdate?.call();
425 : }
426 :
427 : /// Request the keys for undecryptable events of this timeline
428 0 : void requestKeys({
429 : bool tryOnlineBackup = true,
430 : bool onlineKeyBackupOnly = true,
431 : }) {
432 0 : for (final event in events) {
433 0 : if (event.type == EventTypes.Encrypted &&
434 0 : event.messageType == MessageTypes.BadEncrypted &&
435 0 : event.content['can_request_session'] == true) {
436 0 : final sessionId = event.content.tryGet<String>('session_id');
437 0 : final senderKey = event.content.tryGet<String>('sender_key');
438 : if (sessionId != null && senderKey != null) {
439 0 : room.client.encryption?.keyManager.maybeAutoRequest(
440 0 : room.id,
441 : sessionId,
442 : senderKey,
443 : tryOnlineBackup: tryOnlineBackup,
444 : onlineKeyBackupOnly: onlineKeyBackupOnly,
445 : );
446 : }
447 : }
448 : }
449 : }
450 :
451 : /// Set the read marker to the last synced event in this timeline.
452 2 : Future<void> setReadMarker({String? eventId, bool? public}) async {
453 : eventId ??=
454 12 : events.firstWhereOrNull((event) => event.status.isSynced)?.eventId;
455 : if (eventId == null) return;
456 4 : return room.setReadMarker(eventId, mRead: eventId, public: public);
457 : }
458 :
459 7 : int _findEvent({String? event_id, String? unsigned_txid}) {
460 : // we want to find any existing event where either the passed event_id or the passed unsigned_txid
461 : // matches either the event_id or transaction_id of the existing event.
462 : // For that we create two sets, searchNeedle, what we search, and searchHaystack, where we check if there is a match.
463 : // Now, after having these two sets, if the intersect between them is non-empty, we know that we have at least one match in one pair,
464 : // thus meaning we found our element.
465 : final searchNeedle = <String>{};
466 : if (event_id != null) {
467 7 : searchNeedle.add(event_id);
468 : }
469 : if (unsigned_txid != null) {
470 4 : searchNeedle.add(unsigned_txid);
471 : }
472 : int i;
473 28 : for (i = 0; i < events.length; i++) {
474 21 : final searchHaystack = <String>{events[i].eventId};
475 :
476 21 : final txnid = events[i].transactionId;
477 : if (txnid != null) {
478 4 : searchHaystack.add(txnid);
479 : }
480 14 : if (searchNeedle.intersection(searchHaystack).isNotEmpty) {
481 : break;
482 : }
483 : }
484 : return i;
485 : }
486 :
487 4 : void _removeEventFromSet(Set<Event> eventSet, Event event) {
488 4 : eventSet.removeWhere(
489 4 : (e) =>
490 8 : e.matchesEventOrTransactionId(event.eventId) ||
491 4 : event.unsigned != null &&
492 8 : e.matchesEventOrTransactionId(event.transactionId),
493 : );
494 : }
495 :
496 9 : void addAggregatedEvent(Event event) {
497 : // we want to add an event to the aggregation tree
498 9 : final relationshipType = event.relationshipType;
499 9 : final relationshipEventId = event.relationshipEventId;
500 : if (relationshipType == null || relationshipEventId == null) {
501 : return; // nothing to do
502 : }
503 8 : final events = (aggregatedEvents[relationshipEventId] ??=
504 8 : <String, Set<Event>>{})[relationshipType] ??= <Event>{};
505 : // remove a potential old event
506 4 : _removeEventFromSet(events, event);
507 : // add the new one
508 4 : events.add(event);
509 4 : if (onChange != null) {
510 0 : final index = _findEvent(event_id: relationshipEventId);
511 0 : onChange?.call(index);
512 : }
513 : }
514 :
515 6 : void removeAggregatedEvent(Event event) {
516 18 : aggregatedEvents.remove(event.eventId);
517 6 : if (event.transactionId != null) {
518 6 : aggregatedEvents.remove(event.transactionId);
519 : }
520 16 : for (final types in aggregatedEvents.values) {
521 8 : for (final events in types.values) {
522 4 : _removeEventFromSet(events, event);
523 : }
524 : }
525 : }
526 :
527 7 : void _handleEventUpdate(
528 : Event event,
529 : EventUpdateType type, {
530 : bool update = true,
531 : }) {
532 : try {
533 28 : if (event.roomId != room.id) return;
534 :
535 10 : if (type != EventUpdateType.timeline && type != EventUpdateType.history) {
536 : return;
537 : }
538 :
539 7 : if (type == EventUpdateType.timeline) {
540 7 : onNewEvent?.call();
541 : }
542 :
543 7 : if (!allowNewEvent) return;
544 :
545 7 : final status = event.status;
546 :
547 7 : final i = _findEvent(
548 7 : event_id: event.eventId,
549 7 : unsigned_txid: event.transactionId,
550 : );
551 :
552 21 : if (i < events.length) {
553 : // if the old status is larger than the new one, we also want to preserve the old status
554 21 : final oldStatus = events[i].status;
555 14 : events[i] = event;
556 : // do we preserve the status? we should allow 0 -> -1 updates and status increases
557 14 : if ((latestEventStatus(status, oldStatus) == oldStatus) &&
558 11 : !(status.isError && oldStatus.isSending)) {
559 21 : events[i].status = oldStatus;
560 : }
561 21 : addAggregatedEvent(events[i]);
562 9 : onChange?.call(i);
563 : } else {
564 6 : if (type == EventUpdateType.history &&
565 6 : events.indexWhere(
566 12 : (e) => e.eventId == event.eventId,
567 3 : ) !=
568 3 : -1) {
569 : return;
570 : }
571 12 : var index = events.length;
572 6 : if (type == EventUpdateType.history) {
573 6 : events.add(event);
574 : } else {
575 8 : index = events.firstIndexWhereNotError;
576 8 : events.insert(index, event);
577 : }
578 10 : onInsert?.call(index);
579 :
580 6 : addAggregatedEvent(event);
581 : }
582 :
583 : // Handle redaction events
584 14 : if (event.type == EventTypes.Redaction) {
585 6 : final index = _findEvent(event_id: event.redacts);
586 9 : if (index < events.length) {
587 9 : removeAggregatedEvent(events[index]);
588 :
589 : // Is the redacted event a reaction? Then update the event this
590 : // belongs to:
591 3 : if (onChange != null) {
592 3 : final relationshipEventId = events[index].relationshipEventId;
593 : if (relationshipEventId != null) {
594 0 : onChange?.call(_findEvent(event_id: relationshipEventId));
595 : return;
596 : }
597 : }
598 :
599 9 : events[index].setRedactionEvent(event);
600 4 : onChange?.call(index);
601 : }
602 : }
603 :
604 7 : if (update && !_collectHistoryUpdates) {
605 9 : onUpdate?.call();
606 : }
607 : } catch (e, s) {
608 0 : Logs().w('Handle event update failed', e, s);
609 : }
610 : }
611 :
612 0 : @Deprecated('Use [startSearch] instead.')
613 : Stream<List<Event>> searchEvent({
614 : String? searchTerm,
615 : int requestHistoryCount = 100,
616 : int maxHistoryRequests = 10,
617 : String? sinceEventId,
618 : int? limit,
619 : bool Function(Event)? searchFunc,
620 : }) =>
621 0 : startSearch(
622 : searchTerm: searchTerm,
623 : requestHistoryCount: requestHistoryCount,
624 : maxHistoryRequests: maxHistoryRequests,
625 : // ignore: deprecated_member_use_from_same_package
626 : sinceEventId: sinceEventId,
627 : limit: limit,
628 : searchFunc: searchFunc,
629 0 : ).map((result) => result.$1);
630 :
631 : /// Searches [searchTerm] in this timeline. It first searches in the
632 : /// cache, then in the database and then on the server. The search can
633 : /// take a while, which is why this returns a stream so the already found
634 : /// events can already be displayed.
635 : /// Override the [searchFunc] if you need another search. This will then
636 : /// ignore [searchTerm].
637 : /// Returns the List of Events and the next prevBatch at the end of the
638 : /// search.
639 0 : Stream<(List<Event>, String?)> startSearch({
640 : String? searchTerm,
641 : int requestHistoryCount = 100,
642 : int maxHistoryRequests = 10,
643 : String? prevBatch,
644 : @Deprecated('Use [prevBatch] instead.') String? sinceEventId,
645 : int? limit,
646 : bool Function(Event)? searchFunc,
647 : }) async* {
648 0 : assert(searchTerm != null || searchFunc != null);
649 0 : searchFunc ??= (event) =>
650 0 : event.body.toLowerCase().contains(searchTerm?.toLowerCase() ?? '');
651 0 : final found = <Event>[];
652 :
653 : if (sinceEventId == null) {
654 : // Search locally
655 0 : for (final event in events) {
656 0 : if (searchFunc(event)) {
657 0 : yield (found..add(event), null);
658 : }
659 : }
660 :
661 : // Search in database
662 0 : var start = events.length;
663 : while (true) {
664 0 : final eventsFromStore = await room.client.database?.getEventList(
665 0 : room,
666 : start: start,
667 : limit: requestHistoryCount,
668 : ) ??
669 0 : [];
670 0 : if (eventsFromStore.isEmpty) break;
671 0 : start += eventsFromStore.length;
672 0 : for (final event in eventsFromStore) {
673 0 : if (searchFunc(event)) {
674 0 : yield (found..add(event), null);
675 : }
676 : }
677 : }
678 : }
679 :
680 : // Search on the server
681 0 : prevBatch ??= room.prev_batch;
682 : if (sinceEventId != null) {
683 : prevBatch =
684 0 : (await room.client.getEventContext(room.id, sinceEventId)).end;
685 : }
686 0 : final encryption = room.client.encryption;
687 0 : for (var i = 0; i < maxHistoryRequests; i++) {
688 : if (prevBatch == null) break;
689 0 : if (limit != null && found.length >= limit) break;
690 : try {
691 0 : final resp = await room.client.getRoomEvents(
692 0 : room.id,
693 : Direction.b,
694 : from: prevBatch,
695 : limit: requestHistoryCount,
696 0 : filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
697 : );
698 0 : for (final matrixEvent in resp.chunk) {
699 0 : var event = Event.fromMatrixEvent(matrixEvent, room);
700 0 : if (event.type == EventTypes.Encrypted && encryption != null) {
701 0 : event = await encryption.decryptRoomEvent(event);
702 0 : if (event.type == EventTypes.Encrypted &&
703 0 : event.messageType == MessageTypes.BadEncrypted &&
704 0 : event.content['can_request_session'] == true) {
705 : // Await requestKey() here to ensure decrypted message bodies
706 0 : await event.requestKey();
707 : }
708 : }
709 0 : if (searchFunc(event)) {
710 0 : yield (found..add(event), resp.end);
711 0 : if (limit != null && found.length >= limit) break;
712 : }
713 : }
714 0 : prevBatch = resp.end;
715 : // We are at the beginning of the room
716 0 : if (resp.chunk.length < requestHistoryCount) break;
717 0 : } on MatrixException catch (e) {
718 : // We have no permission anymore to request the history
719 0 : if (e.error == MatrixError.M_FORBIDDEN) {
720 : break;
721 : }
722 : rethrow;
723 : }
724 : }
725 : return;
726 : }
727 : }
728 :
729 : extension on List<Event> {
730 4 : int get firstIndexWhereNotError {
731 4 : if (isEmpty) return 0;
732 16 : final index = indexWhere((event) => !event.status.isError);
733 9 : if (index == -1) return length;
734 : return index;
735 : }
736 : }
|