跳转到主要内容
数据源管理是 WuKongIM SDK 的核心功能之一,负责处理文件上传下载、会话同步、频道信息获取和消息同步等关键业务逻辑。

文件管理

监听上传附件

发送自定义附件消息时,发送给对方的是文件的网络地址,而不是实际的文件。因此我们需要监听附件的上传,在消息发送前将本地文件上传到服务器,并把网络地址返回给 SDK。
WKIM.getInstance().getMsgManager().addOnUploadAttachListener(new IUploadAttachmentListener() {
    /**
     * Receives a message whose attachment may still need uploading and reports
     * the outcome back through {@code listener}.
     *
     * @param wkMsg    the message being sent
     * @param listener result callback used to return the content to the SDK
     */
    @Override
    public void onUploadAttachmentListener(WKMsg wkMsg, IUploadAttacResultListener listener) {
        // Upload the not-yet-uploaded file to your server here and return the result to the SDK.
        if (wkMsg.type == WKMsgContentType.WK_IMAGE) {
            WKMediaMessageContent mediaMessageContent = (WKMediaMessageContent) wkMsg.baseContentMsgModel;
            if (TextUtils.isEmpty(mediaMessageContent.url)) {
                // TODO: upload the file
                // ...
                mediaMessageContent.url = "xxxxxx"; // set the network address
            }
            // Always answer the listener: content that already carries a network
            // address is returned as-is instead of leaving the callback unanswered.
            listener.onUploadResult(true, mediaMessageContent);
        }
    }
});

监听下载附件

SDK 不会主动下载消息的附件。收到带有附件的消息时,需要 app 自己按需下载。app 下载完成后,可以把文件的本地地址保存到消息内容中,避免重复下载。
/**
 * Updates the content body of a message.
 *
 * @param clientMsgNo       client-side message ID
 * @param messageContent    message content model; save the local file path in it
 * @param isRefreshUI       whether to notify the UI to refresh the corresponding message
 */
WKIM.getInstance().getMsgManager().updateContent(String clientMsgNo, WKMessageContent messageContent, boolean isRefreshUI);

完整文件管理示例

public class FileManager {
    
    private static final String TAG = "FileManager";
    
    public void initialize() {
        // 设置文件上传监听器
        WKIM.getInstance().getMsgManager().addOnUploadAttachListener(this::handleFileUpload);
    }
    
    private void handleFileUpload(WKMsg wkMsg, IUploadAttacResultListener listener) {
        switch (wkMsg.type) {
            case WKMsgContentType.WK_IMAGE:
                uploadImage(wkMsg, listener);
                break;
            case WKMsgContentType.WK_VIDEO:
                uploadVideo(wkMsg, listener);
                break;
            case WKMsgContentType.WK_VOICE:
                uploadVoice(wkMsg, listener);
                break;
            case WKMsgContentType.WK_FILE:
                uploadFile(wkMsg, listener);
                break;
            default:
                listener.onUploadResult(false, null);
                break;
        }
    }
    
    private void uploadImage(WKMsg wkMsg, IUploadAttacResultListener listener) {
        WKImageContent imageContent = (WKImageContent) wkMsg.baseContentMsgModel;
        
        if (!TextUtils.isEmpty(imageContent.url)) {
            // 已经有网络地址,直接返回
            listener.onUploadResult(true, imageContent);
            return;
        }
        
        // 上传图片到服务器
        String localPath = imageContent.localPath;
        if (TextUtils.isEmpty(localPath)) {
            listener.onUploadResult(false, null);
            return;
        }
        
        // 异步上传
        uploadFileToServer(localPath, "image", new UploadCallback() {
            @Override
            public void onSuccess(String url) {
                imageContent.url = url;
                listener.onUploadResult(true, imageContent);
            }
            
            @Override
            public void onError(String error) {
                Log.e(TAG, "图片上传失败: " + error);
                listener.onUploadResult(false, null);
            }
        });
    }
    
    private void uploadVideo(WKMsg wkMsg, IUploadAttacResultListener listener) {
        WKVideoContent videoContent = (WKVideoContent) wkMsg.baseContentMsgModel;
        
        if (!TextUtils.isEmpty(videoContent.url)) {
            listener.onUploadResult(true, videoContent);
            return;
        }
        
        String localPath = videoContent.localPath;
        if (TextUtils.isEmpty(localPath)) {
            listener.onUploadResult(false, null);
            return;
        }
        
        // 上传视频和缩略图
        uploadFileToServer(localPath, "video", new UploadCallback() {
            @Override
            public void onSuccess(String url) {
                videoContent.url = url;
                
                // 如果有缩略图,也需要上传
                if (!TextUtils.isEmpty(videoContent.coverLocalPath)) {
                    uploadFileToServer(videoContent.coverLocalPath, "image", new UploadCallback() {
                        @Override
                        public void onSuccess(String coverUrl) {
                            videoContent.cover = coverUrl;
                            listener.onUploadResult(true, videoContent);
                        }
                        
                        @Override
                        public void onError(String error) {
                            // 缩略图上传失败,但视频上传成功
                            listener.onUploadResult(true, videoContent);
                        }
                    });
                } else {
                    listener.onUploadResult(true, videoContent);
                }
            }
            
            @Override
            public void onError(String error) {
                Log.e(TAG, "视频上传失败: " + error);
                listener.onUploadResult(false, null);
            }
        });
    }
    
    private void uploadVoice(WKMsg wkMsg, IUploadAttacResultListener listener) {
        WKVoiceContent voiceContent = (WKVoiceContent) wkMsg.baseContentMsgModel;
        
        if (!TextUtils.isEmpty(voiceContent.url)) {
            listener.onUploadResult(true, voiceContent);
            return;
        }
        
        String localPath = voiceContent.localPath;
        if (TextUtils.isEmpty(localPath)) {
            listener.onUploadResult(false, null);
            return;
        }
        
        uploadFileToServer(localPath, "voice", new UploadCallback() {
            @Override
            public void onSuccess(String url) {
                voiceContent.url = url;
                listener.onUploadResult(true, voiceContent);
            }
            
            @Override
            public void onError(String error) {
                Log.e(TAG, "语音上传失败: " + error);
                listener.onUploadResult(false, null);
            }
        });
    }
    
    private void uploadFile(WKMsg wkMsg, IUploadAttacResultListener listener) {
        WKFileContent fileContent = (WKFileContent) wkMsg.baseContentMsgModel;
        
        if (!TextUtils.isEmpty(fileContent.url)) {
            listener.onUploadResult(true, fileContent);
            return;
        }
        
        String localPath = fileContent.localPath;
        if (TextUtils.isEmpty(localPath)) {
            listener.onUploadResult(false, null);
            return;
        }
        
        uploadFileToServer(localPath, "file", new UploadCallback() {
            @Override
            public void onSuccess(String url) {
                fileContent.url = url;
                listener.onUploadResult(true, fileContent);
            }
            
            @Override
            public void onError(String error) {
                Log.e(TAG, "文件上传失败: " + error);
                listener.onUploadResult(false, null);
            }
        });
    }
    
    // 文件上传到服务器的具体实现
    private void uploadFileToServer(String localPath, String fileType, UploadCallback callback) {
        // 这里实现具体的文件上传逻辑
        // 可以使用 OkHttp、Retrofit 等网络库
        
        File file = new File(localPath);
        if (!file.exists()) {
            callback.onError("文件不存在");
            return;
        }
        
        // 示例:使用 OkHttp 上传文件
        RequestBody fileBody = RequestBody.create(MediaType.parse("application/octet-stream"), file);
        MultipartBody.Part filePart = MultipartBody.Part.createFormData("file", file.getName(), fileBody);
        
        ApiService.uploadFile(filePart)
                .enqueue(new Callback<UploadResponse>() {
                    @Override
                    public void onResponse(Call<UploadResponse> call, Response<UploadResponse> response) {
                        if (response.isSuccessful() && response.body() != null) {
                            callback.onSuccess(response.body().getUrl());
                        } else {
                            callback.onError("上传失败: " + response.message());
                        }
                    }
                    
                    @Override
                    public void onFailure(Call<UploadResponse> call, Throwable t) {
                        callback.onError("网络错误: " + t.getMessage());
                    }
                });
    }
    
    // 下载文件并更新消息内容
    public void downloadAndUpdateMessage(WKMsg message) {
        if (message.baseContentMsgModel instanceof WKMediaMessageContent) {
            WKMediaMessageContent mediaContent = (WKMediaMessageContent) message.baseContentMsgModel;
            
            if (!TextUtils.isEmpty(mediaContent.url) && TextUtils.isEmpty(mediaContent.localPath)) {
                downloadFile(mediaContent.url, new DownloadCallback() {
                    @Override
                    public void onSuccess(String localPath) {
                        mediaContent.localPath = localPath;
                        
                        // 更新消息内容
                        WKIM.getInstance().getMsgManager().updateContent(
                                message.clientMsgNO, 
                                mediaContent, 
                                true // 刷新UI
                        );
                    }
                    
                    @Override
                    public void onError(String error) {
                        Log.e(TAG, "文件下载失败: " + error);
                    }
                });
            }
        }
    }
    
    private void downloadFile(String url, DownloadCallback callback) {
        // 实现文件下载逻辑
        // 可以使用下载管理器或网络库
    }
    
    // 回调接口
    interface UploadCallback {
        void onSuccess(String url);
        void onError(String error);
    }
    
    interface DownloadCallback {
        void onSuccess(String localPath);
        void onError(String error);
    }
}

最近会话数据源

WKIM.getInstance().getConversationManager().addOnSyncConversationListener(new ISyncConversationChat() {
    /**
     * Synchronizes recent conversations.
     *
     * @param lastMsgSeqs collection of msg_seq values of the recent-conversation list
     * @param msgCount    number of messages to sync per conversation
     * @param version     maximum version number
     * @param resultBack  callback
     */
    @Override
    public void syncConversationChat(String lastMsgSeqs, int msgCount, long version, ISyncConversationChatBack resultBack) {
        // Request your business API and hand the data back to the SDK.
        syncConversationsFromServer(lastMsgSeqs, msgCount, version, resultBack);
    }
});

会话同步示例

/**
 * Example conversation data source: answers the SDK's conversation-sync requests
 * with data fetched from the application server.
 */
public class ConversationDataSource {

    private static final String TAG = "ConversationDataSource";

    /** Registers the conversation-sync listener with the SDK. */
    public void initialize() {
        WKIM.getInstance().getConversationManager().addOnSyncConversationListener(this::syncConversations);
    }

    /**
     * Calls the server API to sync conversation data and returns the converted
     * result to the SDK. On any failure {@code null} is returned to the callback.
     */
    private void syncConversations(String lastMsgSeqs, int msgCount, long version, ISyncConversationChatBack callback) {
        ApiService.syncConversations(lastMsgSeqs, msgCount, version)
                .enqueue(new Callback<ConversationSyncResponse>() {
                    @Override
                    public void onResponse(Call<ConversationSyncResponse> call, Response<ConversationSyncResponse> response) {
                        if (!response.isSuccessful() || response.body() == null) {
                            callback.onResult(null);
                            return;
                        }
                        ConversationSyncResponse syncResponse = response.body();

                        // Convert the server data into the format the SDK expects.
                        List<WKSyncConvMsg> conversations = convertToWKSyncConvMsg(syncResponse.getConversations());

                        // Return to the SDK.
                        callback.onResult(conversations);
                    }

                    @Override
                    public void onFailure(Call<ConversationSyncResponse> call, Throwable t) {
                        Log.e(TAG, "同步会话失败", t);
                        callback.onResult(null);
                    }
                });
    }

    /**
     * Converts server conversation records into SDK sync models.
     *
     * @param serverData records from the server; may be {@code null} when the response
     *                   carries no conversation list
     * @return the converted list, empty when {@code serverData} is {@code null}
     */
    private List<WKSyncConvMsg> convertToWKSyncConvMsg(List<ConversationData> serverData) {
        List<WKSyncConvMsg> result = new ArrayList<>();
        if (serverData == null) {
            // A response without a conversation list would otherwise crash the loop below.
            return result;
        }

        for (ConversationData data : serverData) {
            WKSyncConvMsg syncMsg = new WKSyncConvMsg();
            syncMsg.channelID = data.getChannelId();
            syncMsg.channelType = data.getChannelType();
            syncMsg.unreadCount = data.getUnreadCount();
            syncMsg.timestamp = data.getTimestamp();

            // Set the last message. convertToWKMsg is not part of this example and
            // must be supplied by the application.
            if (data.getLastMessage() != null) {
                syncMsg.lastMsg = convertToWKMsg(data.getLastMessage());
            }

            result.add(syncMsg);
        }

        return result;
    }
}

频道资料数据源

// Listen for channel-info requests from the SDK
WKIM.getInstance().getChannelManager().addOnGetChannelInfoListener(new IGetChannelInfo() {
    /**
     * Supplies channel information to the SDK. Use {@code channelType} to tell
     * personal profiles from group profiles.
     */
    @Override
    public WKChannel onGetChannelInfo(String channelID, byte channelType, IChannelInfoListener infoListener) {
        // If the app has this channel locally, return it directly; otherwise fetch it
        // from the network and deliver it through infoListener.
        WKChannel localChannel = getChannelFromCache(channelID, channelType, infoListener);
        return localChannel;
    }
});
SDK 内置频道类型可通过 WKChannelType 查看

批量保存频道资料

// Save (or update) channel profile information in batch
WKIM.getInstance().getChannelManager().saveOrUpdateChannels(channels);

频道信息数据源示例

public class ChannelDataSource {
    
    private Map<String, WKChannel> channelCache = new ConcurrentHashMap<>();
    
    public void initialize() {
        WKIM.getInstance().getChannelManager().addOnGetChannelInfoListener(this::getChannelInfo);
    }
    
    private WKChannel getChannelInfo(String channelID, byte channelType, IChannelInfoListener listener) {
        String cacheKey = channelID + "_" + channelType;
        
        // 先从缓存获取
        WKChannel cachedChannel = channelCache.get(cacheKey);
        if (cachedChannel != null) {
            return cachedChannel;
        }
        
        // 从网络获取
        fetchChannelFromServer(channelID, channelType, new ChannelCallback() {
            @Override
            public void onSuccess(WKChannel channel) {
                // 缓存频道信息
                channelCache.put(cacheKey, channel);
                
                // 返回给SDK
                listener.onResult(channel);
            }
            
            @Override
            public void onError(String error) {
                Log.e("ChannelDataSource", "获取频道信息失败: " + error);
                listener.onResult(null);
            }
        });
        
        return null; // 异步获取,先返回null
    }
    
    private void fetchChannelFromServer(String channelID, byte channelType, ChannelCallback callback) {
        if (channelType == WKChannelType.PERSONAL) {
            // 获取用户信息
            ApiService.getUserInfo(channelID)
                    .enqueue(new Callback<UserInfo>() {
                        @Override
                        public void onResponse(Call<UserInfo> call, Response<UserInfo> response) {
                            if (response.isSuccessful() && response.body() != null) {
                                WKChannel channel = convertUserToChannel(response.body());
                                callback.onSuccess(channel);
                            } else {
                                callback.onError("获取用户信息失败");
                            }
                        }
                        
                        @Override
                        public void onFailure(Call<UserInfo> call, Throwable t) {
                            callback.onError(t.getMessage());
                        }
                    });
        } else if (channelType == WKChannelType.GROUP) {
            // 获取群组信息
            ApiService.getGroupInfo(channelID)
                    .enqueue(new Callback<GroupInfo>() {
                        @Override
                        public void onResponse(Call<GroupInfo> call, Response<GroupInfo> response) {
                            if (response.isSuccessful() && response.body() != null) {
                                WKChannel channel = convertGroupToChannel(response.body());
                                callback.onSuccess(channel);
                            } else {
                                callback.onError("获取群组信息失败");
                            }
                        }
                        
                        @Override
                        public void onFailure(Call<GroupInfo> call, Throwable t) {
                            callback.onError(t.getMessage());
                        }
                    });
        }
    }
    
    private WKChannel convertUserToChannel(UserInfo userInfo) {
        WKChannel channel = new WKChannel();
        channel.channelID = userInfo.getUserId();
        channel.channelType = WKChannelType.PERSONAL;
        channel.channelName = userInfo.getNickname();
        channel.avatar = userInfo.getAvatar();
        return channel;
    }
    
    private WKChannel convertGroupToChannel(GroupInfo groupInfo) {
        WKChannel channel = new WKChannel();
        channel.channelID = groupInfo.getGroupId();
        channel.channelType = WKChannelType.GROUP;
        channel.channelName = groupInfo.getGroupName();
        channel.avatar = groupInfo.getAvatar();
        return channel;
    }
    
    interface ChannelCallback {
        void onSuccess(WKChannel channel);
        void onError(String error);
    }
}

频道成员数据源

// Listen for channel-member requests from the SDK
WKIM.getInstance().getChannelMembersManager().addOnGetChannelMembersListener((channelID, channelType, keyword, page, limit, iChannelMemberListResult) -> {
    // After fetching the channel members, return them to the SDK through iChannelMemberListResult
    fetchChannelMembers(channelID, channelType, keyword, page, limit, iChannelMemberListResult);
});

频道消息数据源

WKIM.getInstance().getMsgManager().addOnSyncChannelMsgListener(new ISyncChannelMsgListener() {
    /**
     * Synchronizes the messages of one channel.
     *
     * @param channelID   channel ID
     * @param channelType channel type
     * @param startSeq    start message sequence (the result includes the message at start_message_seq)
     * @param endSeq      end message sequence (the result excludes the message at end_message_seq)
     * @param limit       maximum number of messages
     * @param pullMode    pull mode: 0 = pull down, 1 = pull up
     * @param resultBack  callback that receives the response
     */
    @Override
    public void syncChannelMsgs(String channelID, byte channelType, long startSeq, long endSeq, int limit, int pullMode, ISyncChannelMsgBack resultBack) {
        syncMessagesFromServer(channelID, channelType, startSeq, endSeq, limit, pullMode, resultBack);
    }
});

消息同步示例

/**
 * Example message data source: answers the SDK's channel-message sync requests
 * with data fetched from the application server.
 */
public class MessageDataSource {

    private static final String TAG = "MessageDataSource";

    /** Registers the channel-message sync listener with the SDK. */
    public void initialize() {
        WKIM.getInstance().getMsgManager().addOnSyncChannelMsgListener(this::syncChannelMessages);
    }

    /**
     * Requests the channel's messages from the server and returns the converted
     * result to the SDK. On any failure {@code null} is returned to the callback.
     */
    private void syncChannelMessages(String channelID, byte channelType, long startMessageSeq,
                                   long endMessageSeq, int limit, int pullMode, ISyncChannelMsgBack callback) {

        ApiService.syncChannelMessages(channelID, channelType, startMessageSeq, endMessageSeq, limit, pullMode)
                .enqueue(new Callback<MessageSyncResponse>() {
                    @Override
                    public void onResponse(Call<MessageSyncResponse> call, Response<MessageSyncResponse> response) {
                        if (!response.isSuccessful() || response.body() == null) {
                            callback.onResult(null);
                            return;
                        }
                        MessageSyncResponse syncResponse = response.body();

                        // Convert the message format.
                        List<WKSyncRecvMsgModel> messages = convertToWKSyncRecvMsgModel(syncResponse.getMessages());

                        // Return to the SDK.
                        callback.onResult(messages);
                    }

                    @Override
                    public void onFailure(Call<MessageSyncResponse> call, Throwable t) {
                        Log.e(TAG, "同步消息失败", t);
                        callback.onResult(null);
                    }
                });
    }

    /**
     * Converts server message records into SDK sync models.
     *
     * @param serverMessages records from the server; may be {@code null} when the
     *                       response carries no message list
     * @return the converted list, empty when {@code serverMessages} is {@code null}
     */
    private List<WKSyncRecvMsgModel> convertToWKSyncRecvMsgModel(List<MessageData> serverMessages) {
        List<WKSyncRecvMsgModel> result = new ArrayList<>();
        if (serverMessages == null) {
            // A response without a message list would otherwise crash the loop below.
            return result;
        }

        for (MessageData data : serverMessages) {
            WKSyncRecvMsgModel syncMsg = new WKSyncRecvMsgModel();
            syncMsg.channelID = data.getChannelId();
            syncMsg.channelType = data.getChannelType();
            syncMsg.messageID = data.getMessageId();
            syncMsg.messageSeq = data.getMessageSeq();
            syncMsg.fromUID = data.getFromUid();
            syncMsg.timestamp = data.getTimestamp();
            syncMsg.payload = data.getPayload();

            result.add(syncMsg);
        }

        return result;
    }
}

下一步