From d80d72b9140e3204d613796e73650c166e3f269a Mon Sep 17 00:00:00 2001 From: "shining.wz" Date: Tue, 9 Sep 2025 13:44:30 +0800 Subject: [PATCH 1/2] add async sql --- aliyun/log/async_sql.proto | 28 +++++ aliyun/log/async_sql_pb2.py | 29 +++++ aliyun/log/async_sql_response.py | 165 +++++++++++++++++++++++++ aliyun/log/delete_async_sql_request.py | 37 ++++++ aliyun/log/get_async_sql_request.py | 75 +++++++++++ aliyun/log/logclient.py | 98 +++++++++++++++ aliyun/log/submit_async_sql_request.py | 153 +++++++++++++++++++++++ 7 files changed, 585 insertions(+) create mode 100644 aliyun/log/async_sql.proto create mode 100644 aliyun/log/async_sql_pb2.py create mode 100644 aliyun/log/async_sql_response.py create mode 100644 aliyun/log/delete_async_sql_request.py create mode 100644 aliyun/log/get_async_sql_request.py create mode 100644 aliyun/log/submit_async_sql_request.py diff --git a/aliyun/log/async_sql.proto b/aliyun/log/async_sql.proto new file mode 100644 index 00000000..619a6999 --- /dev/null +++ b/aliyun/log/async_sql.proto @@ -0,0 +1,28 @@ +syntax = "proto2"; + +message AsyncSqlMetaPB +{ + optional int32 result_rows = 1; + optional int64 processed_rows = 2; + optional int64 processed_bytes = 3; + optional int64 elapsed_milli = 4; + optional double cpu_sec = 5; + optional int64 cpu_cores = 6; + optional string progress = 7; + repeated string keys = 8; // column names +} + +message AsyncSqlRowPB +{ + repeated string columns = 1; +} + +message AsyncSqlResponsePB +{ + required string id = 1; + required string state = 2; + optional AsyncSqlMetaPB meta = 3; + repeated AsyncSqlRowPB rows = 4; + optional string error_code = 5; + optional string error_message = 6; +} \ No newline at end of file diff --git a/aliyun/log/async_sql_pb2.py b/aliyun/log/async_sql_pb2.py new file mode 100644 index 00000000..7e3f87f6 --- /dev/null +++ b/aliyun/log/async_sql_pb2.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: async_sql.proto +"""Generated protocol buffer code.""" +from google.protobuf.internal import builder as _builder +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x61sync_sql.proto\"\xb1\x01\n\x0e\x41syncSqlMetaPB\x12\x13\n\x0bresult_rows\x18\x01 \x01(\x05\x12\x16\n\x0eprocessed_rows\x18\x02 \x01(\x03\x12\x17\n\x0fprocessed_bytes\x18\x03 \x01(\x03\x12\x15\n\relapsed_milli\x18\x04 \x01(\x03\x12\x0f\n\x07\x63pu_sec\x18\x05 \x01(\x01\x12\x11\n\tcpu_cores\x18\x06 \x01(\x03\x12\x10\n\x08progress\x18\x07 \x01(\t\x12\x0c\n\x04keys\x18\x08 \x03(\t\" \n\rAsyncSqlRowPB\x12\x0f\n\x07\x63olumns\x18\x01 \x03(\t\"\x97\x01\n\x12\x41syncSqlResponsePB\x12\n\n\x02id\x18\x01 \x02(\t\x12\r\n\x05state\x18\x02 \x02(\t\x12\x1d\n\x04meta\x18\x03 \x01(\x0b\x32\x0f.AsyncSqlMetaPB\x12\x1c\n\x04rows\x18\x04 \x03(\x0b\x32\x0e.AsyncSqlRowPB\x12\x12\n\nerror_code\x18\x05 \x01(\t\x12\x15\n\rerror_message\x18\x06 \x01(\t') + +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'async_sql_pb2', globals()) +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _ASYNCSQLMETAPB._serialized_start=20 + _ASYNCSQLMETAPB._serialized_end=197 + _ASYNCSQLROWPB._serialized_start=199 + _ASYNCSQLROWPB._serialized_end=231 + _ASYNCSQLRESPONSEPB._serialized_start=234 + _ASYNCSQLRESPONSEPB._serialized_end=385 +# @@protoc_insertion_point(module_scope) diff --git a/aliyun/log/async_sql_response.py b/aliyun/log/async_sql_response.py new file mode 100644 index 00000000..906cda4c --- /dev/null +++ b/aliyun/log/async_sql_response.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python +# encoding: utf-8 + +# Copyright (C) Alibaba Cloud Computing +# All rights reserved. + +from .logresponse import LogResponse +from .async_sql_pb2 import AsyncSqlResponsePB +from .compress import Compressor + + +class AsyncSqlResponse(LogResponse): + """ The response used for async SQL operations. + + :type header: dict + :param header: response header + + :type resp: string + :param resp: response data + """ + + def __init__(self, header, resp): + LogResponse.__init__(self, header, resp) + self.async_sql_response_pb = AsyncSqlResponsePB() + if resp: + try: + raw_data = Compressor.decompress_response(header, resp) + self.async_sql_response_pb.ParseFromString(raw_data) + except Exception as ex: + raise Exception("Failed to parse AsyncSqlResponsePB: {0}".format(ex)) + + def get_query_id(self): + """ Get async SQL query ID + + :return: string, query ID + """ + return self.async_sql_response_pb.id + + def get_state(self): + """ Get async SQL query state + + :return: string, query state (e.g., "RUNNING", "COMPLETE", "FAILED") + """ + return self.async_sql_response_pb.state + + def get_error_code(self): + """ Get error code if query failed + + :return: string, error code + """ + return self.async_sql_response_pb.error_code if self.async_sql_response_pb.HasField('error_code') else None + + def get_error_message(self): + """ Get error message if query failed + + :return: string, error message + """ + return self.async_sql_response_pb.error_message if self.async_sql_response_pb.HasField('error_message') else None + + def get_meta(self): + """ Get query metadata + + :return: AsyncSqlMetaPB, query metadata + """ + return self.async_sql_response_pb.meta if self.async_sql_response_pb.HasField('meta') else None + + def get_result_rows(self): + """ Get number of result rows + + :return: int, number of result rows + """ + meta = self.get_meta() + return meta.result_rows if meta and meta.HasField('result_rows') else 0 + + def get_processed_rows(self): + """ Get number of processed rows + + :return: int, number of processed rows + """ + meta = self.get_meta() + return meta.processed_rows if meta and meta.HasField('processed_rows') else 0 + + def get_processed_bytes(self): + """ Get number of processed bytes + + :return: int, number of processed bytes + """ + meta = self.get_meta() + return meta.processed_bytes if meta and meta.HasField('processed_bytes') else 0 + + def get_elapsed_milli(self): + """ Get elapsed time in milliseconds + + :return: int, elapsed time in milliseconds + """ + meta = self.get_meta() + return meta.elapsed_milli if meta and meta.HasField('elapsed_milli') else 0 + + def get_cpu_sec(self): + """ Get CPU time in seconds + + :return: float, CPU time in seconds + """ + meta = self.get_meta() + return meta.cpu_sec if meta and meta.HasField('cpu_sec') else 0.0 + + def get_cpu_cores(self): + """ Get CPU cores used + + :return: int, CPU cores used + """ + meta = self.get_meta() + return meta.cpu_cores if meta and meta.HasField('cpu_cores') else 0 + + def get_progress(self): + """ Get query progress + + :return: string, query progress + """ + meta = self.get_meta() + return meta.progress if meta and meta.HasField('progress') else '' + + def get_keys(self): + """ Get column names/keys + + :return: list, column names + """ + meta = self.get_meta() + return list(meta.keys) if meta else [] + + def get_rows(self): + """ Get result rows + + :return: list, result rows data + """ + rows = [] + for row_pb in self.async_sql_response_pb.rows: + rows.append(list(row_pb.columns)) + return rows + + def get_raw_response_pb(self): + """ Get raw protobuf response + + :return: AsyncSqlResponsePB, raw protobuf response + """ + return self.async_sql_response_pb + + def log_print(self): + """ Print response information for debugging + """ + print("AsyncSqlResponse:") + print(" Query ID: {0}".format(self.get_query_id())) + print(" State: {0}".format(self.get_state())) + print(" Result Rows: {0}".format(self.get_result_rows())) + print(" Processed Rows: {0}".format(self.get_processed_rows())) + print(" Processed Bytes: {0}".format(self.get_processed_bytes())) + print(" Elapsed Time: {0}ms".format(self.get_elapsed_milli())) + print(" CPU Time: {0}s".format(self.get_cpu_sec())) + print(" CPU Cores: {0}".format(self.get_cpu_cores())) + print(" Progress: {0}".format(self.get_progress())) + if self.get_error_code(): + print(" Error Code: {0}".format(self.get_error_code())) + print(" Error Message: {0}".format(self.get_error_message())) + print(" Column Names: {0}".format(self.get_keys())) + print(" Data Rows: {0}".format(len(self.get_rows()))) diff --git a/aliyun/log/delete_async_sql_request.py b/aliyun/log/delete_async_sql_request.py new file mode 100644 index 00000000..06dce222 --- /dev/null +++ b/aliyun/log/delete_async_sql_request.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python +# encoding: utf-8 + +# Copyright (C) Alibaba Cloud Computing +# All rights reserved. + +from .logrequest import LogRequest + + +class DeleteAsyncSqlRequest(LogRequest): + """ The request used to delete async SQL query. + + :type project: string + :param project: project name + + :type query_id: string + :param query_id: async SQL query ID + """ + + def __init__(self, project=None, query_id=None): + LogRequest.__init__(self, project) + self.query_id = query_id + + def get_query_id(self): + """ Get query ID + + :return: string, query ID + """ + return self.query_id if self.query_id else '' + + def set_query_id(self, query_id): + """ Set query ID + + :type query_id: string + :param query_id: query ID + """ + self.query_id = query_id diff --git a/aliyun/log/get_async_sql_request.py b/aliyun/log/get_async_sql_request.py new file mode 100644 index 00000000..e5c0da61 --- /dev/null +++ b/aliyun/log/get_async_sql_request.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +# encoding: utf-8 + +# Copyright (C) Alibaba Cloud Computing +# All rights reserved. + +from .logrequest import LogRequest + + +class GetAsyncSqlRequest(LogRequest): + """ The request used to get async SQL query results. + + :type project: string + :param project: project name + + :type query_id: string + :param query_id: async SQL query ID + + :type offset: int + :param offset: line offset of return logs + + :type line: int + :param line: max line number of return logs + """ + + def __init__(self, project=None, query_id=None, offset=0, line=100): + LogRequest.__init__(self, project) + self.query_id = query_id + self.offset = offset + self.line = line + + def get_query_id(self): + """ Get query ID + + :return: string, query ID + """ + return self.query_id if self.query_id else '' + + def set_query_id(self, query_id): + """ Set query ID + + :type query_id: string + :param query_id: query ID + """ + self.query_id = query_id + + def get_offset(self): + """ Get line offset of return logs + + :return: int, line offset of return logs + """ + return self.offset + + def set_offset(self, offset): + """ Set line offset of return logs + + :type offset: int + :param offset: line offset of return logs + """ + self.offset = offset + + def get_line(self): + """ Get max line number of return logs + + :return: int, max line number of return logs + """ + return self.line + + def set_line(self, line): + """ Set max line number of return logs + + :type line: int + :param line: max line number of return logs + """ + self.line = line diff --git a/aliyun/log/logclient.py b/aliyun/log/logclient.py index e1f3b4f1..ecfd915a 100644 --- a/aliyun/log/logclient.py +++ b/aliyun/log/logclient.py @@ -56,6 +56,7 @@ from .util import base64_encodestring as b64e from .util import base64_encodestring as e64, base64_decodestring as d64, Util from .version import API_VERSION, USER_AGENT +from .async_sql_response import AsyncSqlResponse from .proto import LogGroupRaw as LogGroup from .external_store_config_response import * @@ -962,6 +963,103 @@ def execute_project_sql(self, project, sql, power_sql): request = GetProjectLogsRequest(project, sql, power_sql) return self.get_project_logs(request) + def submit_async_sql(self, request): + """ Submit async SQL query to log service. + Unsuccessful operation will cause an LogException. + + :type request: SubmitAsyncSqlRequest + :param request: the submit async SQL request + + :return: AsyncSqlResponse + + :raise: LogException + """ + project = request.get_project() + + body = { + 'logstore': request.get_logstore(), + 'query': request.get_query(), + 'from': request.get_from(), + 'to': request.get_to() + } + + extensions = { + 'powerSql': request.get_power_sql(), + 'allowIncomplete': request.get_allow_incomplete() + } + + if request.get_max_run_time() is not None: + extensions['maxRunTime'] = request.get_max_run_time() + + body['extensions'] = extensions + + body_str = six.b(json.dumps(body)) + headers = { + 'x-log-bodyrawsize': str(len(body_str)), + 'Content-Type': 'application/json', + 'Accept': 'application/x-protobuf', + 'Accept-Encoding': str(CompressType.default_compress_type()) + } + + params = {} + resource = '/asyncsql' + (resp, header) = self._send('POST', project, body_str, resource, params, headers, "binary") + return AsyncSqlResponse(header, resp) + + def get_async_sql(self, request): + """ Get async SQL query results from log service. + Unsuccessful operation will cause an LogException. + + :type request: GetAsyncSqlRequest + :param request: the get async SQL request + + :return: AsyncSqlResponse + + :raise: LogException + """ + project = request.get_project() + query_id = request.get_query_id() + + params = { + 'offset': request.get_offset(), + 'line': request.get_line() + } + + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/x-protobuf', + 'Accept-Encoding': str(CompressType.default_compress_type()) + } + + resource = '/asyncsql/' + query_id + (resp, header) = self._send('GET', project, {}, resource, params, headers, "binary") + return AsyncSqlResponse(header, resp) + + def delete_async_sql(self, request): + """ Delete async SQL query from log service. + Unsuccessful operation will cause an LogException. + + :type request: DeleteAsyncSqlRequest + :param request: the delete async SQL request + + :return: AsyncSqlResponse + + :raise: LogException + """ + project = request.get_project() + query_id = request.get_query_id() + + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/x-protobuf', + 'Accept-Encoding': str(CompressType.default_compress_type()) + } + + params = {} + resource = '/asyncsql/' + query_id + (resp, header) = self._send('DELETE', project, {}, resource, params, headers, "binary") + return AsyncSqlResponse(header, resp) + def get_context_logs(self, project, logstore, pack_id, pack_meta, back_lines, forward_lines): """ Get context logs of specified log from log service. Unsuccessful operation will cause an LogException. diff --git a/aliyun/log/submit_async_sql_request.py b/aliyun/log/submit_async_sql_request.py new file mode 100644 index 00000000..efb68314 --- /dev/null +++ b/aliyun/log/submit_async_sql_request.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python +# encoding: utf-8 + +# Copyright (C) Alibaba Cloud Computing +# All rights reserved. + +from .logrequest import LogRequest +from .util import parse_timestamp + + +class SubmitAsyncSqlRequest(LogRequest): + """ The request used to submit async SQL query. + + :type project: string + :param project: project name + + :type logstore: string + :param logstore: logstore name + + :type query: string + :param query: SQL query string + + :type fromTime: int/string + :param fromTime: the begin time, or format of time in format "%Y-%m-%d %H:%M:%S" e.g. "2018-01-02 12:12:10" + + :type toTime: int/string + :param toTime: the end time, or format of time in format "%Y-%m-%d %H:%M:%S" e.g. "2018-01-02 12:12:10" + + :type power_sql: bool + :param power_sql: if power_sql is set to true, the query will run on enhanced sql mode + + :type allow_incomplete: bool + :param allow_incomplete: if allow_incomplete is set to true, incomplete results are allowed + + :type max_run_time: int + :param max_run_time: maximum execution time in seconds + """ + + def __init__(self, project=None, logstore=None, query=None, fromTime=None, toTime=None, + power_sql=False, allow_incomplete=True, max_run_time=None): + LogRequest.__init__(self, project) + self.logstore = logstore + self.query = query + self.fromTime = parse_timestamp(fromTime) if fromTime is not None else fromTime + self.toTime = parse_timestamp(toTime) if toTime is not None else toTime + self.power_sql = power_sql + self.allow_incomplete = allow_incomplete + self.max_run_time = max_run_time + + def get_logstore(self): + """ Get logstore name + + :return: string, logstore name + """ + return self.logstore if self.logstore else '' + + def set_logstore(self, logstore): + """ Set logstore name + + :type logstore: string + :param logstore: logstore name + """ + self.logstore = logstore + + def get_query(self): + """ Get SQL query string + + :return: string, SQL query string + """ + return self.query + + def set_query(self, query): + """ Set SQL query string + + :type query: string + :param query: SQL query string + """ + self.query = query + + def get_from(self): + """ Get begin time + + :return: int, begin time + """ + return self.fromTime + + def set_from(self, fromTime): + """ Set begin time + + :type fromTime: int + :param fromTime: begin time + """ + self.fromTime = fromTime + + def get_to(self): + """ Get end time + + :return: int, end time + """ + return self.toTime + + def set_to(self, toTime): + """ Set end time + + :type toTime: int + :param toTime: end time + """ + self.toTime = toTime + + def get_power_sql(self): + """ Get power_sql flag + + :return: bool, power_sql flag + """ + return self.power_sql + + def set_power_sql(self, power_sql): + """ Set power_sql flag + + :type power_sql: bool + :param power_sql: power_sql flag + """ + self.power_sql = power_sql + + def get_allow_incomplete(self): + """ Get allow_incomplete flag + + :return: bool, allow_incomplete flag + """ + return self.allow_incomplete + + def set_allow_incomplete(self, allow_incomplete): + """ Set allow_incomplete flag + + :type allow_incomplete: bool + :param allow_incomplete: allow_incomplete flag + """ + self.allow_incomplete = allow_incomplete + + def get_max_run_time(self): + """ Get max_run_time + + :return: int, max_run_time + """ + return self.max_run_time + + def set_max_run_time(self, max_run_time): + """ Set max_run_time + + :type max_run_time: int + :param max_run_time: maximum execution time in seconds + """ + self.max_run_time = max_run_time From 2cd511d61d06c99df6f46d4b5fbd997ae75d480d Mon Sep 17 00:00:00 2001 From: "shining.wz" Date: Tue, 9 Sep 2025 14:06:29 +0800 Subject: [PATCH 2/2] only import AsyncSqlResponse when python3 --- aliyun/log/logclient.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aliyun/log/logclient.py b/aliyun/log/logclient.py index ecfd915a..faccd118 100644 --- a/aliyun/log/logclient.py +++ b/aliyun/log/logclient.py @@ -56,7 +56,8 @@ from .util import base64_encodestring as b64e from .util import base64_encodestring as e64, base64_decodestring as d64, Util from .version import API_VERSION, USER_AGENT -from .async_sql_response import AsyncSqlResponse +if six.PY3: + from .async_sql_response import AsyncSqlResponse from .proto import LogGroupRaw as LogGroup from .external_store_config_response import *