Files
aerc-fork-mirror/worker/jmap/fetch.go
Robin Jarry e324fa248f worker: add directory and filter metadata to response messages
Add Directory and Filter fields to DirectoryContents, DirectoryThreaded,
SearchResults, MessageInfo, and FullMessage response types. Workers now
populate these fields so the UI knows which directory and filter context
each response belongs to.

This metadata allows the message store to correctly associate incoming
messages with their source context, which is essential for the offline
worker to cache messages by directory and for proper handling of
concurrent operations across multiple folders.

Signed-off-by: Robin Jarry <robin@jarry.cc>
Reviewed-by: Simon Martin <simon@nasilyan.com>
2026-02-09 14:46:27 +01:00

257 lines
5.8 KiB
Go

package jmap
import (
"bytes"
"context"
"fmt"
"io"
"strings"
"git.sr.ht/~rjarry/aerc/models"
"git.sr.ht/~rjarry/aerc/worker/types"
"git.sr.ht/~rockorager/go-jmap"
"git.sr.ht/~rockorager/go-jmap/mail/email"
"github.com/emersion/go-message/charset"
)
var bodyProperties = []string{
"blobId",
"charset",
"cid",
"disposition",
"language",
"location",
"name",
"partId",
"size",
"subParts",
"type",
}
var emailProperties = []string{
"id",
"blobId",
"threadId",
"mailboxIds",
"keywords",
"size",
"receivedAt",
"headers",
"messageId",
"inReplyTo",
"references",
"from",
"to",
"cc",
"bcc",
"replyTo",
"subject",
"bodyStructure",
}
func (w *JMAPWorker) getEmails(
ctx context.Context, uids []models.UID, props, bodyProps []string,
) ([]*email.Email, error) {
emailIdsToFetch := make([]jmap.ID, 0, len(uids))
currentEmails := make([]*email.Email, 0, len(uids))
for _, uid := range uids {
jid := jmap.ID(uid)
m, err := w.cache.GetEmail(jid)
if err != nil {
// Message wasn't in cache; fetch it
emailIdsToFetch = append(emailIdsToFetch, jid)
continue
}
currentEmails = append(currentEmails, m)
}
if len(emailIdsToFetch) > 0 {
var req jmap.Request
req.Invoke(&email.Get{
Account: w.AccountId(),
IDs: emailIdsToFetch,
Properties: props,
BodyProperties: bodyProps,
})
resp, err := w.Do(ctx, &req)
if err != nil {
return nil, err
}
for _, inv := range resp.Responses {
switch r := inv.Args.(type) {
case *email.GetResponse:
if err = w.cache.PutEmailState(r.State); err != nil {
w.w.Warnf("PutEmailState: %s", err)
}
for _, m := range r.List {
currentEmails = append(currentEmails, m)
if err := w.cache.PutEmail(m.ID, m); err != nil {
w.w.Warnf("PutEmail: %s", err)
}
}
case *jmap.MethodError:
return nil, wrapMethodError(r)
}
}
}
return currentEmails, nil
}
func (w *JMAPWorker) handleFetchMessageHeaders(msg *types.FetchMessageHeaders) error {
emails, err := w.getEmails(msg.Context(), msg.Uids, emailProperties, bodyProperties)
if err != nil {
return err
}
var threadsToFetch []jmap.ID
for _, eml := range emails {
w.w.PostMessage(&types.MessageInfo{
Message: types.RespondTo(msg),
Info: w.translateMsgInfo(eml, msg.Directory),
}, nil)
thread, err := w.cache.GetThread(eml.ThreadID)
if err != nil {
threadsToFetch = append(threadsToFetch, eml.ThreadID)
continue
}
for _, id := range thread {
m, err := w.cache.GetEmail(id)
if err != nil {
// This should never happen. If we have the
// thread in cache, we will have fetched it
// already or updated it from the update loop
w.w.Warnf("Email ID %s from Thread %s not in cache", id, eml.ThreadID)
continue
}
// Get the UI updated immediately
w.w.PostMessage(&types.MessageInfo{
Message: types.RespondTo(msg),
Info: w.translateMsgInfo(m, msg.Directory),
}, nil)
}
}
threadEmails, err := w.fetchEntireThreads(msg.Context(), threadsToFetch)
if err != nil {
return err
}
for _, m := range threadEmails {
w.w.PostMessage(&types.MessageInfo{
Message: types.RespondTo(msg),
Info: w.translateMsgInfo(m, msg.Directory),
}, nil)
if err := w.cache.PutEmail(m.ID, m); err != nil {
w.w.Warnf("PutEmail: %s", err)
}
}
return nil
}
func (w *JMAPWorker) handleFetchMessageBodyPart(msg *types.FetchMessageBodyPart) error {
mails, err := w.getEmails(msg.Context(), []models.UID{msg.Uid},
[]string{"id", "bodyStructure"}, bodyProperties)
if err != nil {
return err
}
if len(mails) != 1 {
return fmt.Errorf("bug: message %s not found", msg.Uid)
}
part := mails[0].BodyStructure
for i, index := range msg.Part {
index -= 1 // convert to zero based offset
if index < len(part.SubParts) {
part = part.SubParts[index]
} else {
return fmt.Errorf(
"bug: invalid part index[%d]: %v", i, msg.Part)
}
}
buf, err := w.cache.GetBlob(part.BlobID)
if err != nil {
rd, err := w.Download(msg.Context(), part.BlobID)
if err != nil {
return w.wrapDownloadError("part", part.BlobID, err)
}
buf, err = io.ReadAll(rd)
rd.Close()
if err != nil {
return err
}
if err = w.cache.PutBlob(part.BlobID, buf); err != nil {
w.w.Warnf("PutBlob: %s", err)
}
}
var reader io.Reader = bytes.NewReader(buf)
if strings.HasPrefix(part.Type, "text/") && part.Charset != "" {
r, err := charset.Reader(part.Charset, reader)
if err != nil {
w.w.Warnf("charset.Reader: %v", err)
} else {
reader = r
}
}
w.w.PostMessage(&types.MessageBodyPart{
Message: types.RespondTo(msg),
Part: &models.MessageBodyPart{
Reader: reader,
Uid: msg.Uid,
},
}, nil)
return nil
}
func (w *JMAPWorker) handleFetchFullMessages(msg *types.FetchFullMessages) error {
mails, err := w.getEmails(msg.Context(), msg.Uids, []string{"id", "blobId"}, nil)
if err != nil {
return err
}
for _, mail := range mails {
buf, err := w.cache.GetBlob(mail.BlobID)
if err != nil {
rd, err := w.Download(msg.Context(), mail.BlobID)
if err != nil {
return w.wrapDownloadError("full", mail.BlobID, err)
}
buf, err = io.ReadAll(rd)
rd.Close()
if err != nil {
return err
}
if err = w.cache.PutBlob(mail.BlobID, buf); err != nil {
w.w.Warnf("PutBlob: %s", err)
}
}
w.w.PostMessage(&types.FullMessage{
Message: types.RespondTo(msg),
Content: &models.FullMessage{
Reader: bytes.NewReader(buf),
Uid: models.UID(mail.ID),
},
}, nil)
}
return nil
}
func (w *JMAPWorker) wrapDownloadError(prefix string, blobId jmap.ID, err error) error {
urlRepl := strings.NewReplacer(
"{accountId}", string(w.AccountId()),
"{blobId}", string(blobId),
"{type}", "application/octet-stream",
"{name}", "filename",
)
url := urlRepl.Replace(w.client.Session.DownloadURL)
return fmt.Errorf("%s: %q %w", prefix, url, err)
}