Skip to content

Commit

Permalink
fix cache locked issue on read error, fix iter(dialogs|messages|histo…
Browse files Browse the repository at this point in the history
…ry) small bug, add IterChatMembers, fix GetChatMembers, v1.5.0 (2024 Last Update)
  • Loading branch information
AmarnathCJD committed Dec 27, 2024
1 parent 78fef3a commit fea2016
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 99 deletions.
3 changes: 3 additions & 0 deletions telegram/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,16 +184,19 @@ func (c *CACHE) ReadFile() {
break
}
c.logger.Error("error reading from cache file: ", err)
c.Unlock()
return
}

var id, accessHash int64
if err := binary.Read(file, binary.BigEndian, &id); err != nil {
c.logger.Error("cache file corrupted: ", err)
c.Unlock()
return
}
if err := binary.Read(file, binary.BigEndian, &accessHash); err != nil {
c.logger.Error("cache file corrupted: ", err)
c.Unlock()
return
}

Expand Down
284 changes: 231 additions & 53 deletions telegram/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package telegram

import (
"math"
"reflect"
"time"

"github.com/pkg/errors"
)
Expand Down Expand Up @@ -196,10 +198,11 @@ func (c *Client) GetChatMember(chatID, userID any) (*Participant, error) {
}

type ParticipantOptions struct {
Query string `json:"query,omitempty"`
Filter ChannelParticipantsFilter `json:"filter,omitempty"`
Offset int32 `json:"offset,omitempty"`
Limit int32 `json:"limit,omitempty"`
Query string `json:"query,omitempty"`
Filter ChannelParticipantsFilter `json:"filter,omitempty"`
Offset int32 `json:"offset,omitempty"`
Limit int32 `json:"limit,omitempty"`
SleepThresholdMs int32 `json:"sleep_threshold_ms,omitempty"`
}

// GetChatMembers returns the members of a chat
Expand All @@ -221,61 +224,236 @@ func (c *Client) GetChatMembers(chatID any, Opts ...*ParticipantOptions) ([]*Par
opts := getVariadic(Opts, &ParticipantOptions{Filter: &ChannelParticipantsSearch{}, Limit: 1})
if opts.Query != "" {
opts.Filter = &ChannelParticipantsSearch{Q: opts.Query}
} else if opts.Filter == nil {
opts.Filter = &ChannelParticipantsSearch{}
}
participants, err := c.ChannelsGetParticipants(&InputChannelObj{ChannelID: chat.ChannelID, AccessHash: chat.AccessHash}, opts.Filter, opts.Offset, opts.Limit, 0)

var fetched int32 = 0
var participantsList []*Participant
var reqLimit, reqOffset int32 = 200, opts.Offset
var totalCount int32

for {
remaning := opts.Limit - int32(fetched)
reqLimit = min(remaning, 200)

participants, err := c.ChannelsGetParticipants(&InputChannelObj{ChannelID: chat.ChannelID, AccessHash: chat.AccessHash}, opts.Filter, reqOffset, reqLimit, 0)
if err != nil {
return nil, 0, err
}
cParts, ok := participants.(*ChannelsChannelParticipantsObj)
if opts.Limit == -1 {
opts.Limit = cParts.Count
continue
}

if !ok {
return nil, 0, errors.New("could not get participants")
}
c.Cache.UpdatePeersToCache(cParts.Users, cParts.Chats)
var (
status string = Member
rights *ChatAdminRights = &ChatAdminRights{}
rank string = ""
UserID int64 = 0
)
for _, p := range cParts.Participants {
switch p := p.(type) {
case *ChannelParticipantCreator:
status = Creator
rights = p.AdminRights
rank = p.Rank
UserID = p.UserID
case *ChannelParticipantAdmin:
status = Admin
rights = p.AdminRights
rank = p.Rank
UserID = p.UserID
case *ChannelParticipantObj:
status = Member
UserID = p.UserID
case *ChannelParticipantSelf:
status = Member
UserID = p.UserID
case *ChannelParticipantBanned:
status = Restricted
UserID = c.GetPeerID(p.Peer)
case *ChannelParticipantLeft:
status = Left
UserID = c.GetPeerID(p.Peer)
}
partUser, err := c.GetUser(UserID)
if err != nil {
return nil, 0, err
}
participantsList = append(participantsList, &Participant{
User: partUser,
Participant: p,
Status: status,
Rights: rights,
Rank: rank,
})

fetched++
}

if fetched >= opts.Limit || len(cParts.Participants) == 0 {
break
}

reqOffset = fetched
totalCount = cParts.Count

time.Sleep(time.Duration(opts.SleepThresholdMs) * time.Millisecond)
}
return participantsList, totalCount, nil
}

func (c *Client) IterChatMembers(chatID any, Opts ...*ParticipantOptions) (<-chan *Participant, <-chan error) {
ch := make(chan *Participant)
errCh := make(chan error)

var peerToAct, err = c.ResolvePeer(chatID)
if err != nil {
return nil, 0, err
errCh <- err
close(ch)
return ch, errCh
}
cParts, ok := participants.(*ChannelsChannelParticipantsObj)

var chat, ok = peerToAct.(*InputPeerChannel)
if !ok {
return nil, 0, errors.New("could not get participants")
errCh <- errors.New("peer is not a channel")
close(ch)
return ch, errCh
}
c.Cache.UpdatePeersToCache(cParts.Users, cParts.Chats)
var (
status string = Member
rights *ChatAdminRights = &ChatAdminRights{}
rank string = ""
UserID int64 = 0
)
participantsList := make([]*Participant, 0)
for _, p := range cParts.Participants {
switch p := p.(type) {
case *ChannelParticipantCreator:
status = Creator
rights = p.AdminRights
rank = p.Rank
UserID = p.UserID
case *ChannelParticipantAdmin:
status = Admin
rights = p.AdminRights
rank = p.Rank
UserID = p.UserID
case *ChannelParticipantObj:
status = Member
UserID = p.UserID
case *ChannelParticipantSelf:
status = Member
UserID = p.UserID
case *ChannelParticipantBanned:
status = Restricted
UserID = c.GetPeerID(p.Peer)
case *ChannelParticipantLeft:
status = Left
UserID = c.GetPeerID(p.Peer)
}
partUser, err := c.GetUser(UserID)
if err != nil {
return nil, 0, err
}
participantsList = append(participantsList, &Participant{
User: partUser,
Participant: p,
Status: status,
Rights: rights,
Rank: rank,

go func() {
defer close(ch)
defer close(errCh)

var opts = getVariadic(Opts, &ParticipantOptions{
Limit: 1,
SleepThresholdMs: 20,
Filter: &ChannelParticipantsSearch{},
})
}
return participantsList, cParts.Count, nil

if opts.Query != "" {
opts.Filter = &ChannelParticipantsSearch{Q: opts.Query}
} else if opts.Filter == nil {
opts.Filter = &ChannelParticipantsSearch{}
}

var fetched int32 = 0
req := &ChannelsGetParticipantsParams{
Channel: &InputChannelObj{ChannelID: chat.ChannelID, AccessHash: chat.AccessHash},
Filter: opts.Filter,
Offset: opts.Offset,
Limit: 200,
Hash: 0,
}

for {
if opts.Limit == -1 {
req.Limit = 0
resp, err := c.MakeRequest(req)
if err != nil {
errCh <- err
return
}

switch resp := resp.(type) {
case *ChannelsChannelParticipantsObj:
if resp.Count == 0 {
return
}
opts.Limit = resp.Count
case *ChannelsChannelParticipantsNotModified:
default:
}

continue
}

remaining := opts.Limit - int32(fetched)
perReqLimit := int32(200)
if remaining < perReqLimit {
perReqLimit = remaining
}
req.Limit = perReqLimit

resp, err := c.MakeRequest(req)
if err != nil {
if handleIfFlood(err, c) {
continue
}
errCh <- err
return
}

switch resp := resp.(type) {
case *ChannelsChannelParticipantsObj:
c.Cache.UpdatePeersToCache(resp.Users, resp.Chats)
for _, participant := range resp.Participants {
var (
status string = Member
rights *ChatAdminRights = &ChatAdminRights{}
rank string = ""
UserID int64 = 0
)
switch p := participant.(type) {
case *ChannelParticipantCreator:
status = Creator
rights = p.AdminRights
rank = p.Rank
UserID = p.UserID
case *ChannelParticipantAdmin:
status = Admin
rights = p.AdminRights
rank = p.Rank
UserID = p.UserID
case *ChannelParticipantObj:
status = Member
UserID = p.UserID
case *ChannelParticipantSelf:
status = Member
UserID = p.UserID
case *ChannelParticipantBanned:
status = Restricted
UserID = c.GetPeerID(p.Peer)
case *ChannelParticipantLeft:
status = Left
UserID = c.GetPeerID(p.Peer)
}
partUser, err := c.GetUser(UserID)
if err != nil {
errCh <- err
return
}
ch <- &Participant{
User: partUser,
Participant: participant,
Status: status,
Rights: rights,
Rank: rank,
}

fetched++
}
if len(resp.Participants) < int(perReqLimit) || fetched >= opts.Limit && opts.Limit > 0 {
return
}

req.Offset = fetched
default:
errCh <- errors.New("unexpected response: " + reflect.TypeOf(resp).String())
return
}

time.Sleep(time.Duration(opts.SleepThresholdMs) * time.Millisecond)
}
}()

return ch, errCh
}

type AdminOptions struct {
Expand Down
2 changes: 1 addition & 1 deletion telegram/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

const (
ApiVersion = 195
Version = "v1.4.8"
Version = "v1.5.0"

LogDebug = utils.DebugLevel
LogInfo = utils.InfoLevel
Expand Down
Loading

0 comments on commit fea2016

Please sign in to comment.