Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 78 additions & 34 deletions cassandra/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -2577,6 +2577,10 @@ class SchemaParserV3(SchemaParserV22):
_SELECT_AGGREGATES = "SELECT * FROM system_schema.aggregates"
_SELECT_VIEWS = "SELECT * FROM system_schema.views"

def _is_not_scylla(self):
"""Check if NOT connected to ScyllaDB by checking for shard awareness."""
return getattr(getattr(self.connection, 'features', None), 'shard_id', None) is None

_table_name_col = 'table_name'

_function_agg_arument_type_col = 'argument_types'
Expand Down Expand Up @@ -2627,27 +2631,44 @@ def get_table(self, keyspaces, keyspace, table):
indexes_query = QueryMessage(
query=maybe_add_timeout_to_query(self._SELECT_INDEXES + where_clause, self.metadata_request_timeout),
consistency_level=cl, fetch_size=fetch_size)
triggers_query = QueryMessage(
query=maybe_add_timeout_to_query(self._SELECT_TRIGGERS + where_clause, self.metadata_request_timeout),
consistency_level=cl, fetch_size=fetch_size)

# ScyllaDB doesn't have triggers, skip the query
if self._is_not_scylla():
triggers_query = QueryMessage(
query=maybe_add_timeout_to_query(self._SELECT_TRIGGERS + where_clause, self.metadata_request_timeout),
consistency_level=cl, fetch_size=fetch_size)

# in protocol v4 we don't know if this event is a view or a table, so we look for both
where_clause = bind_params(" WHERE keyspace_name = %s AND view_name = %s", (keyspace, table), _encoder)
view_query = QueryMessage(
query=maybe_add_timeout_to_query(self._SELECT_VIEWS + where_clause, self.metadata_request_timeout),
consistency_level=cl, fetch_size=fetch_size)
((cf_success, cf_result), (col_success, col_result),
(indexes_sucess, indexes_result), (triggers_success, triggers_result),
(view_success, view_result)) = (
self.connection.wait_for_responses(
cf_query, col_query, indexes_query, triggers_query,
view_query, timeout=self.timeout, fail_on_error=False)
)

if self._is_not_scylla():
((cf_success, cf_result), (col_success, col_result),
(indexes_sucess, indexes_result), (triggers_success, triggers_result),
(view_success, view_result)) = (
self.connection.wait_for_responses(
cf_query, col_query, indexes_query, triggers_query,
view_query, timeout=self.timeout, fail_on_error=False)
)
else:
((cf_success, cf_result), (col_success, col_result),
(indexes_sucess, indexes_result),
(view_success, view_result)) = (
self.connection.wait_for_responses(
cf_query, col_query, indexes_query,
view_query, timeout=self.timeout, fail_on_error=False)
)

table_result = self._handle_results(cf_success, cf_result, query_msg=cf_query)
col_result = self._handle_results(col_success, col_result, query_msg=col_query)
if table_result:
indexes_result = self._handle_results(indexes_sucess, indexes_result, query_msg=indexes_query)
triggers_result = self._handle_results(triggers_success, triggers_result, query_msg=triggers_query)
if self._is_not_scylla():
triggers_result = self._handle_results(triggers_success, triggers_result, query_msg=triggers_query)
else:
triggers_result = None
return self._build_table_metadata(table_result[0], col_result, triggers_result, indexes_result)

view_result = self._handle_results(view_success, view_result, query_msg=view_query)
Expand Down Expand Up @@ -2696,9 +2717,10 @@ def _build_table_metadata(self, row, col_rows=None, trigger_rows=None, index_row

self._build_table_columns(table_meta, col_rows, compact_static, is_dense, virtual)

for trigger_row in trigger_rows:
trigger_meta = self._build_trigger_metadata(table_meta, trigger_row)
table_meta.triggers[trigger_meta.name] = trigger_meta
if self._is_not_scylla():
for trigger_row in trigger_rows:
trigger_meta = self._build_trigger_metadata(table_meta, trigger_row)
table_meta.triggers[trigger_meta.name] = trigger_meta

for index_row in index_rows:
index_meta = self._build_index_metadata(table_meta, index_row)
Expand Down Expand Up @@ -2793,6 +2815,7 @@ def _build_trigger_metadata(table_metadata, row):
trigger_meta = TriggerMetadata(table_metadata, name, options)
return trigger_meta


def _query_all(self):
cl = ConsistencyLevel.ONE
fetch_size = self.fetch_size
Expand All @@ -2809,35 +2832,45 @@ def _query_all(self):
fetch_size=fetch_size, consistency_level=cl),
QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_AGGREGATES, self.metadata_request_timeout),
fetch_size=fetch_size, consistency_level=cl),
QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_TRIGGERS, self.metadata_request_timeout),
fetch_size=fetch_size, consistency_level=cl),
QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_INDEXES, self.metadata_request_timeout),
fetch_size=fetch_size, consistency_level=cl),
QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_VIEWS, self.metadata_request_timeout),
fetch_size=fetch_size, consistency_level=cl),
]

# ScyllaDB doesn't have triggers, skip the query
if self._is_not_scylla():
queries.append(QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_TRIGGERS, self.metadata_request_timeout),
fetch_size=fetch_size, consistency_level=cl))

responses = self.connection.wait_for_responses(*queries, timeout=self.timeout, fail_on_error=False)

# Unpack common responses (always present)
((ks_success, ks_result),
(table_success, table_result),
(col_success, col_result),
(types_success, types_result),
(functions_success, functions_result),
(aggregates_success, aggregates_result),
(triggers_success, triggers_result),
(indexes_success, indexes_result),
(views_success, views_result)) = self.connection.wait_for_responses(
*queries, timeout=self.timeout, fail_on_error=False
)
(views_success, views_result)) = responses[:8]

# Unpack triggers response if present (Cassandra/DSE only)
if self._is_not_scylla():
(triggers_success, triggers_result) = responses[8]

self.keyspaces_result = self._handle_results(ks_success, ks_result, query_msg=queries[0])
self.tables_result = self._handle_results(table_success, table_result, query_msg=queries[1])
self.columns_result = self._handle_results(col_success, col_result, query_msg=queries[2])
self.triggers_result = self._handle_results(triggers_success, triggers_result, query_msg=queries[6])
self.types_result = self._handle_results(types_success, types_result, query_msg=queries[3])
self.functions_result = self._handle_results(functions_success, functions_result, query_msg=queries[4])
self.aggregates_result = self._handle_results(aggregates_success, aggregates_result, query_msg=queries[5])
self.indexes_result = self._handle_results(indexes_success, indexes_result, query_msg=queries[7])
self.views_result = self._handle_results(views_success, views_result, query_msg=queries[8])
self.indexes_result = self._handle_results(indexes_success, indexes_result, query_msg=queries[6])
self.views_result = self._handle_results(views_success, views_result, query_msg=queries[7])
if self._is_not_scylla():
self.triggers_result = self._handle_results(triggers_success, triggers_result, query_msg=queries[8])
else:
self.triggers_result = []

self._aggregate_results()

Expand Down Expand Up @@ -2915,8 +2948,6 @@ def _query_all(self):
fetch_size=fetch_size, consistency_level=cl),
QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_AGGREGATES, self.metadata_request_timeout),
fetch_size=fetch_size, consistency_level=cl),
QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_TRIGGERS, self.metadata_request_timeout),
fetch_size=fetch_size, consistency_level=cl),
QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_INDEXES, self.metadata_request_timeout),
fetch_size=fetch_size, consistency_level=cl),
QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_VIEWS, self.metadata_request_timeout),
Expand All @@ -2930,8 +2961,15 @@ def _query_all(self):
fetch_size=fetch_size, consistency_level=cl),
]

# ScyllaDB doesn't have triggers, skip the query
if self._is_not_scylla():
queries.append(QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_TRIGGERS, self.metadata_request_timeout),
fetch_size=fetch_size, consistency_level=cl))

responses = self.connection.wait_for_responses(
*queries, timeout=self.timeout, fail_on_error=False)

# Unpack common responses (always present)
(
# copied from V3
(ks_success, ks_result),
Expand All @@ -2940,39 +2978,45 @@ def _query_all(self):
(types_success, types_result),
(functions_success, functions_result),
(aggregates_success, aggregates_result),
(triggers_success, triggers_result),
(indexes_success, indexes_result),
(views_success, views_result),
# V4-only responses
(virtual_ks_success, virtual_ks_result),
(virtual_table_success, virtual_table_result),
(virtual_column_success, virtual_column_result)
) = responses
(virtual_column_success, virtual_column_result),
) = responses[:11]

# Unpack triggers response if present (Cassandra/DSE only)
if self._is_not_scylla():
(triggers_success, triggers_result) = responses[11]

# copied from V3
self.keyspaces_result = self._handle_results(ks_success, ks_result, query_msg=queries[0])
self.tables_result = self._handle_results(table_success, table_result, query_msg=queries[1])
self.columns_result = self._handle_results(col_success, col_result, query_msg=queries[2])
self.triggers_result = self._handle_results(triggers_success, triggers_result, query_msg=queries[6])
self.types_result = self._handle_results(types_success, types_result, query_msg=queries[3])
self.functions_result = self._handle_results(functions_success, functions_result, query_msg=queries[4])
self.aggregates_result = self._handle_results(aggregates_success, aggregates_result, query_msg=queries[5])
self.indexes_result = self._handle_results(indexes_success, indexes_result, query_msg=queries[7])
self.views_result = self._handle_results(views_success, views_result, query_msg=queries[8])
self.indexes_result = self._handle_results(indexes_success, indexes_result, query_msg=queries[6])
self.views_result = self._handle_results(views_success, views_result, query_msg=queries[7])
if self._is_not_scylla():
self.triggers_result = self._handle_results(triggers_success, triggers_result, query_msg=queries[11])
else:
self.triggers_result = []
# V4-only results
# These tables don't exist in some DSE versions reporting 4.X so we can
# ignore them if we got an error
self.virtual_keyspaces_result = self._handle_results(
virtual_ks_success, virtual_ks_result,
expected_failures=(InvalidRequest,), query_msg=queries[9]
expected_failures=(InvalidRequest,), query_msg=queries[8]
)
self.virtual_tables_result = self._handle_results(
virtual_table_success, virtual_table_result,
expected_failures=(InvalidRequest,), query_msg=queries[10]
expected_failures=(InvalidRequest,), query_msg=queries[9]
)
self.virtual_columns_result = self._handle_results(
virtual_column_success, virtual_column_result,
expected_failures=(InvalidRequest,), query_msg=queries[11]
expected_failures=(InvalidRequest,), query_msg=queries[10]
)

self._aggregate_results()
Expand Down
Loading