package cn.huge.module.ai.service; import cn.huge.base.common.exception.ServiceException; import cn.huge.base.common.utils.GuavaCacheUtils; import cn.huge.base.common.utils.ObjectUtils; import cn.huge.base.common.utils.ReturnSucUtils; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpEntity; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.MediaType; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; import java.io.IOException; import java.util.HashMap; import java.util.Map; @Slf4j @Service @Transactional(rollbackFor = Exception.class) public class AiDifyService { @Value("${dify.url}") private String difyUrl; @Value("${dify.getMediateStrategyApi}") private String getMediateStrategyApi; @Value("${dify.getMediateCaseDialogApi}") private String getChatApi; @Value("${dify.getPropertyDialogApi}") private String getPropertyDialogApi; @Value("${dify.user}") private String user; /** * 发送 http post 请求,参数以原生字符串进行提交,因为ai请求时间过长,在此另外实现一个请求方法 * @param url 请求地址 * @param stringJson json字符串数据 * @param headers 请求头 * @param encode 编码 * @return * @throws Exception */ public static String httpPostRaw(String url, String stringJson, Map headers, String encode) throws Exception{ if(encode == null){ encode = "utf-8"; } CloseableHttpClient closeableHttpClient = HttpClients.createDefault(); //设置请求和传输超时时间,这里将时间设置为五分钟 RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(300000).setConnectTimeout(300000).build(); HttpPost httpost = new HttpPost(url); httpost.setConfig(requestConfig); //设置header httpost.setHeader("Content-type", "application/json"); if (ObjectUtils.isNotEmpty(headers)) { for (Map.Entry entry : headers.entrySet()) { httpost.setHeader(entry.getKey(),entry.getValue()); } } //组织请求参数 StringEntity stringEntity = new StringEntity(stringJson, encode); httpost.setEntity(stringEntity); String content = null; CloseableHttpResponse httpResponse = null; try { //响应信息 httpResponse = closeableHttpClient.execute(httpost); HttpEntity entity = httpResponse.getEntity(); content = EntityUtils.toString(entity, encode); } catch (Exception e) { e.printStackTrace(); throw new Exception("网络连接异常,请稍候重试!", e); }finally{ try { if (httpResponse != null) { httpResponse.close(); } } catch (IOException e) { e.printStackTrace(); } } try { //关闭连接、释放资源 closeableHttpClient.close(); } catch (IOException e) { e.printStackTrace(); throw new Exception("网络连接异常,请稍候重试!", e); } return content; } /** * 获取案件调解策略 * 处理流程: * 1. 优先从缓存获取调解策略 * 2. 如无缓存记录,则请求AI服务生成调解策略 * 3. 将生成的调解策略保存到缓存 * * @param caseText 案件描述文本 * @return ReturnSucUtils封装的调解策略结果 */ public Object getMediateStrategy(String caseText) { String cacheKey = "getMediateStrategy" + caseText.hashCode(); try { // 先从缓存获取 Object cacheAi = GuavaCacheUtils.getCacheAi(cacheKey); if (ObjectUtils.isNotEmpty(cacheAi)) { return ReturnSucUtils.getRepInfo("请求成功", cacheAi); } // 参数校验 if (!validateRequest(caseText)) { return ReturnSucUtils.getRepInfo("参数错误", null); } // 请求AI服务并处理结果 JSONObject responseObj = requestAiMediateStrategy(caseText); if (responseObj != null) { try { // 从响应中正确提取调解策略 String mediateStrategy = responseObj.getJSONObject("data") .getJSONObject("outputs") .getString("text"); if (mediateStrategy != null) { // 清理调解策略中的指定字符 if (mediateStrategy.contains("输出格式") || mediateStrategy.contains("```json{") || mediateStrategy.contains("}```") || mediateStrategy.contains("\"content\":\"")) { mediateStrategy = mediateStrategy.replace("输出格式", "").replace("```json{", "") .replace("}```", "") .replace("\"content\":\"", ""); } // 将调解策略保存到缓存 GuavaCacheUtils.putCacheAi(cacheKey, mediateStrategy); return ReturnSucUtils.getRepInfo("请求成功", mediateStrategy); } } catch (Exception e) { log.error("解析AI调解策略响应失败: {}", e.getMessage(), e); log.error("返回的JSON格式: {}", responseObj); } } return ReturnSucUtils.getRepInfo("调解策略生成失败", null); } catch (Exception e) { log.error("获取案件调解策略失败: {}", e.getMessage(), e); return ReturnSucUtils.getRepInfo("调解策略生成异常", null); } } /** * 验证请求参数 */ private boolean validateRequest(String caseText) { if (caseText == null || caseText.trim().isEmpty()) { log.warn("输入参数为空,caseText: {}", caseText); return false; } return true; } /** * 请求AI调解策略服务 * @param caseText 案件描述文本 * @return AI服务响应结果 */ private JSONObject requestAiMediateStrategy(String caseText) { try { // 构建请求参数 Map requestBody = new HashMap<>(); Map inputs = new HashMap<>(); inputs.put("caseText", caseText); requestBody.put("inputs", inputs); requestBody.put("response_mode", "blocking"); requestBody.put("user", user); // 设置请求头 Map headers = new HashMap<>(); headers.put("Authorization", "Bearer " + getMediateStrategyApi); // 发送请求到Dify服务 String response = httpPostRaw(difyUrl + "/workflows/run", JSONObject.toJSONString(requestBody), headers, "utf-8"); return JSONObject.parseObject(response); } catch (Exception e) { log.error("请求AI调解策略服务失败: {}", e.getMessage(), e); return null; } } /** * 提供流式对话功能 * 该方法通过WebClient向Dify服务发送流式对话请求,并以流式方式返回AI的回答 * * @param caseText 案件详情文本 * @param query 用户的提问 * @param conversationId 对话ID,第一次对话时为空,多轮对话时需要传入 * @param responseMode 响应模式,可选值为"streaming"或"blocking" * @return StreamingResponseBody 流式响应对象 */ public StreamingResponseBody streamChat(String caseText, String query, String conversationId, String responseMode) { try { // 验证responseMode参数 if (responseMode == null || responseMode.isEmpty()) { responseMode = "streaming"; // 默认使用streaming模式 } else if (!responseMode.equals("streaming") && !responseMode.equals("blocking")) { log.warn("不支持的响应模式: {}, 将使用默认的streaming模式", responseMode); responseMode = "streaming"; } // 构建请求体 Map requestBody = new HashMap<>(); Map inputs = new HashMap<>(); inputs.put("case_text", caseText); requestBody.put("inputs", inputs); requestBody.put("query", query); requestBody.put("response_mode", responseMode); requestBody.put("user", user); // 如果存在对话ID,则添加到请求中 if (conversationId != null && !conversationId.isEmpty()) { requestBody.put("conversation_id", conversationId); } // 创建WebClient,用于发送HTTP请求 WebClient client = WebClient.builder() .baseUrl(difyUrl + "/chat-messages") .defaultHeader("Content-Type", "application/json") .defaultHeader("Authorization", "Bearer " + getChatApi) .build(); // 返回一个StreamingResponseBody对象,用于处理流式响应 return outputStream -> client.post() .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromValue(requestBody)) .retrieve() .bodyToFlux(DataBuffer.class) .doOnNext(dataBuffer -> { // 将响应数据缓冲区转换为字节数组并写入输出流 byte[] bytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(bytes); try { outputStream.write(bytes); // 刷新输出流以确保数据立即发送 outputStream.flush(); } catch (IOException e) { log.error("流式写入数据失败: {}", e.getMessage(), e); } }) .doOnError(e -> { log.error("流式对话请求失败: {}", e.getMessage(), e); }) .blockLast(); } catch (Exception e) { log.error("AI流式对话服务异常: {}", e.getMessage(), e); throw new ServiceException("AiDifyService.streamChat", e); } } /** * 提供阻塞式对话功能,直接返回完整回答 * 该方法通过HTTP请求向Dify服务发送阻塞式对话请求,并返回完整的AI回答 * * @param caseText 案件详情文本 * @param query 用户的提问 * @param conversationId 对话ID,第一次对话时为空,多轮对话时需要传入 * @return 包含AI回答和对话ID的Map对象 */ public Map blockingChat(String caseText, String query, String conversationId) { try { // 构建请求体 Map requestBody = new HashMap<>(); Map inputs = new HashMap<>(); inputs.put("case_text", caseText); requestBody.put("inputs", inputs); requestBody.put("query", query); requestBody.put("response_mode", "blocking"); requestBody.put("user", user); // 如果存在对话ID,则添加到请求中 if (conversationId != null && !conversationId.isEmpty()) { requestBody.put("conversation_id", conversationId); } // 设置请求头 Map headers = new HashMap<>(); headers.put("Authorization", "Bearer " + getChatApi); // 发送请求到Dify服务 String response = httpPostRaw(difyUrl + "/chat-messages", JSONObject.toJSONString(requestBody), headers, "utf-8"); // 解析响应 JSONObject responseObj = JSONObject.parseObject(response); // 提取关键信息 Map result = new HashMap<>(); // 提取对话ID if (responseObj.containsKey("conversation_id")) { result.put("conversationId", responseObj.getString("conversation_id")); } // 提取回答内容 if (responseObj.containsKey("answer")) { result.put("answer", responseObj.getString("answer")); } else if (responseObj.containsKey("data") && responseObj.getJSONObject("data").containsKey("answer")) { result.put("answer", responseObj.getJSONObject("data").getString("answer")); } return result; } catch (Exception e) { log.error("AI阻塞式对话服务异常: {}", e.getMessage(), e); throw new ServiceException("AiDifyService.blockingChat", e); } } /** * 提供阻塞式财产对话功能,直接返回完整回答 * 该方法通过HTTP请求向Dify服务发送阻塞式对话请求,并返回完整的AI回答 * * @param caseText 案件详情文本 * @param query 用户的提问 * @param conversationId 对话ID,第一次对话时为空,多轮对话时需要传入 * @param checkRationality 可选参数,开启AI服务内部的请求合理性检查,默认不传 * @return 包含AI回答和对话ID的Map对象 */ public Map blockingPropertyChat(String query, String conversationId, String checkRationality) { try { // 构建请求体 Map requestBody = new HashMap<>(); Map inputs = new HashMap<>(); // 如果传入checkRationality且不为空,则添加到请求体 if (checkRationality != null && !checkRationality.isEmpty()) { inputs.put("checkRationality", checkRationality); } requestBody.put("inputs", inputs); requestBody.put("query", query); requestBody.put("response_mode", "blocking"); requestBody.put("user", user); // 如果存在对话ID,则添加到请求中 if (conversationId != null && !conversationId.isEmpty()) { requestBody.put("conversation_id", conversationId); } // 设置请求头 Map headers = new HashMap<>(); headers.put("Authorization", "Bearer " + getPropertyDialogApi); // 发送请求到Dify服务 String response = httpPostRaw(difyUrl + "/chat-messages", JSONObject.toJSONString(requestBody), headers, "utf-8"); // 解析响应 JSONObject responseObj = JSONObject.parseObject(response); // 提取关键信息 Map result = new HashMap<>(); // 提取对话ID if (responseObj.containsKey("conversation_id")) { result.put("conversationId", responseObj.getString("conversation_id")); } // 提取回答内容 if (responseObj.containsKey("answer")) { result.put("answer", responseObj.getString("answer")); } else if (responseObj.containsKey("data") && responseObj.getJSONObject("data").containsKey("answer")) { result.put("answer", responseObj.getJSONObject("data").getString("answer")); } return result; } catch (Exception e) { log.error("AI阻塞式财产对话服务异常: {}", e.getMessage(), e); throw new ServiceException("AiDifyService.blockingPropertyChat", e); } } }