跳转到主要内容
数据源管理是 WuKongIM HarmonyOS SDK 的核心功能之一,负责处理文件上传下载、会话同步、频道信息获取和消息同步等关键业务逻辑。

文件管理

发送自定义附件消息时,发送给对方的是文件的网络地址,而不是实际的文件。因此我们需要监听附件的上传。

文件上传

// 定义上传文件数据源
let uploadAttachmentCallback = async (msg: WKMsg): Promise<[boolean, WKMsg]> => {
  if (msg.contentType === WKMsgContentType.Image) {
    // 上传图片
    let imageContent = msg.messageContent as WKImageContent;
    imageContent.url = 'xxxx';
    msg.messageContent = imageContent;
    return [true, msg];
  } else if (msg.contentType === WKMsgContentType.Voice) {
    // 上传语音
    let voiceContent = msg.messageContent as WKVoiceContent;
    voiceContent.url = 'xxxx';
    msg.messageContent = voiceContent;
    return [true, msg];
  } else if (msg.contentType === WKMsgContentType.Video) {
    // 上传视频
    let videoContent = msg.messageContent as WKVideoContent;
    videoContent.url = 'xxxx';
    msg.messageContent = videoContent;
    return [true, msg];
  } else if (msg.contentType === 17) {
    // 上传自定义附件消息
    let customerMsg = msg.messageContent as LocationMessageContent;
    customerMsg.url = 'https://img1.baidu.com/it/u=3362698628,1928330748&fm=253&fmt=auto&app=138&f=JPEG?w=390&h=308';
    msg.messageContent = customerMsg;
    return [true, msg];
  }
  return [true, msg];
};

// Register the callback defined above as the SDK's attachment upload data source.
WKIM.shared.config.provider.uploadAttachmentCallback = uploadAttachmentCallback;

完整文件上传管理示例

import { WKIM, WKMsg, WKMsgContentType, WKImageContent, WKVoiceContent, WKVideoContent, WKFileContent } from '@wukong/wkim';
import { http } from '@kit.NetworkKit';
import { fileIo } from '@kit.CoreFileKit';

/**
 * Sample upload manager that plugs into the SDK's attachment upload callback.
 *
 * Each handler uploads the attachment found at `localPath`, writes the
 * resulting remote URL and size back into the message content, and reports
 * success or failure as the first element of the returned tuple.
 */
class FileUploadManager {

  /**
   * Registers the upload callback on `WKIM.shared.config.provider`.
   *
   * The callback dispatches on `msg.contentType`; content types without a
   * handler are passed through as `[true, msg]`. Any error that escapes a
   * handler is logged and reported as `[false, msg]`.
   */
  static setupUploadProvider(): void {
    WKIM.shared.config.provider.uploadAttachmentCallback = async (msg: WKMsg): Promise<[boolean, WKMsg]> => {
      try {
        switch (msg.contentType) {
          case WKMsgContentType.Image:
            return await FileUploadManager.uploadImage(msg);
          case WKMsgContentType.Voice:
            return await FileUploadManager.uploadVoice(msg);
          case WKMsgContentType.Video:
            return await FileUploadManager.uploadVideo(msg);
          case WKMsgContentType.File:
            return await FileUploadManager.uploadFile(msg);
          default:
            return [true, msg];
        }
      } catch (error) {
        console.error('文件上传失败:', error);
        return [false, msg];
      }
    };
  }

  /**
   * Uploads an image attachment (after the compression hook) and stores the
   * remote URL and size on the message content.
   *
   * @returns `[true, msg]` on success or when there is no `localPath`;
   *          `[false, msg]` when the upload throws.
   */
  private static async uploadImage(msg: WKMsg): Promise<[boolean, WKMsg]> {
    const imageContent = msg.messageContent as WKImageContent;

    if (!imageContent.localPath) {
      return [true, msg];
    }

    try {
      const compressedPath = await this.compressImage(imageContent.localPath);
      const uploadResult = await this.uploadToServer(compressedPath, 'image');

      imageContent.url = uploadResult.url;
      imageContent.size = uploadResult.size;
      msg.messageContent = imageContent;

      console.log('图片上传成功:', imageContent.url);
      return [true, msg];
    } catch (error) {
      console.error('图片上传失败:', error);
      return [false, msg];
    }
  }

  /**
   * Uploads a voice attachment and stores the remote URL and size on the
   * message content.
   *
   * @returns `[true, msg]` on success or when there is no `localPath`;
   *          `[false, msg]` when the upload throws.
   */
  private static async uploadVoice(msg: WKMsg): Promise<[boolean, WKMsg]> {
    const voiceContent = msg.messageContent as WKVoiceContent;

    if (!voiceContent.localPath) {
      return [true, msg];
    }

    try {
      const uploadResult = await this.uploadToServer(voiceContent.localPath, 'voice');

      voiceContent.url = uploadResult.url;
      voiceContent.size = uploadResult.size;
      msg.messageContent = voiceContent;

      console.log('语音上传成功:', voiceContent.url);
      return [true, msg];
    } catch (error) {
      console.error('语音上传失败:', error);
      return [false, msg];
    }
  }

  /**
   * Uploads a video attachment, plus its cover image when one was generated,
   * and stores the remote URLs and size on the message content.
   *
   * @returns `[true, msg]` on success or when there is no `localPath`;
   *          `[false, msg]` when an upload throws.
   */
  private static async uploadVideo(msg: WKMsg): Promise<[boolean, WKMsg]> {
    const videoContent = msg.messageContent as WKVideoContent;

    if (!videoContent.localPath) {
      return [true, msg];
    }

    try {
      const coverPath = await this.generateVideoCover(videoContent.localPath);

      // generateVideoCover() may return an empty path (the sample stub always
      // does). Uploading '' would fail and take the whole video upload down
      // with it, so the cover is optional.
      if (coverPath) {
        const coverResult = await this.uploadToServer(coverPath, 'image');
        videoContent.cover = coverResult.url;
      }

      const videoResult = await this.uploadToServer(videoContent.localPath, 'video');

      videoContent.url = videoResult.url;
      videoContent.size = videoResult.size;
      msg.messageContent = videoContent;

      console.log('视频上传成功:', videoContent.url);
      return [true, msg];
    } catch (error) {
      console.error('视频上传失败:', error);
      return [false, msg];
    }
  }

  /**
   * Uploads a generic file attachment and stores the remote URL and size on
   * the message content.
   *
   * @returns `[true, msg]` on success or when there is no `localPath`;
   *          `[false, msg]` when the upload throws.
   */
  private static async uploadFile(msg: WKMsg): Promise<[boolean, WKMsg]> {
    const fileContent = msg.messageContent as WKFileContent;

    if (!fileContent.localPath) {
      return [true, msg];
    }

    try {
      const uploadResult = await this.uploadToServer(fileContent.localPath, 'file');

      fileContent.url = uploadResult.url;
      fileContent.size = uploadResult.size;
      msg.messageContent = fileContent;

      console.log('文件上传成功:', fileContent.url);
      return [true, msg];
    } catch (error) {
      console.error('文件上传失败:', error);
      return [false, msg];
    }
  }

  /**
   * Image compression hook. The sample implementation performs no
   * compression and returns the input path unchanged.
   */
  private static async compressImage(imagePath: string): Promise<string> {
    return imagePath;
  }

  /**
   * Video cover generation hook. The sample implementation generates nothing
   * and returns an empty string, which callers treat as "no cover".
   */
  private static async generateVideoCover(videoPath: string): Promise<string> {
    return '';
  }

  /**
   * Sends the file at `filePath` to the upload endpoint as multipart form data.
   *
   * The file is handed to the HTTP client by path through `multiFormDataList`,
   * so no file descriptor is opened here, and the request object is destroyed
   * once the call settles.
   *
   * @param filePath Local path of the file to upload.
   * @param type     Value sent in the `type` form field.
   * @returns The `url` reported by the server and the local file size in bytes.
   * @throws Error when stat, the request, a non-200 status, or response
   *         parsing fails (all wrapped with the `网络请求失败` prefix).
   */
  private static async uploadToServer(filePath: string, type: string): Promise<{url: string, size: number}> {
    const httpRequest = http.createHttp();
    try {
      const fileSize = fileIo.statSync(filePath).size;

      const response = await httpRequest.request('https://your-upload-server.com/upload', {
        method: http.RequestMethod.POST,
        header: {
          'Content-Type': 'multipart/form-data',
          'Authorization': `Bearer ${this.getAuthToken()}`,
        },
        multiFormDataList: [
          { name: 'file', contentType: 'application/octet-stream', filePath: filePath },
          { name: 'type', contentType: 'text/plain', data: type },
        ],
      });

      if (response.responseCode !== 200) {
        throw new Error(`上传失败: ${response.responseCode}`);
      }

      // Validate the payload instead of trusting the parsed JSON shape.
      const data: unknown = JSON.parse(response.result as string);
      const url = (data as { url?: unknown } | null)?.url;
      if (typeof url !== 'string' || url.length === 0) {
        throw new Error('上传失败: 响应缺少 url');
      }

      return { url, size: fileSize };
    } catch (error) {
      throw new Error(`网络请求失败: ${error}`);
    } finally {
      // Release the native resources held by the request object.
      httpRequest.destroy();
    }
  }

  /** Returns the bearer token sent with uploads (placeholder value in this sample). */
  private static getAuthToken(): string {
    return 'your-auth-token';
  }
}

文件下载

SDK 不会主动下载消息的附件。收到带有附件的消息时,需要应用自己按需下载。下载完成后,可以更新消息内容中的文件本地地址,避免重复下载。
WKIM.shared.messageManager().updateContent(clientMsgNo: string, messageContent: WKMessageContent);

完整文件下载管理示例

/**
 * Sample download manager for message attachments.
 *
 * Downloads are stored under `<cacheDir>/downloads` and remembered in an
 * in-memory URL -> local path map so the same URL is not fetched twice while
 * the file still exists.
 */
class FileDownloadManager {
  private static downloadCache: Map<string, string> = new Map();

  /**
   * Downloads `url` into the download directory.
   *
   * @param url        Remote address of the attachment.
   * @param fileName   Desired file name; only its final path segment is used.
   * @param onProgress Accepted for interface compatibility; this
   *                   implementation never invokes it.
   * @returns The local path on success (or on a valid cache hit), `null` on
   *          any failure. Failures are logged, not thrown.
   */
  static async downloadFile(url: string, fileName: string, onProgress?: (progress: number) => void): Promise<string | null> {
    const cachedPath = this.downloadCache.get(url);
    if (cachedPath !== undefined && await this.fileExists(cachedPath)) {
      return cachedPath;
    }

    try {
      const downloadDir = await this.getDownloadDirectory();
      const filePath = `${downloadDir}/${this.sanitizeFileName(fileName)}`;

      const response = await this.fetchBinary(url);
      if (response.responseCode !== 200) {
        throw new Error(`下载失败: ${response.responseCode}`);
      }

      // TRUNC so that overwriting a longer, older file leaves no stale tail.
      const file = fileIo.openSync(
        filePath,
        fileIo.OpenMode.WRITE_ONLY | fileIo.OpenMode.CREATE | fileIo.OpenMode.TRUNC
      );
      try {
        fileIo.writeSync(file.fd, response.result as ArrayBuffer);
      } finally {
        // Close even when the write throws.
        fileIo.closeSync(file);
      }

      this.downloadCache.set(url, filePath);

      console.log('文件下载成功:', filePath);
      return filePath;
    } catch (error) {
      console.error('文件下载失败:', error);
      return null;
    }
  }

  /**
   * Downloads the attachment of an image, voice, video or file message when
   * it has a `url` but no `localPath` yet, then stores the local path on the
   * content and persists it through `messageManager().updateContent`.
   * Messages of other content types, or without content, are left untouched.
   */
  static async downloadAndUpdateMessage(message: WKMsg): Promise<void> {
    const content = message.messageContent;
    if (!content) return;

    let fileName: string;
    switch (message.contentType) {
      case WKMsgContentType.Image:
        fileName = `image_${message.clientMsgNo}.jpg`;
        break;
      case WKMsgContentType.Voice:
        fileName = `voice_${message.clientMsgNo}.mp3`;
        break;
      case WKMsgContentType.Video:
        fileName = `video_${message.clientMsgNo}.mp4`;
        break;
      case WKMsgContentType.File:
        fileName = (content as WKFileContent).name || `file_${message.clientMsgNo}`;
        break;
      default:
        return;
    }

    const attachment = content as WKImageContent | WKVoiceContent | WKVideoContent | WKFileContent;
    if (!attachment.url || attachment.localPath) return;

    const localPath = await this.downloadFile(attachment.url, fileName);
    if (localPath) {
      attachment.localPath = localPath;
      WKIM.shared.messageManager().updateContent(message.clientMsgNo, content);
    }
  }

  /**
   * Performs the GET request, asking for the body as an ArrayBuffer, and
   * destroys the request object once the call settles.
   */
  private static async fetchBinary(url: string) {
    const httpRequest = http.createHttp();
    try {
      return await httpRequest.request(url, {
        method: http.RequestMethod.GET,
        expectDataType: http.HttpDataType.ARRAY_BUFFER,
      });
    } finally {
      httpRequest.destroy();
    }
  }

  /**
   * Reduces a file name that may come from a remote message to its final path
   * segment so it cannot escape the download directory (e.g. `../../x`).
   * Falls back to a generated name when nothing usable remains.
   */
  private static sanitizeFileName(fileName: string): string {
    const baseName = fileName.split(/[\\/]/).pop() ?? '';
    if (baseName === '' || baseName === '.' || baseName === '..') {
      return `download_${Date.now()}`;
    }
    return baseName;
  }

  /**
   * Reports whether `filePath` exists. `accessSync` answers with a boolean,
   * so its return value is the result; a thrown error is treated as "not
   * there".
   */
  private static async fileExists(filePath: string): Promise<boolean> {
    try {
      return fileIo.accessSync(filePath);
    } catch {
      return false;
    }
  }

  /**
   * Returns `<cacheDir>/downloads`, creating it when it does not exist yet.
   * Errors from creating the directory propagate to the caller.
   */
  private static async getDownloadDirectory(): Promise<string> {
    const cacheDir = globalThis.getContext().cacheDir;
    const downloadDir = `${cacheDir}/downloads`;

    if (!fileIo.accessSync(downloadDir)) {
      fileIo.mkdirSync(downloadDir);
    }

    return downloadDir;
  }

  /** Forgets all URL -> path associations; files on disk are not removed. */
  static clearDownloadCache(): void {
    this.downloadCache.clear();
  }

  /**
   * Sums the sizes of all files currently referenced by the cache map.
   * Entries whose file can no longer be stat'ed are skipped.
   */
  static async getCacheSize(): Promise<number> {
    let totalSize = 0;

    for (const filePath of this.downloadCache.values()) {
      try {
        totalSize += fileIo.statSync(filePath).size;
      } catch {
        // File is gone; it contributes nothing.
      }
    }

    return totalSize;
  }
}

最近会话数据源

同步最近会话数据源

// Define the provider.
// Template for the recent-conversation sync data source: it receives
// `lastMsgSeqs`, `msgCount` and `version` and is declared to resolve to a
// WKSyncConversation.
// NOTE(review): the body below has no return statement, so as written the
// promise resolves to undefined; fill it in before use.
let syncConversationCallback = async (lastMsgSeqs: string, msgCount: number, version: number): Promise<WKSyncConversation> => {
  // TODO: call the backend API here and return its result to the SDK
};

// Register the recent-conversation sync provider with the SDK
WKIM.shared.config.provider.syncConversationCallback = syncConversationCallback;

完整会话同步实现

/**
 * Sample implementation of the recent-conversation sync data source backed by
 * an HTTP endpoint.
 */
class ConversationDataSource {

  /**
   * Registers the conversation sync callback on `WKIM.shared.config.provider`.
   *
   * The callback resolves to `{ conversations, version }`, where `version` is
   * the value it was called with. On any failure the error is logged and an
   * empty conversation list is returned.
   */
  static setupConversationSync(): void {
    WKIM.shared.config.provider.syncConversationCallback = async (
      lastMsgSeqs: string,
      msgCount: number,
      version: number
    ): Promise<WKSyncConversation> => {
      try {
        const conversations = await ConversationDataSource.syncConversationsFromServer(lastMsgSeqs, msgCount, version);

        console.log('会话同步成功,获取到', conversations.length, '个会话');

        return {
          conversations: conversations,
          version: version
        };
      } catch (error) {
        console.error('会话同步失败:', error);
        return {
          conversations: [],
          version: version
        };
      }
    };
  }

  /**
   * POSTs the sync parameters as JSON and parses the `conversations` field of
   * the response.
   *
   * @throws Error when the request fails, the status is not 200, or the body
   *         is not valid JSON (all wrapped with the `网络请求失败` prefix).
   */
  private static async syncConversationsFromServer(
    lastMsgSeqs: string,
    msgCount: number,
    version: number
  ): Promise<WKConversation[]> {
    const httpRequest = http.createHttp();
    try {
      const response = await httpRequest.request('https://your-api.com/conversations/sync', {
        method: http.RequestMethod.POST,
        header: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${this.getAuthToken()}`,
        },
        extraData: JSON.stringify({
          last_msg_seqs: lastMsgSeqs,
          msg_count: msgCount,
          version: version,
        })
      });

      if (response.responseCode !== 200) {
        throw new Error(`同步会话失败: ${response.responseCode}`);
      }

      const data = JSON.parse(response.result as string) as { conversations?: unknown } | null;
      return this.parseConversations(data?.conversations);
    } catch (error) {
      throw new Error(`网络请求失败: ${error}`);
    } finally {
      // Release the native resources held by the request object.
      httpRequest.destroy();
    }
  }

  /**
   * Maps raw server items (snake_case fields) onto `WKConversation` objects.
   * A missing or non-array payload yields an empty list instead of throwing.
   */
  private static parseConversations(data: unknown): WKConversation[] {
    if (!Array.isArray(data)) {
      return [];
    }

    return data.map(item => {
      const conversation = new WKConversation();
      conversation.channelId = item.channel_id;
      conversation.channelType = item.channel_type;
      conversation.lastClientMsgNo = item.last_client_msg_no;
      conversation.version = item.version;
      conversation.lastMsgTimestamp = item.last_msg_timestamp;
      conversation.unreadCount = item.unread_count;
      conversation.lastMsgSeq = item.last_msg_seq;
      return conversation;
    });
  }

  /** Returns the bearer token sent with requests (placeholder value in this sample). */
  private static getAuthToken(): string {
    return 'your-auth-token';
  }
}

频道资料数据源

// Register the channel-info provider. The values produced here are test data;
// a real application would fetch them from its backend.
WKIM.shared.config.provider.channelInfoCallback =
  async (channelId: string, channelType: number): Promise<WKChannel> => {
    WKLogger.error('获取channel资料', channelId, channelType + "");

    const info = new WKChannel(channelId, channelType);
    switch (info.channelType) {
      case WKChannelType.personal:
        // One-to-one chat: synthesize a name and derive a remark from it.
        info.channelName = `单聊${channelId}`;
        info.channelRemark = `备注${info.channelName}`;
        break;
      case WKChannelType.group:
        // Group chat: only a name is synthesized.
        info.channelName = `群${channelId}`;
        break;
      default:
        break;
    }

    info.avatar = `https://api.multiavatar.com/${info.channelId}.png`;
    return info;
  };

完整频道资料数据源实现

/**
 * Sample implementation of the channel-info data source backed by an HTTP
 * endpoint, with a synthesized fallback channel.
 */
class ChannelDataSource {

  /**
   * Registers the channel-info callback on `WKIM.shared.config.provider`.
   *
   * The callback returns the channel fetched from the server, or a default
   * channel when the server answers 404 or the lookup fails (failures are
   * logged).
   */
  static setupChannelDataSource(): void {
    WKIM.shared.config.provider.channelInfoCallback = async (
      channelId: string,
      channelType: number
    ): Promise<WKChannel> => {
      try {
        const channel = await ChannelDataSource.getChannelFromServer(channelId, channelType);

        if (channel) {
          console.log('获取频道信息成功:', channelId);
          return channel;
        }
        return ChannelDataSource.createDefaultChannel(channelId, channelType);
      } catch (error) {
        console.error('获取频道信息失败:', error);
        return ChannelDataSource.createDefaultChannel(channelId, channelType);
      }
    };
  }

  /**
   * Fetches the channel from `/api/users/{id}` (personal channels) or
   * `/api/groups/{id}` (everything else).
   *
   * @returns The parsed channel, or `null` when the server answers 404.
   * @throws Error for request failures, other status codes, or invalid JSON
   *         (all wrapped with the `网络请求失败` prefix).
   */
  private static async getChannelFromServer(channelId: string, channelType: number): Promise<WKChannel | null> {
    const httpRequest = http.createHttp();
    try {
      // Encode the id so characters such as '/', '?' or '#' cannot change the
      // request path.
      const encodedId = encodeURIComponent(channelId);
      const endpoint = channelType === WKChannelType.personal
        ? `/api/users/${encodedId}`
        : `/api/groups/${encodedId}`;

      const response = await httpRequest.request(`https://your-api.com${endpoint}`, {
        method: http.RequestMethod.GET,
        header: {
          'Authorization': `Bearer ${this.getAuthToken()}`,
        }
      });

      if (response.responseCode === 200) {
        const data = JSON.parse(response.result as string);
        return this.parseChannel(data, channelId, channelType);
      } else if (response.responseCode === 404) {
        return null;
      } else {
        throw new Error(`获取频道信息失败: ${response.responseCode}`);
      }
    } catch (error) {
      throw new Error(`网络请求失败: ${error}`);
    } finally {
      // Release the native resources held by the request object.
      httpRequest.destroy();
    }
  }

  /**
   * Builds a `WKChannel` from the server payload, applying defaults for
   * missing fields.
   */
  private static parseChannel(data: any, channelId: string, channelType: number): WKChannel {
    const channel = new WKChannel(channelId, channelType);
    channel.channelName = data.name || data.nickname || '';
    channel.channelRemark = data.remark || '';
    channel.avatar = data.avatar || '';
    channel.top = data.top || 0;
    channel.mute = data.mute || 0;
    channel.follow = data.follow || 0;
    channel.online = data.online || 0;
    channel.lastOffline = data.last_offline || 0;
    // `??` rather than `||`: the default of 1 applies only when the server
    // omits the field, so an explicit status of 0 is preserved.
    channel.status = data.status ?? 1;
    channel.version = data.version || 0;
    return channel;
  }

  /**
   * Synthesizes a channel from the id alone: a generated name (plus remark
   * for personal channels) and a generated avatar URL.
   */
  private static createDefaultChannel(channelId: string, channelType: number): WKChannel {
    const channel = new WKChannel(channelId, channelType);

    if (channelType === WKChannelType.personal) {
      channel.channelName = `用户${channelId}`;
      channel.channelRemark = `备注${channel.channelName}`;
    } else if (channelType === WKChannelType.group) {
      channel.channelName = `群${channelId}`;
    }

    channel.avatar = `https://api.multiavatar.com/${channelId}.png`;
    return channel;
  }

  /** Returns the bearer token sent with requests (placeholder value in this sample). */
  private static getAuthToken(): string {
    return 'your-auth-token';
  }
}

频道成员数据源

频道成员分页数据源

// Paged channel-member provider.
// Receives the channel and the paging options and resolves to the members of
// that page; this template always answers with an empty list.
WKIM.shared.config.provider.channelMemberWithPageCallback = async (
  channel: WKChannel,
  option: SyncChannelMemberOptions
): Promise<WKChannelMember[]> => {
  // TODO: call the backend API here and return its members to the SDK
  const members: WKChannelMember[] = [];
  return members;
};

频道消息数据源

// Define the provider.
// NOTE(review): the body below has no return statement, so as written the
// promise resolves to undefined; fill it in before use.
let syncMessageCallback = async (channel: WKChannel, options: SyncOptions): Promise<WKSyncChannelMsg> => {
  /*
   * Sync the messages of one channel.
   *
   * @param channel.channelId           Channel ID
   * @param channel.channelType         Channel type
   * @param options.startMessageSeq     Start message sequence number (the result includes the message with start_message_seq)
   * @param options.endMessageSeq       End message sequence number (the result excludes the message with end_message_seq)
   * @param options.limit               Maximum number of messages
   * @param options.pullMode            Pull mode, 0: pull downwards, 1: pull upwards
   */
  // TODO: call the backend API here and return its result to the SDK
};

// Register the channel message sync provider with the SDK
WKIM.shared.config.provider.syncMessageCallback = syncMessageCallback;

完整消息同步实现

/**
 * Sample implementation of the channel message sync data source backed by an
 * HTTP endpoint.
 */
class MessageDataSource {

  /**
   * Registers the message sync callback on `WKIM.shared.config.provider`.
   *
   * The callback resolves to `{ messages, more }`, where `more` is true when
   * at least `options.limit` messages came back. On any failure the error is
   * logged and an empty result with `more: false` is returned.
   */
  static setupMessageSync(): void {
    WKIM.shared.config.provider.syncMessageCallback = async (
      channel: WKChannel,
      options: SyncOptions
    ): Promise<WKSyncChannelMsg> => {
      try {
        const messages = await MessageDataSource.syncMessagesFromServer(channel, options);

        console.log('消息同步成功,获取到', messages.length, '条消息');

        return {
          messages: messages,
          more: messages.length >= options.limit
        };
      } catch (error) {
        console.error('消息同步失败:', error);
        return {
          messages: [],
          more: false
        };
      }
    };
  }

  /**
   * POSTs the channel and sync options as JSON and parses the `messages`
   * field of the response.
   *
   * @throws Error when the request fails, the status is not 200, or the body
   *         is not valid JSON (all wrapped with the `网络请求失败` prefix).
   */
  private static async syncMessagesFromServer(
    channel: WKChannel,
    options: SyncOptions
  ): Promise<WKMsg[]> {
    const httpRequest = http.createHttp();
    try {
      const response = await httpRequest.request('https://your-api.com/messages/sync', {
        method: http.RequestMethod.POST,
        header: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${this.getAuthToken()}`,
        },
        extraData: JSON.stringify({
          channel_id: channel.channelId,
          channel_type: channel.channelType,
          start_message_seq: options.startMessageSeq,
          end_message_seq: options.endMessageSeq,
          limit: options.limit,
          pull_mode: options.pullMode,
        })
      });

      if (response.responseCode !== 200) {
        throw new Error(`同步消息失败: ${response.responseCode}`);
      }

      const data = JSON.parse(response.result as string) as { messages?: unknown } | null;
      return this.parseMessages(data?.messages);
    } catch (error) {
      throw new Error(`网络请求失败: ${error}`);
    } finally {
      // Release the native resources held by the request object.
      httpRequest.destroy();
    }
  }

  /**
   * Maps raw server items (snake_case fields) onto `WKMsg` objects and
   * decodes each item's content. A missing or non-array payload yields an
   * empty list instead of throwing.
   */
  private static parseMessages(data: unknown): WKMsg[] {
    if (!Array.isArray(data)) {
      return [];
    }

    return data.map(item => {
      const message = new WKMsg();
      message.messageId = item.message_id;
      message.messageSeq = item.message_seq;
      message.clientMsgNo = item.client_msg_no;
      message.fromUID = item.from_uid;
      message.channelId = item.channel_id;
      message.channelType = item.channel_type;
      message.contentType = item.content_type;
      message.content = item.content;
      message.timestamp = item.timestamp;
      message.status = item.status || 1;

      message.messageContent = this.parseMessageContent(message.contentType, item.content);

      return message;
    });
  }

  /**
   * Decodes the JSON `content` string into a typed content object for text,
   * image, voice and video messages.
   *
   * @returns The content object, or `null` for other content types or when
   *          `content` is not valid JSON (the parse error is logged).
   */
  private static parseMessageContent(contentType: number, content: string): WKMessageContent | null {
    try {
      const contentData = JSON.parse(content);

      // Each case gets its own block so its `const` stays scoped to the case.
      switch (contentType) {
        case WKMsgContentType.Text: {
          return new WKTextContent(contentData.text || '');
        }

        case WKMsgContentType.Image: {
          const imageContent = new WKImageContent();
          imageContent.url = contentData.url || '';
          imageContent.width = contentData.width || 0;
          imageContent.height = contentData.height || 0;
          return imageContent;
        }

        case WKMsgContentType.Voice: {
          const voiceContent = new WKVoiceContent();
          voiceContent.url = contentData.url || '';
          voiceContent.duration = contentData.duration || 0;
          return voiceContent;
        }

        case WKMsgContentType.Video: {
          const videoContent = new WKVideoContent();
          videoContent.url = contentData.url || '';
          videoContent.cover = contentData.cover || '';
          videoContent.duration = contentData.duration || 0;
          videoContent.width = contentData.width || 0;
          videoContent.height = contentData.height || 0;
          return videoContent;
        }

        default:
          return null;
      }
    } catch (error) {
      console.error('解析消息内容失败:', error);
      return null;
    }
  }

  /** Returns the bearer token sent with requests (placeholder value in this sample). */
  private static getAuthToken(): string {
    return 'your-auth-token';
  }
}

完整数据源管理器

/**
 * Single entry point that wires every sample data source into the SDK.
 */
class WuKongIMDataSourceManager {

  /**
   * Installs the upload, conversation, channel and message data sources, in
   * that order, then logs completion.
   */
  static initialize(): void {
    const installers: Array<() => void> = [
      () => FileUploadManager.setupUploadProvider(),
      () => ConversationDataSource.setupConversationSync(),
      () => ChannelDataSource.setupChannelDataSource(),
      () => MessageDataSource.setupMessageSync(),
    ];
    installers.forEach((install) => install());

    console.log('WuKongIM 数据源管理器初始化完成');
  }

  /**
   * Placeholder for persisting the auth token (for example with the
   * HarmonyOS preferences API). The sample implementation does nothing.
   */
  static setAuthToken(token: string): void {
    // Intentionally empty in this sample.
  }

  /** Clears the download manager's cache map and logs completion. */
  static clearAllCache(): void {
    FileDownloadManager.clearDownloadCache();
    console.log('清理所有缓存完成');
  }
}

// Initialize at application start-up
/**
 * Entry ability of the sample application; installs the data sources as soon
 * as the ability is created.
 */
export default class EntryAbility extends UIAbility {
  onCreate(want: Want, launchParam: AbilityConstant.LaunchParam): void {
    // Initialize the data source manager
    WuKongIMDataSourceManager.initialize();
  }
}

下一步